如何解决Luigi/SQLite:初始加载后如何更新数据库?
我正在使用以下代码通过 Luigi 将数据加载到 SQLite 数据库中:
class LoadData(luigi.Task):
def requires(self):
return TransformData()
def run(self):
with sqlite3.connect('database.db') as db:
cursor = db.cursor()
cursor.execute("INSERT INTO prod SELECT * FROM staging;")
def output(self):
return luigi.LocalTarget('database.db')
这可行,但是当我想更新或插入新数据时,任务不会执行,因为 Luigi 认为它已完成(database.db
已存在)。
也许我没有理解LocalTarget的好用处。解决这个问题的正确方法是什么?
///EDIT:我的问题适用于 this page 上给出的示例(le_create_db.py
的代码)。您如何解决该示例中的更新和插入问题?
///EDIT: This question 关于附加到文件是类似的,但使用标记文件的解决方案不起作用,因为 sqla 需要 SQLAlchemyTarget
输出。还有其他答案吗,特别是关于附加到数据库的答案?
解决方法
考虑使用模拟文件: http://gouthamanbalaraman.com/blog/building-luigi-task-pipeline.html
在每次执行中,您将创建一个新文件。
另一种解决方案可能是使用在数据库内创建标记表的策略,例如:https://luigi.readthedocs.io/en/stable/api/luigi.contrib.postgres.html#luigi.contrib.postgres.PostgresTarget
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。