如何解决从PostgreSQL提取大数据块到Pandas DataFrames
我正在使用psycopg2
和pandas
从Postgres中提取数据。
pandas.read_sql_query
在提供chunksize
参数时支持Python“生成器”模式。在处理大型数据集时,它并不是很有用,因为最初将整个数据从数据库中检索到客户端内存中,然后根据chunksize
分成单独的帧。使用这种方法,大型数据集将很容易遇到内存不足的问题。
Postgres / psycopg2正在使用server-side cursors解决此问题。但是熊猫似乎并没有支持它。
代替:
iter = sql.read_sql_query(sql,conn,index_col='col1',chunksize=chunksize)
我尝试过像这样重新实现它:
from pandas.io.sql import SQLiteDatabase
curs = conn.cursor(name='cur_name') # server side cursor creation
curs.itersize = chunksize
pandas_sql = SQLiteDatabase(curs,is_cursor=True)
iter = pandas_sql.read_query(
sql,chunksize=chunksize)
但是它失败了,因为Pandas试图访问cursor.description
,由于服务器端游标的某种原因,它为NULL(并知道为什么)。
最好的方法是什么? Tnx
P.S。
- 当SQLAlchemy不可用时,SQLiteDatabase与Postgres一起使用
- 对熊猫的功能要求-https://github.com/pandas-dev/pandas/issues/35689
解决方法
您需要重写熊猫的read_query()
,以免使用cursor.description
。只需将列名称列表传递给read_query()
即可使用它而不是cursor.description`:
import psycopg2
from pandas.io.sql import SQLiteDatabase,_convert_params
# modify read_query as you need and overwrite it
# added column names as argument
def read_query_modified(
self,sql,columns,index_col=None,coerce_float=True,params=None,parse_dates=None,chunksize=None,):
args = _convert_params(sql,params)
cursor = self.execute(*args)
# columns = [col_desc[0] for col_desc in cursor.description]
if chunksize is not None:
return self._query_iterator(
cursor,chunksize,index_col=index_col,coerce_float=coerce_float,parse_dates=parse_dates,)
else:
data = self._fetchall_as_list(cursor)
cursor.close()
frame = _wrap_result(
data,)
return frame
# replace read_query with your version
SQLiteDatabase.read_query = read_query_modified
chunksize = 2
conn = psycopg2.connect("dbname=mf port=5959 host=localhost user=mf_usr")
curs = conn.cursor(name='cur_name')
curs.itersize = chunksize
sql = 'select * from users where id = 366196'
columns = ['id','firstname','lastname','birth','gender','nationality']
pandas_sql = SQLiteDatabase(curs,is_cursor=True)
iter = pandas_sql.read_query(
sql,index_col='id',chunksize=chunksize)
for x in iter:
print(x)
输出:
firstname lastname birth gender nationality
id
366196 Michael Kronberger None None at
,
我对@Maurice Meyer的回答做了一些改进,以防止需要将columns
作为参数传递:
class CustomSQLiteDatabase(SQLiteDatabase):
def read_query(
self,):
args = _convert_params(sql,params)
self.con.execute(*args)
if chunksize is not None:
columns = None
while True:
data = self.con.fetchmany(chunksize)
if not columns:
columns = [col_desc[0] for col_desc in self.con.description]
if type(data) == tuple:
data = list(data)
if not data:
self.con.close()
break
else:
yield _wrap_result(
data,)
else:
data = self._fetchall_as_list(self.con)
columns = [col_desc[0] for col_desc in self.con.description]
self.con.close()
frame = _wrap_result(
data,)
return frame
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。