Deleting bulk rows from a BigQuery table based on a list of unique IDs


I am trying to delete some rows from a BigQuery table with a simple query like this:

client = bigquery.Client()
query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN {}".format(primary_key_list)
query_job = client.query(query, location="us-west1")

where primary_key_list is a Python list containing the unique IDs of Sample_Table, for example: [123, 124, 125, ...]

It works fine for the small amounts of data I retrieve, but when primary_key_list grows it gives me this error:

The query is too large. Maximum standard SQL query length is 1024.00K characters, including comments and white space characters.

I realized the query would eventually be long enough to hit the maximum query length, and after searching Stack Overflow I found a solution using parameterized queries, so I changed my code to:

client = bigquery.Client()
query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN UNNEST(@List_Sample_Table_Id)"
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter("List_Sample_Table_Id", "INT64", primary_key_list),
    ]
)
query_job = client.query(query, job_config=job_config)

This stopped the maximum standard SQL query length error, but it raises another exception. Any idea how to delete bulk rows from BigQuery?

I don't know whether this information is useful, but I am running this Python code on Google Cloud Dataproc. This is just a feature I added recently; everything worked fine before I added it. Here is the log I get when running the delete with a parameterized query:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 320, in _send_until_done
    return self.connection.send(data)
  File "/opt/conda/default/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1737, in send
    self._raise_ssl_error(self._ssl, result)
  File "/opt/conda/default/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1639, in _raise_ssl_error
    raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 354, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1244, in request
    self._send_request(method, body, headers, encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1290, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1239, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 331, in sendall
    sent = self._send_until_done(data[total_sent:total_sent + SSL_WRITE_BLOCKSIZE])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 326, in _send_until_done
    raise SocketError(str(e))
OSError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 638, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/util/retry.py", line 368, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 326, in _send_until_done
    raise SocketError(str(e))
urllib3.exceptions.ProtocolError: ('Connection aborted.', OSError("(32, 'EPIPE')"))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/a05a15822be04a9abdc1ba05e317bb2f/ItemHistory-get.py", line 92, in <module>
    delete_duplicate_pk()
  File "/tmp/a05a15822be04a9abdc1ba05e317bb2f/ItemHistory-get.py", line 84, in delete_duplicate_pk
    query_job = client.query(query, job_config=job_config, location="asia-southeast2")
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 2893, in query
    query_job._begin(retry=retry, timeout=timeout)
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 1069, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/job/base.py", line 438, in _begin
    timeout=timeout,
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 643, in _call_api
    return call()
  File "/opt/conda/default/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/opt/conda/default/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 434, in api_request
    timeout=timeout,
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 292, in _make_request
    method, data, target_object, timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 330, in _do_request
    url=url, method=method, headers=headers, data=data, timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/google/auth/transport/requests.py", line 470, in request
    **kwargs
  File "/opt/conda/default/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/opt/conda/default/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/opt/conda/default/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', OSError("(32, 'EPIPE')"))

Workaround

As @blackbishop mentioned, you can try upgrading requests to the latest version (in my case it solved the problem), but because I was trying to update bulk rows (say 500,000+ rows in BigQuery, each with a unique ID), it ended up giving me a timeout on the small machine type Dataproc cluster I use (feel free to edit this answer if anyone has the resources to try it on a better Dataproc cluster and succeeds).
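Before giving up on the DELETE approach entirely, one option (not from the original answer, just a sketch) is to split the ID list into smaller batches so each parameterized DELETE job stays small and short-lived; the names delete_in_batches, chunks, the @ids parameter, and the batch size of 10,000 below are all hypothetical:

```python
def chunks(seq, size):
    """Yield consecutive slices of at most `size` items from a list."""
    for start in range(0, len(seq), size):
        yield seq[start:start + size]

def delete_in_batches(primary_key_list, batch_size=10000):
    """Run one parameterized DELETE per batch of IDs instead of one huge job."""
    # Imported lazily so the chunks() helper stays dependency-free.
    from google.cloud import bigquery
    client = bigquery.Client()
    query = ("DELETE FROM Sample_Dataset.Sample_Table "
             "WHERE Sample_Table_Id IN UNNEST(@ids)")
    for batch in chunks(primary_key_list, batch_size):
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ArrayQueryParameter("ids", "INT64", batch),
            ]
        )
        # Wait for each batch so failures surface immediately.
        client.query(query, job_config=job_config).result()
```

Whether this avoids the timeout depends on the cluster and the table, so treat it as an experiment rather than a guaranteed fix.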

So I used the MERGE statement documented here: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement

The script to update existing rows would be like this (assuming I have retrieved the new data and already loaded it into Staging_Sample_Dataset.Staging_Sample_Table):

def merge_data():
    client = bigquery.Client()
    query = """MERGE INTO Sample_Dataset.Sample_Table st
            USING Staging_Sample_Dataset.Staging_Sample_Table ss
            ON st.Sample_Table_Id = ss.Sample_Table_Id
            WHEN MATCHED THEN UPDATE SET st.Your_Column = ss.Your_Column -- and the list goes on...
            WHEN NOT MATCHED THEN INSERT ROW
            """
    query_job = client.query(query, location="asia-southeast2")
    results = query_job.result()

Or I can delete the bulk rows and call another function to do the load after this function executes:

def bulk_delete():
    client = bigquery.Client()
    query = """MERGE INTO Sample_Dataset.Sample_Table st
            USING Staging_Sample_Dataset.Staging_Sample_Table sst
            ON st.Sample_Table_Id = sst.Sample_Table_Id
            WHEN MATCHED THEN DELETE
            """
    query_job = client.query(query, location="asia-southeast2")
    results = query_job.result()
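For completeness, the staging table above has to be (re)populated before each merge. This is only a sketch of one way to do that, assuming the freshly retrieved rows are available as a list of dicts; the table name comes from the answer, but the load_staging function and its injectable client parameter are hypothetical:

```python
def load_staging(rows, client=None):
    """Load freshly retrieved rows into the staging table and wait for the job."""
    if client is None:
        # Only construct a real client when none is injected (e.g. for tests).
        from google.cloud import bigquery
        client = bigquery.Client()
    job = client.load_table_from_json(
        rows, "Staging_Sample_Dataset.Staging_Sample_Table"
    )
    job.result()  # Block until the load finishes so the merge sees the data.
    return job
```

In a pipeline like the one described, you would call load_staging first, then bulk_delete or merge_data, so the staging table never lags behind the retrieved data.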

The concept is to loop over the array of IDs you have and run a delete for each one.

This is just the structure, not actual code, so it may need more than a tweak.

client = bigquery.Client()
i = 0
while i < len(primary_key_list):
    query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id = {}".format(primary_key_list[i])
    query_job = client.query(query, location="us-west1")
    i = i + 1

