如何解决如何通过非气流操作员python函数访问Xcom值
我有一个要存储的XCom值,我想传递给另一个不使用PythonOperator调用的python函数。
def sql_file_template():
<some code which uses xcom variable>
def call_stored_proc(**kwargs):
#project = kwargs['row_id']
print("INSIDE CALL STORE PROC ------------")
query = """CALL `{0}.dataset_name.store_proc`(
'{1}' # source table,['{2}'] # row_ids,'{3}' # pivot_col_name,'{4}' # pivot_col_value,100 # max_columns,'MAX' # aggregation
);"""
query = query.format(kwargs['project'],kwargs['source_tbl'],kwargs['row_id'],kwargs['pivot_col'],kwargs['pivot_val'])
job = client.query(query,location="US")
for result in job.result():
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='query_string',value=result)
print result
return result
bq_cmd = PythonOperator (
task_id= 'task1'
provide_context= True,python_callable= call_stored_proc,op_kwargs= {'project' : project,'source_tbl' : source_tbl,'row_id' : row_id,'pivot_col' : pivot_col,'pivot_val' : pivot_val
},dag= dag
)
dummy_operator >> bq_cmd
sql_file_template()
存储的proc的输出是使用xcom捕获的字符串。
现在,我想将此值传递给某些python函数 sql_file_template ,而无需使用PythonOperator。
根据Airflow文档,只能在任务之间访问xcom。
有人可以帮忙吗?
解决方法
如果您有权查询要查询的Airflow安装(配置,数据库访问和代码),则可以使用Airflow的airflow.models.XCom:get_one
类方法:
from datetime import datetime
from airflow.models import XCom
execution_date = datetime(2020,8,28)
xcom_value = XCom.get_one(execution_date=execution_date,task_id="the_task_id",dag_id="the_dag_id")
,
所以您想在Airflow之外访问XCOM (可能是一个不同的项目/模块,不创建任何Airflow DAG /任务)?
Airflow使用SQLAlchemy
将所有model
s(包括XCOM
)映射到相应的SQLAlchemy后端(meta-db)表
因此,可以通过两种方式完成
-
利用气流的SQLAlchemy模型
(无需创建任务或DAG)。这是一个 unested 代码段供参考
from typing import List from airflow.models import XCom from airflow.settings import Session from airflow.utils.db import provide_session from pendulum import Pendulum @provide_session def read_xcom_values(dag_id: str,task_id: str,execution_date: Pendulum,session: Optional[Session]) -> List[str]: """ Function that reads and returns 'values' of XCOMs with given filters :param dag_id: :param task_id: :param execution_date: datetime object :param session: Airflow's SQLAlchemy Session (this param must not be passed,it will be automatically supplied by '@provide_session' decorator) :return: """ # read XCOMs xcoms: List[XCom] = session.query(XCom).filter( XCom.dag_id == dag_id,XCom.task_id == task_id,XCom.execution_date == execution_date).all() # retrive 'value' fields from XCOMs xcom_values: List[str] = list(map(lambda xcom: xcom.value,xcoms)) return xcom_values
请注意,由于它是导入气流包,因此它仍然需要在python类路径上正常安装气流(以及与backend-db的连接),但是在我们没有创建任何任务或任务(此代码段可以在独立的python文件中运行)
对于此摘要,我提到了views.py
,这是我最喜欢的 Airflow的SQLAlchemy魔术
-
直接查询Airflow的SQLAlchemy后端元数据库
连接到元数据库并运行此查询
SELECT value FROM xcom WHERE dag_id='' AND task_id='' AND ..
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。