Python alembic.op 模块,bulk_insert() 实例源码
我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用alembic.op.bulk_insert()。
def upgrade():
"""Insert fixtures for models,tests purpose only"""
users = [generate_users(i) for i in range(1, 52)]
articles = [generate_articles(i, 50) for i in range(1, 200)]
article = table(
'article',
sa.Column('id', sa.Integer(), nullable=False, primary_key=True, autoincrement=True),
sa.Column('title', sa.String(length=100), nullable=False),
sa.Column('content', sa.Text(),
sa.Column('user_id', nullable=True),
)
user = table(
'user',
sa.Column('username', sa.String(length=80),
sa.Column('email', sa.String(length=120),
)
op.bulk_insert(user, users)
op.bulk_insert(article, articles)
def upgrade():
### commands auto generated by Alembic - please adjust! ###
voting_types = op.create_table('voting_types',
sa.Column('id',
sa.Column('name', sa.String(length=25),
sa.PrimaryKeyConstraint('id')
)
### Populate with question types
op.bulk_insert(voting_types,
[
{'name':'triangle'},
{'name':'linear'}
]
)
op.add_column(u'question', sa.Column('voting_type_id', server_default="1"))
op.create_foreign_key('fk_quesion_voting_types', 'question', 'voting_types', ['voting_type_id'], ['id'])
### end Alembic commands ###
def upgrade():
### commands auto generated by Alembic - please adjust! ###
question_types = op.create_table('question_types',
sa.PrimaryKeyConstraint('id')
)
### Populate with question types
op.bulk_insert(question_types,
[
{'name':'standard'},
{'name':'image'}
]
)
op.add_column(u'question', sa.Column('question_type_id', server_default="1"))
op.create_foreign_key('fk_quesion_question_types', 'question_types', ['question_type_id'], ['id'])
### end Alembic commands ###
def upgrade():
roles_table = sa.sql.table(
'roles',
sa.sql.column('name', sa.String)
)
# Insert new roles
op.bulk_insert(
roles_table,
[
{'name': 'superblogger'},
{'name': 'superuploader'},
]
)
def bulk_insert(self, table, rows):
"""Issue a "bulk insert" operation using the current
migration context.
This provides a means of representing an INSERT of multiple rows
which works equally well in the context of executing on a live
connection as well as that of generating a SQL script. In the
case of a SQL script,the values are rendered inline into the
statement.
e.g.::
from alembic import op
from datetime import date
from sqlalchemy.sql import table,column
from sqlalchemy import String,Integer,Date
# Create an ad-hoc table to use for the insert statement.
accounts_table = table('account',
column('id',Integer),
column('name',String),
column('create_date',Date)
)
op.bulk_insert(accounts_table,
[
{'id':1,'name':'John Smith',
'create_date':date(2010,10,5)},
{'id':2,'name':'Ed Williams',
'create_date':date(2007,5,27)},
{'id':3,'name':'Wendy Jones',
'create_date':date(2008,8,15)},
]
)
"""
self.impl.bulk_insert(table, rows)
def upgrade():
### commands auto generated by Alembic - please adjust! ###
t = op.create_table('office_numbers',
sa.Column('date_created', sa.DateTime(),
sa.Column('date_modified', sa.String(length=128),
sa.Column('number_prefix',
sa.Column('main_number',
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name')
)
op.bulk_insert(t, [
{
'name': 'Main Office',
'number_prefix': '555-1',
'main_number': '555-13211-3221',
},
{
'name': 'Babel Office',
'number_prefix': '555-2',
'main_number': '555-21111-3221',
{
'name': 'Office Minnows',
'number_prefix': '555-9',
'main_number': '555-99911-3221',
])
### end Alembic commands ###
def _seed(table_obj, file_path, tx=None):
with open(file_path, 'r') as f:
reader = DictReader(f, delimiter=',')
rows = list(reader)
if tx is not None:
rows = [_tx_row(row, tx) for row in rows]
op.bulk_insert(table_obj, rows)
def copy_table(session, model_cls):
rows = session.query(model_cls).all()
rows = list(map(lambda x: vars(x), rows))
op.bulk_insert(model_cls.__table__, rows)
def upgrade():
if current_env in ['test', 'dev']:
users_table = table(
'users',
sa.Column('id', postgresql.UUID, server_default=sa.text('uuid_generate_v1()'), primary_key=True),
sa.Column('name', sa.Text, unique=True),
sa.Column('password',
sa.Column('modified', sa.DateTime, server_default=sa.text('clock_timestamp()')),
sa.Column('created', server_default=sa.text('now()'))
)
op.bulk_insert(users_table, [{'name': "test_user", 'password': hash_password('test123')}])
def upgrade():
op.bulk_insert(subject_table,
[
{'id': 3, 'name': 'Sociology', 'tag': 'book:stax-soc'},
]
)
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.create_table('subjects', sa.String(),
sa.Column('tag',
sa.PrimaryKeyConstraint('id')
)
op.create_table('user_unqualified_exercises',
sa.Column('user_id',
sa.Column('exercise_id',
sa.ForeignKeyConstraint(['exercise_id'], ['exercises.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['users.id'],
sa.PrimaryKeyConstraint('id')
)
op.add_column(u'exercises', sa.Column('subject_id', nullable=True))
op.create_foreign_key('exercises_subject_id_fkey', 'exercises', 'subjects', ['subject_id'], ['id'])
op.add_column(u'responses', sa.Column('subject', nullable=True))
op.add_column(u'responses', nullable=True))
op.create_foreign_key('responses_subject_id_fkey', 'responses', ['id'])
### end Alembic commands ###
op.bulk_insert(subject_table,
[
{'id': 1, 'name': 'Biology', 'tag': 'apbio'},
{'id': 2, 'name': 'Physics', 'tag': 'k12phys'}
]
)
def upgrade():
### commands auto generated by Alembic - please adjust! ###
DistributedQueryTask = namedtuple('DistributedQueryTask', [
'id', 'status', 'retrieved', 'guid', 'node_id'])
distributed_query_task = op.create_table('distributed_query_task', autoincrement=True,
sa.Column('guid',
sa.Column('status',
sa.Column('timestamp',
sa.Column('distributed_query_id',
sa.Column('node_id',
sa.ForeignKeyConstraint(['distributed_query_id'], ['distributed_query.id'],
sa.ForeignKeyConstraint(['node_id'], ['node.id'],
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('guid')
)
cursor = op.get_bind().execute("""
SELECT id,status,retrieved,guid,node_id
FROM distributed_query
ORDER BY id;"""
)
results = map(DistributedQueryTask._make, cursor.fetchall())
distributed_query_tasks = [dict(
distributed_query_id=r.id,
status=r.status,
timestamp=r.retrieved,
guid=r.guid,
node_id=r.node_id) for r in results]
op.bulk_insert(distributed_query_task, distributed_query_tasks)
op.add_column(u'distributed_query', sa.Column('description', nullable=True))
op.drop_constraint(u'distributed_query_guid_key', 'distributed_query', type_='unique')
op.drop_constraint(u'distributed_query_node_id_fkey', type_='foreignkey')
op.drop_column(u'distributed_query', 'status')
op.drop_column(u'distributed_query', 'retrieved')
op.drop_column(u'distributed_query', 'guid')
op.drop_column(u'distributed_query', 'node_id')
op.add_column(u'distributed_query_result', sa.Column('distributed_query_task_id', nullable=True))
# distributed queries and tasks were the same before,
# so their id's will remain the same as well.
op.execute("""
UPDATE distributed_query_result
SET distributed_query_task_id = distributed_query_id;"""
)
op.alter_column(u'distributed_query_result', 'distributed_query_task_id', nullable=False)
op.create_foreign_key(None, 'distributed_query_result', 'distributed_query_task', ['distributed_query_task_id'], ['id'])
### end Alembic commands ###
def downgrade():
### commands auto generated by Alembic - please adjust! ###
DistributedQuery = namedtuple('DistributedQuery', [
'task_id', 'query_id',
'guid', 'sql', 'timestamp', 'not_before',
'retrieved', 'node_id'])
cursor = op.get_bind().execute("""
SELECT DISTINCT t.id AS task_id,q.id AS query_id,
t.guid,t.status,q.sql,q.timestamp,q.not_before,
t.timestamp AS retrieved,t.node_id
FROM distributed_query q
INNER JOIN distributed_query_task t
ON q.id = t.distributed_query_id
ORDER BY t.id;
""")
results = map(DistributedQuery._make, cursor.fetchall())
op.drop_constraint(u'distributed_query_result_distributed_query_task_id_fkey', type_='foreignkey')
op.drop_column(u'distributed_query_result', 'distributed_query_task_id')
op.drop_constraint(u'distributed_query_task_distributed_query_id_fkey', type_='foreignkey')
op.drop_table(u'distributed_query')
distributed_query = op.create_table('distributed_query',
sa.Column('sql',
sa.Column('not_before',
sa.Column('retrieved',
sa.UniqueConstraint('guid')
)
distributed_queries = [dict(
guid=r.guid,
sql=r.sql,
timestamp=r.timestamp,
not_before=r.not_before,
retrieved=r.retrieved,
node_id=r.node_id) for r in results]
op.bulk_insert(distributed_query, distributed_queries)
op.drop_table('distributed_query_task')
### end Alembic commands ###
def upgrade():
"""Upgrade the database to a newer revision."""
# ### commands auto generated by Alembic - please adjust! ###
ecosystems = op.create_table('ecosystems',
sa.Column('id',
sa.Column('name', sa.String(length=255),
sa.Column('_backend', sa.Enum('none', 'npm', 'maven', 'pypi',
'rubygems', 'scm', 'crates',
name='ecosystem_backend_enum'),
nullable=True),
sa.Column('url',
sa.Column('fetch_url',
sa.PrimaryKeyConstraint('id'))
op.bulk_insert(ecosystems, 'name': 'rubygems', '_backend': 'rubygems',
'url': 'https://rubygems.org/',
'fetch_url': 'https://rubygems.org/api/v1'}, 'name': 'npm', '_backend': 'npm',
'url': 'https://www.npmjs.com/',
'fetch_url': 'https://registry.npmjs.org/'},
{'id': 3, 'name': 'maven', '_backend': 'maven',
'url': 'https://repo1.maven.org/maven2/', 'fetch_url': None},
{'id': 4, 'name': 'pypi', '_backend': 'pypi',
'url': 'https://pypi.python.org/',
'fetch_url': 'https://pypi.python.org/pypi'},
{'id': 5, 'name': 'go', '_backend': 'scm', 'url': None,
{'id': 6, 'name': 'crates', '_backend': 'crates',
'url': 'https://crates.io/',
'fetch_url': None}, ])
op.create_table('packages',
sa.Column('id',
sa.Column('ecosystem_id',
sa.Column('name',
sa.ForeignKeyConstraint(['ecosystem_id'], ['ecosystems.id'],
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('ecosystem_id', 'name', name='ep_unique'))
op.create_table('versions',
sa.Column('package_id',
sa.Column('identifier',
sa.ForeignKeyConstraint(['package_id'], ['packages.id'],
sa.UniqueConstraint('package_id', 'identifier', name='pv_unique'))
op.add_column('analyses', sa.Column('version_id', nullable=True))
op.create_foreign_key(None, 'analyses', 'versions', ['version_id'], ['id'])
op.drop_column('analyses', 'package')
op.drop_column('analyses', 'ecosystem')
op.drop_column('analyses', 'version')
op.add_column('analysis_requests', nullable=True))
op.drop_index('epv_index', table_name='analysis_requests')
op.create_index('epv_index', 'analysis_requests', unique=True,
postgresql_where=sa.text('fulfilled_at IS NULL'))
op.create_foreign_key(None, ['id'])
op.drop_column('analysis_requests', 'package')
op.drop_column('analysis_requests', 'ecosystem')
op.drop_column('analysis_requests', 'version')
# ### end Alembic commands ###
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
video_file_table = op.create_table('video_file',
sa.Column('id', postgresql.UUID(as_uuid=True),
sa.Column('bangumi_id',
sa.Column('episode_id',
sa.Column('file_name',
sa.Column('file_path',
sa.Column('torrent_id',
sa.Column('download_url',
sa.Column('status',
sa.Column('resolution_w',
sa.Column('resolution_h',
sa.Column('duration',
sa.Column('label',
sa.ForeignKeyConstraint(['bangumi_id'], ['bangumi.id'],
sa.ForeignKeyConstraint(['episode_id'], ['episodes.id'],
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
connection = op.get_bind()
result = connection.execute(sa.text(
'SELECT t.episode_id,t.torrent_id,t.file_path,eps.bangumi_id,eps.episode_no FROM torrentfile t LEFT JOIN episodes eps ON eps.id = t.episode_id WHERE file_path NOTNULL'))
video_file_list = []
for row in result:
video_file = {
'id': uuid4(),
'status': 3
}
if row[1] == -1 or __is_uuid4(row[1]):
video_file['torrent_id'] = None
else:
video_file['torrent_id'] = row[1]
video_file['episode_id'] = row[0]
video_file['file_path'] = row[2]
video_file['bangumi_id'] = row[3]
meta_info = video_manager.get_video_meta(
u'{0}/{1}/{2}'.format(get_base_path(), str(video_file['bangumi_id']), video_file['file_path']))
if meta_info is None:
continue
video_file['resolution_w'] = meta_info.get('width')
video_file['resolution_h'] = meta_info.get('height')
video_file['duration'] = meta_info.get('duration')
video_file_list.append(video_file)
op.bulk_insert(video_file_table, video_file_list)
connection.execute(sa.text('UPDATE episodes SET status = 0 WHERE status = 1'))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。