如何解决BigQuery MERGE意外的行重复
我正在使用标准SQL MERGE根据源外部表(它是存储桶中的一组CVS文件)在常规目标表上进行更新。这是一个简化的输入文件:
$ gsutil cat gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv
"id" "PortfolioCode" "ValuationDate" "load_checksum"
"1" "CIMDI000TT" "2020-03-28" "checksum1"
MERGE语句为:
MERGE xx_producer_conformed.demo T
USING xx_producer_raw.demo_raw S
ON
S.id = T.id
WHEN NOT MATCHED THEN
INSERT (id,PortfolioCode,ValuationDate,load_checksum,insert_time,file_name,extract_timestamp,wf_id)
VALUES (id,CURRENT_TIMESTAMP(),_FILE_NAME,REGEXP_EXTRACT(_FILE_NAME,'.*_[0-9]{8}_([0-9]{14}).tsv'),'scheduled__2020-08-19T16:24:00+00:00')
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
T.id = S.id,T.PortfolioCode = S.PortfolioCode,T.ValuationDate = S.ValuationDate,T.load_checksum = S.load_checksum,T.file_name = S._FILE_NAME,T.extract_timestamp = REGEXP_EXTRACT(_FILE_NAME,T.wf_id = 'scheduled__2020-08-19T16:24:00+00:00'
如果我擦除目标表并重新运行MERGE,则行修改计数为1:
bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r288f8d33_000001740b413532_1 ... (0s) Current status: DONE
Number of affected rows: 1
这成功导致目标表更新:
$ bq query --format=csv --max_rows=10 --use_legacy_sql=false "select * from ta_producer_conformed.demo"
Waiting on bqjob_r7f6b6a46_000001740b5057a3_1 ... (0s) Current status: DONE
id,wf_id
1,CIMDI000TT,2020-03-28,checksum1,2020-08-20 09:44:20,gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv,20200505050505,scheduled__2020-08-19T16:24:00+00:00
如果再次返回MERGE,则行修改计数为0:
$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r3de2f833_000001740b4161b3_1 ... (0s) Current status: DONE
Number of affected rows: 0
不会对目标表进行任何更改。因此一切正常。
问题是,当我在带有许多要插入到空目标表的许多输入文件的更复杂示例上运行代码时,我最终得到的行具有相同的id
,其中count(id)
不是等于count(distinct id)
:
$ bq query --use_legacy_sql=false --max_rows=999999 --location=asia-east2 "select count(id) as total_records from xx_producer_conformed.xxx; select count(distinct id) as unique_records from xx_producer_conformed.xxx; "
Waiting on bqjob_r5df5bec8_000001740b7dfa50_1 ... (1s) Current status: DONE
select count(id) as total_records from xx_producer_conformed.xxx; -- at [1:1]
+---------------+
| total_records |
+---------------+
| 11582 |
+---------------+
select count(distinct id) as unique_records from xx_producer_conformed.xxx; -- at [1:78]
+----------------+
| unique_records |
+----------------+
| 5722 |
+----------------+
这让我感到惊讶,因为我的期望是,底层逻辑将遍历每个底层文件中的每一行,并且仅在第一个id
上插入,然后在随后的任何id
上进行更新。因此,我期望输入行中的行数不能超过唯一的id
。
如果我随后再次尝试运行MERGE,它将无法告诉我目标表中有多个具有相同ID的行:
$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql | awk 'ORS=" "')"
Waiting on bqjob_r2fe783fc_000001740b8271aa_1 ... (0s) Current status: DONE
Error in query string: Error processing job 'xxxx-10843454-datamesh-
dev:bqjob_r2fe783fc_000001740b8271aa_1': UPDATE/MERGE must match at most one
source row for each target row
我期望MERGE语句插入时不会有两行具有相同的“ id”。
所有使用的表和查询都是从列出“业务列”的文件中生成的。因此,上面的简单演示示例在登录和加入MERGE语句方面与全面查询相同。
为什么上面的MERGE查询会导致具有重复的“ id”的行,我该如何解决?
解决方法
通过擦拭目标表并复制相对较大的输入作为输入,可以很容易地重复该问题:
AAAA_20200805_20200814200000.tsv
AAAA_clone_20200805_20200814200000.tsv
我相信这是并行性的核心。一个由多个文件组成的大型MERGE可以并行产生许多工作线程。如果两个并行运行的辅助线程加载不同的文件以立即“看到”彼此插入,那将非常慢。相反,我希望它们能够独立运行,并且不会“看到”彼此写入单独的缓冲区。最终合并缓冲区后,将导致多个插入具有相同的id
。
要解决此问题,我正在使用一些CTE通过使用id
根据extract_timestamp来选择任何ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC)
的最新记录。然后,我们可以按最低值进行筛选以选择记录的最新版本。完整的查询是:
MERGE xx_producer_conformed.demo T
USING (
WITH cteExtractTimestamp AS (
SELECT
id,PortfolioCode,ValuationDate,load_checksum,_FILE_NAME,REGEXP_EXTRACT(_FILE_NAME,'.*_[0-9]{8}_([0-9]{14}).tsv') AS extract_timestamp
FROM
xx_producer_raw.demo_raw
),cteRanked AS (
SELECT
id,extract_timestamp,ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC) AS row_num
FROM
cteExtractTimestamp
)
SELECT
id,row_num,"{{ task_instance.xcom_pull(task_ids='get_run_id') }}" AS wf_id
FROM cteRanked
WHERE row_num = 1
) S
ON
S.id = T.id
WHEN NOT MATCHED THEN
INSERT (id,insert_time,file_name,wf_id)
VALUES (id,CURRENT_TIMESTAMP(),wf_id)
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
T.id = S.id,T.PortfolioCode = S.PortfolioCode,T.ValuationDate = S.ValuationDate,T.load_checksum = S.load_checksum,T.file_name = S._FILE_NAME,T.extract_timestamp = S.extract_timestamp,T.wf_id = S.wf_id
这意味着克隆文件而不更改文件名中的extract_timestamp将随机选择两行之一。在正常运行中,我们希望具有更新数据的后续提取成为具有新的extract_timetamp的源文件。然后,上面的查询将选择最新记录以合并到目标表中。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。