如何解决在使用Airflow和psycopg2使用Cloud Composer将数据从Postgres DB提取到Google云存储时,如何实现命名游标
Stackoverflow:
问题陈述:需要使用Cloud composer将数据从Postgres DB提取到Google云存储。我正在使用PostgresToGoogleCloudStorageOperator。 该代码适用于普通数据量。但是,对于1亿多条记录,我遇到了内存不足异常。在分析了问题之后,根本原因是默认客户端游标,该游标试图在运行时获取所有数据,并在执行cursor.execute(self.sql,self.parameters)时导致OOM。为了减轻这种情况,我尝试创建一个名为cursor的服务器端并指定cursor.itersize,以便它批量提取数据。还尝试过cursor.fetchmany(size = 5000000)。 但是在两种方式下,我都将cursor.rowcount设为-1。 在这种情况下,如何使命名游标工作? Airflow内部使用psycopg2连接到Postgres。 以下是我尝试的2个选项:
选项1
def _query_postgres(self):
"""
Queries Postgres and returns a cursor to the results.
"""
postgres = PostgresHook(postgres_conn_id=self.postgres_conn_id)
conn = postgres.get_conn()
logging.info('DEBUG: _query_postgres after getting conn')
#cursor = conn.cursor('buffered_fetch')#('nexpose_cursor')
#cursor=conn.cursor('buffered_fetch',cursor_factory=psycopg2.extras.DictCursor)
#self.log.info('cursor.rowcount %s records',cursor.rowcount)
logging.info('DEBUG: _query_postgres after getting cursor')
with conn.cursor(name='custom_cursor',cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.fetchmany(size=5000000) # chunk size
logging.info('cursor.itersize is set to 5000000')
cursor.execute(self.sql,self.parameters)
logging.info('DEBUG: _query_postgres after cursor execute')
self.log.info('cursor.rowcount %s records',cursor.rowcount)
#cursor.itersize = 5000000
#logging.info('cursor.itersize is set to 5000000')
#cursor.execute(self.sql,self.parameters)
logging.info('DEBUG: _query_postgres after cursor execute')
self.log.info('cursor.rowcount %s records',cursor.rowcount)
return cursor
Option2
def _query_postgres(self):
"""
Queries Postgres and returns a cursor to the results.
"""
postgres = PostgresHook(postgres_conn_id=self.postgres_conn_id)
conn = postgres.get_conn()
logging.info('DEBUG: _query_postgres after getting conn')
cursor=conn.cursor('buffered_fetch',cursor_factory=psycopg2.extras.DictCursor)
self.log.info('cursor.rowcount %s records',cursor.rowcount)
logging.info('DEBUG: _query_postgres after getting cursor')
cursor.itersize = 5000000
logging.info('cursor.itersize is set to 5000000')
cursor.execute(self.sql,cursor.rowcount)
return cursor
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。