如何通过非气流操作员python函数访问Xcom值

如何解决如何通过非气流操作员python函数访问Xcom值

我有一个要存储的XCom值,我想传递给另一个不使用PythonOperator调用的python函数。


def sql_file_template():
    <some code which uses xcom variable>

def call_stored_proc(**kwargs):
        
        #project = kwargs['row_id']
        print("INSIDE CALL STORE PROC ------------")   
        query = """CALL `{0}.dataset_name.store_proc`(
                          '{1}' # source table,['{2}'] # row_ids,'{3}' # pivot_col_name,'{4}' # pivot_col_value,100 # max_columns,'MAX' # aggregation
                );"""
        query = query.format(kwargs['project'],kwargs['source_tbl'],kwargs['row_id'],kwargs['pivot_col'],kwargs['pivot_val'])
        job = client.query(query,location="US")
        for result in job.result():
            task_instance = kwargs['task_instance']
            task_instance.xcom_push(key='query_string',value=result) 
                print result
                return result



bq_cmd = PythonOperator (
    task_id=                    'task1'
    provide_context=            True,python_callable=            call_stored_proc,op_kwargs=                  {'project'        : project,'source_tbl'     : source_tbl,'row_id'         : row_id,'pivot_col'      : pivot_col,'pivot_val'      : pivot_val
                                },dag=                        dag
)

dummy_operator >> bq_cmd
sql_file_template()

存储的proc的输出是使用xcom捕获的字符串。

现在,我想将此值传递给某些python函数 sql_file_template ,而无需使用PythonOperator。

根据Airflow文档,只能在任务之间访问xcom。

有人可以帮忙吗?

解决方法

如果您有权查询要查询的Airflow安装(配置,数据库访问和代码),则可以使用Airflow的airflow.models.XCom:get_one类方法:

from datetime import datetime

from airflow.models import XCom


execution_date = datetime(2020,8,28)
xcom_value = XCom.get_one(execution_date=execution_date,task_id="the_task_id",dag_id="the_dag_id")            
,

所以您想在Airflow之外访问XCOM (可能是一个不同的项目/模块,不创建任何Airflow DAG /任务)?


Airflow使用SQLAlchemy将所有models(包括XCOM)映射到相应的SQLAlchemy后端(meta-db)表

因此,可以通过两种方式完成

  1. 利用气流的SQLAlchemy模型

    (无需创建任务或DAG)。这是一个 unested 代码段供参考

from typing import List
from airflow.models import XCom
from airflow.settings import Session
from airflow.utils.db import provide_session
from pendulum import Pendulum


@provide_session
def read_xcom_values(dag_id: str,task_id: str,execution_date: Pendulum,session: Optional[Session]) -> List[str]:
    """
    Function that reads and returns 'values' of XCOMs with given filters
    :param dag_id: 
    :param task_id: 
    :param execution_date: datetime object
    :param session: Airflow's SQLAlchemy Session (this param must not be passed,it will be automatically supplied by
                    '@provide_session' decorator)
    :return: 
    """
    # read XCOMs
    xcoms: List[XCom] = session.query(XCom).filter(
        XCom.dag_id == dag_id,XCom.task_id == task_id,XCom.execution_date == execution_date).all()
    # retrive 'value' fields from XCOMs
    xcom_values: List[str] = list(map(lambda xcom: xcom.value,xcoms))
    return xcom_values

请注意,由于它是导入气流包,因此它仍然需要在python类路径上正常安装气流(以及与backend-db的连接),但是在我们没有创建任何任务或任务(此代码段可以在独立的python文件中运行)

对于此摘要,我提到了views.py,这是我最喜欢的 Airflow的SQLAlchemy魔术


  1. 直接查询Airflow的SQLAlchemy后端元数据库

    连接到元数据库并运行此查询

    SELECT value FROM xcom WHERE dag_id='' AND task_id='' AND ..

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-