How to delete rows in bulk from a BigQuery table based on a list of unique IDs
So I am trying to delete some rows in a BigQuery table with a simple query like this:
client = bigquery.Client()
query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN {}".format(primary_key_list)
query_job = client.query(query, location="us-west1")
where primary_key_list is a Python list containing the unique IDs of Sample_Table, for example: [123, 124, 125, ...]
This works fine for the small amounts of data I retrieve, but when primary_key_list grows it gives me an error:
The query is too large. Maximum standard SQL query length is 1024.00K characters, including comments and white space characters.
I realized the query would eventually grow long enough to hit the maximum query length, and after searching Stack Overflow I found a solution using parameterized queries, so I changed my code to:
client = bigquery.Client()
query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN UNNEST(@List_Sample_Table_Id)"
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter("List_Sample_Table_Id", "INT64", primary_key_list),
    ]
)
query_job = client.query(query, job_config=job_config)
That stopped the maximum-query-length error, but it raised a different exception. Any ideas on how to delete rows in bulk from BigQuery?
I don't know whether this information is useful, but I am running this Python code on Google Cloud Dataproc. This is just a feature I added recently; everything worked fine before. Here is the log I got when running the delete with a parameterized query:
Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 320, in _send_until_done
    return self.connection.send(data)
  File "/opt/conda/default/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1737, in send
    self._raise_ssl_error(self._ssl, result)
  File "/opt/conda/default/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1639, in _raise_ssl_error
    raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 354, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1244, in request
    self._send_request(method, body, headers, encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1290, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1239, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 331, in sendall
    sent = self._send_until_done(data[total_sent:total_sent + SSL_WRITE_BLOCKSIZE])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 326, in _send_until_done
    raise SocketError(str(e))
OSError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 638, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/util/retry.py", line 368, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", in _send_until_done
    raise SocketError(str(e))
urllib3.exceptions.ProtocolError: ('Connection aborted.', OSError("(32, 'EPIPE')"))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/a05a15822be04a9abdc1ba05e317bb2f/ItemHistory-get.py", line 92, in <module>
    delete_duplicate_pk()
  File "/tmp/a05a15822be04a9abdc1ba05e317bb2f/ItemHistory-get.py", line 84, in delete_duplicate_pk
    query_job = client.query(query, job_config=job_config, location="asia-southeast2")
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 2893, in query
    query_job._begin(retry=retry, timeout=timeout)
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 1069, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/job/base.py", line 438, in _begin
    timeout=timeout,
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 643, in _call_api
    return call()
  File "/opt/conda/default/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/opt/conda/default/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 434, in api_request
    timeout=timeout,
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 292, in _make_request
    method, data, target_object, timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 330, in _do_request
    url=url, method=method, headers=headers, data=data, timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/google/auth/transport/requests.py", line 470, in request
    **kwargs
  File "/opt/conda/default/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/opt/conda/default/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/opt/conda/default/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', OSError("(32, 'EPIPE')"))
Solution
As @blackbishop mentioned, you can try upgrading requests to the latest version (in my case that solved the problem). But since I was trying to update rows in bulk (say 500,000+ rows in BigQuery, each with a unique ID), it gave me a timeout on the small machine-type Dataproc cluster I was using (if anyone has the resources to try this on a better Dataproc cluster and succeeds, feel free to edit this answer).
So I used the MERGE statement documented here: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement
The script to update existing rows would be like this (assuming I have retrieved new data and already loaded it into Staging_Sample_Dataset.Staging_Sample_Table):
def merge_data():
    client = bigquery.Client()
    query = """MERGE INTO Sample_Dataset.Sample_Table st
        USING Staging_Sample_Dataset.Staging_Sample_Table ss
        ON st.Sample_Table_Id = ss.Sample_Table_Id
        WHEN MATCHED THEN UPDATE SET st.Your_Column = ss.Your_Column -- and the list goes on...
        WHEN NOT MATCHED THEN INSERT ROW
    """
    query_job = client.query(query, location="asia-southeast2")
    results = query_job.result()
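Since the "and the list goes on..." column list in the UPDATE SET clause can get long, the MERGE statement can also be generated from a plain column list. A minimal sketch (the helper name and the column names are my own illustration, not part of the original answer):

```python
def build_merge_query(target, staging, key, columns):
    """Build a BigQuery MERGE statement that updates matched rows
    column by column and inserts unmatched rows wholesale."""
    set_clause = ", ".join(
        "st.{col} = ss.{col}".format(col=col) for col in columns
    )
    return (
        "MERGE INTO {target} st\n"
        "USING {staging} ss\n"
        "ON st.{key} = ss.{key}\n"
        "WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        "WHEN NOT MATCHED THEN INSERT ROW"
    ).format(target=target, staging=staging, key=key, set_clause=set_clause)

# Hypothetical column names, for illustration only:
query = build_merge_query(
    "Sample_Dataset.Sample_Table",
    "Staging_Sample_Dataset.Staging_Sample_Table",
    "Sample_Table_Id",
    ["Your_Column", "Another_Column"],
)
```

The resulting string can then be passed to client.query() exactly as in merge_data() above.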
Or I can delete the rows in bulk and call another function to load the data after this function executes:
def bulk_delete():
    client = bigquery.Client()
    query = """MERGE INTO Sample_Dataset.Sample_Table st
        USING Staging_Sample_Dataset.Staging_Sample_Table sst
        ON st.Sample_Table_Id = sst.Sample_Table_Id
        WHEN MATCHED THEN DELETE
    """
    query_job = client.query(query, location="asia-southeast2")
    results = query_job.result()
The idea here is to loop over the array of IDs you have and run a delete for each one.
This is just the structure, not actual code, so it may need more than a tweak:
client = bigquery.Client()
arrlength = len(primary_key_list)
i = 0
while i < arrlength:
    query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id = {}".format(primary_key_list[i])
    query_job = client.query(query, location="us-west1")
    query_job.result()
    i = i + 1
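A middle ground between one giant statement and one query per ID is to delete in fixed-size chunks, keeping each query far below the 1024K-character limit while issuing far fewer jobs. A sketch under the same assumptions as the question (same table and column names; the helper names and the chunk size are mine):

```python
def chunks(ids, size):
    """Yield successive fixed-size slices of the id list."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]

def build_chunk_deletes(primary_key_list, chunk_size=10000):
    """Build one DELETE statement per chunk of ids, each comfortably
    under BigQuery's maximum standard SQL query length."""
    template = ("DELETE FROM Sample_Dataset.Sample_Table "
                "WHERE Sample_Table_Id IN ({})")
    return [
        template.format(", ".join(str(pk) for pk in chunk))
        for chunk in chunks(primary_key_list, chunk_size)
    ]

# The statements would then be run one by one:
# client = bigquery.Client()
# for q in build_chunk_deletes(primary_key_list):
#     client.query(q, location="us-west1").result()
```

Note that each DELETE is still a separate DML job, so for very large ID lists the staging-table MERGE approach above should remain the cheaper option.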