BigQuery MERGE意外的行重复

如何解决BigQuery MERGE意外的行重复

我正在使用标准SQL MERGE根据源外部表(它是存储桶中的一组CVS文件)在常规目标表上进行更新。这是一个简化的输入文件:

$ gsutil cat gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv
"id"    "PortfolioCode" "ValuationDate" "load_checksum"
"1"     "CIMDI000TT"    "2020-03-28"    "checksum1"

MERGE语句为:

MERGE xx_producer_conformed.demo T
USING xx_producer_raw.demo_raw S
ON
    S.id = T.id
WHEN NOT MATCHED THEN
    INSERT (id,PortfolioCode,ValuationDate,load_checksum,insert_time,file_name,extract_timestamp,wf_id)
    VALUES (id,CURRENT_TIMESTAMP(),_FILE_NAME,REGEXP_EXTRACT(_FILE_NAME,'.*_[0-9]{8}_([0-9]{14}).tsv'),'scheduled__2020-08-19T16:24:00+00:00')
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
    T.id = S.id,T.PortfolioCode = S.PortfolioCode,T.ValuationDate = S.ValuationDate,T.load_checksum = S.load_checksum,T.file_name = S._FILE_NAME,T.extract_timestamp = REGEXP_EXTRACT(_FILE_NAME,T.wf_id = 'scheduled__2020-08-19T16:24:00+00:00'

如果我擦除目标表并重新运行MERGE,则行修改计数为1:

bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r288f8d33_000001740b413532_1 ... (0s) Current status: DONE
Number of affected rows: 1

这成功导致目标表更新:

$ bq query --format=csv --max_rows=10 --use_legacy_sql=false "select * from ta_producer_conformed.demo"
Waiting on bqjob_r7f6b6a46_000001740b5057a3_1 ... (0s) Current status: DONE
id,wf_id
1,CIMDI000TT,2020-03-28,checksum1,2020-08-20 09:44:20,gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv,20200505050505,scheduled__2020-08-19T16:24:00+00:00

如果再次返回MERGE,则行修改计数为0:

$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r3de2f833_000001740b4161b3_1 ... (0s) Current status: DONE
Number of affected rows: 0

不会对目标表进行任何更改。因此一切正常。

问题是,当我在带有许多要插入到空目标表的许多输入文件的更复杂示例上运行代码时,我最终得到的行具有相同的id,其中count(id)不是等于count(distinct id)

$ bq query --use_legacy_sql=false --max_rows=999999 --location=asia-east2 "select count(id) as total_records from xx_producer_conformed.xxx; select count(distinct id) as unique_records from xx_producer_conformed.xxx; "
Waiting on bqjob_r5df5bec8_000001740b7dfa50_1 ... (1s) Current status: DONE
select count(id) as total_records from xx_producer_conformed.xxx; -- at [1:1]
+---------------+
| total_records |
+---------------+
|         11582 |
+---------------+
select count(distinct id) as unique_records from xx_producer_conformed.xxx; -- at [1:78]
+----------------+
| unique_records |
+----------------+
|           5722 |
+----------------+

这让我感到惊讶,因为我的期望是,底层逻辑将遍历每个底层文件中的每一行,并且仅在第一个id上插入,然后在随后的任何id上进行更新。因此,我期望输入行中的行数不能超过唯一的id

如果我随后再次尝试运行MERGE,它将无法告诉我目标表中有多个具有相同ID的行:

$  bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r2fe783fc_000001740b8271aa_1 ... (0s) Current status: DONE
Error in query string: Error processing job 'xxxx-10843454-datamesh-
dev:bqjob_r2fe783fc_000001740b8271aa_1': UPDATE/MERGE must match at most one
source row for each target row

我期望MERGE语句插入时不会有两行具有相同的“ id”。

所有使用的表和查询都是从列出“业务列”的文件中生成的。因此,上面的简单演示示例在登录和加入MERGE语句方面与全面查询相同。

为什么上面的MERGE查询会导致具有重复的“ id”的行,我该如何解决?

解决方法

通过擦拭目标表并复制相对较大的输入作为输入,可以很容易地重复该问题:

AAAA_20200805_20200814200000.tsv
AAAA_clone_20200805_20200814200000.tsv

我相信这是并行性的核心。一个由多个文件组成的大型MERGE可以并行产生许多工作线程。如果两个并行运行的辅助线程加载不同的文件以立即“看到”彼此插入,那将非常慢。相反,我希望它们能够独立运行,并且不会“看到”彼此写入单独的缓冲区。最终合并缓冲区后,将导致多个插入具有相同的id

要解决此问题,我正在使用一些CTE通过使用id根据extract_timestamp来选择任何ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC)的最新记录。然后,我们可以按最低值进行筛选以选择记录的最新版本。完整的查询是:

MERGE xx_producer_conformed.demo T
USING (
    WITH cteExtractTimestamp AS (
        SELECT
            id,PortfolioCode,ValuationDate,load_checksum,_FILE_NAME,REGEXP_EXTRACT(_FILE_NAME,'.*_[0-9]{8}_([0-9]{14}).tsv') AS extract_timestamp
        FROM
            xx_producer_raw.demo_raw
    ),cteRanked AS (
        SELECT
            id,extract_timestamp,ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC) AS row_num
        FROM 
            cteExtractTimestamp
    )
    SELECT 
        id,row_num,"{{ task_instance.xcom_pull(task_ids='get_run_id') }}" AS wf_id
    FROM cteRanked 
    WHERE row_num = 1
) S
ON
    S.id = T.id
WHEN NOT MATCHED THEN
    INSERT (id,insert_time,file_name,wf_id)
    VALUES (id,CURRENT_TIMESTAMP(),wf_id)
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
    T.id = S.id,T.PortfolioCode = S.PortfolioCode,T.ValuationDate = S.ValuationDate,T.load_checksum = S.load_checksum,T.file_name = S._FILE_NAME,T.extract_timestamp = S.extract_timestamp,T.wf_id = S.wf_id

这意味着克隆文件而不更改文件名中的extract_timestamp将随机选择两行之一。在正常运行中,我们希望具有更新数据的后续提取成为具有新的extract_timetamp的源文件。然后,上面的查询将选择最新记录以合并到目标表中。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-