我们从 Python 开源项目中提取了以下 16 个代码示例,用于说明如何使用 alembic.op.bulk_insert()。
def upgrade():
    """Seed the ``user`` and ``article`` tables with generated fixture rows.

    Intended for test purposes only.  Ad-hoc table constructs are declared
    locally and passed to ``op.bulk_insert``.  Rows come from the external
    ``generate_users`` / ``generate_articles`` helpers: one user per index in
    1..51 and one article per index in 1..199.
    """
    user_rows = [generate_users(index) for index in range(1, 52)]
    # NOTE(review): the second argument (50) presumably bounds the user ids an
    # article may reference -- confirm against generate_articles.
    article_rows = [generate_articles(index, 50) for index in range(1, 200)]

    article_table = table(
        'article',
        sa.Column('id', sa.Integer(), nullable=False, primary_key=True,
                  autoincrement=True),
        sa.Column('title', sa.String(length=100), nullable=False),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=True),
    )
    user_table = table(
        'user',
        sa.Column('id', sa.Integer(), nullable=False, primary_key=True,
                  autoincrement=True),
        sa.Column('username', sa.String(length=80), nullable=False),
        sa.Column('email', sa.String(length=120), nullable=False),
    )

    # Users are inserted before the articles.
    for target, fixture_rows in ((user_table, user_rows),
                                 (article_table, article_rows)):
        op.bulk_insert(target, fixture_rows)
def upgrade():
    """Create and seed ``voting_types``, then link ``question`` to it.

    Steps, in order: create the lookup table, insert its two rows
    ('triangle' and 'linear'), add the non-nullable ``voting_type_id``
    column to ``question`` with a server default of "1", and add the
    foreign key from ``question.voting_type_id`` to ``voting_types.id``.
    """
    # Table creation was auto generated by Alembic; the seeding is manual.
    voting_types_table = op.create_table(
        'voting_types',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=25), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )

    # Populate the lookup table.
    # NOTE(review): the server default "1" below presumably relies on the
    # first seeded row receiving id 1 -- confirm for the target database.
    seed_rows = [{'name': type_name} for type_name in ('triangle', 'linear')]
    op.bulk_insert(voting_types_table, seed_rows)

    voting_type_column = sa.Column(
        'voting_type_id', sa.Integer(), nullable=False, server_default="1")
    op.add_column(u'question', voting_type_column)
    op.create_foreign_key(
        'fk_quesion_voting_types',
        'question',
        'voting_types',
        ['voting_type_id'],
        ['id'],
    )
def upgrade():
    """Create and seed ``question_types``, then link ``question`` to it.

    Steps, in order: create the lookup table, insert its two rows
    ('standard' and 'image'), add the non-nullable ``question_type_id``
    column to ``question`` with a server default of "1", and add the
    foreign key from ``question.question_type_id`` to ``question_types.id``.
    """
    # Table creation was auto generated by Alembic; the seeding is manual.
    question_types_table = op.create_table(
        'question_types',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=25), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )

    # Populate the lookup table.
    # NOTE(review): the server default "1" below presumably relies on the
    # first seeded row receiving id 1 -- confirm for the target database.
    seed_rows = [{'name': type_name} for type_name in ('standard', 'image')]
    op.bulk_insert(question_types_table, seed_rows)

    question_type_column = sa.Column(
        'question_type_id', sa.Integer(), nullable=False, server_default="1")
    op.add_column(u'question', question_type_column)
    op.create_foreign_key(
        'fk_quesion_question_types',
        'question',
        'question_types',
        ['question_type_id'],
        ['id'],
    )
def upgrade():
    """Insert the 'superblogger' and 'superuploader' rows into ``roles``."""
    # Minimal ad-hoc description of the table: only the column being written.
    name_column = sa.sql.column('name', sa.String)
    roles_table = sa.sql.table('roles', name_column)

    # Insert new roles
    new_role_names = ('superblogger', 'superuploader')
    op.bulk_insert(
        roles_table,
        [{'name': role_name} for role_name in new_role_names],
    )
def bulk_insert(self, table, rows):
    """Issue a "bulk insert" operation using the current migration context.

    Represents an INSERT of multiple rows in a way that works both when
    executing on a live connection and when generating a SQL script.  In
    the SQL script case the values are rendered inline into the statement.

    The call is delegated to ``self.impl.bulk_insert``.

    Example::

        from alembic import op
        from datetime import date
        from sqlalchemy.sql import table, column
        from sqlalchemy import String, Integer, Date

        # Create an ad-hoc table to use for the insert statement.
        accounts_table = table('account',
            column('id', Integer),
            column('name', String),
            column('create_date', Date)
        )

        op.bulk_insert(accounts_table,
            [
                {'id':1, 'name':'John Smith',
                        'create_date':date(2010, 10, 5)},
                {'id':2, 'name':'Ed Williams',
                        'create_date':date(2007, 5, 27)},
                {'id':3, 'name':'Wendy Jones',
                        'create_date':date(2008, 8, 15)},
            ]
        )

    :param table: table construct to insert into.
    :param rows: the rows to insert, as a list of dictionaries.
    """
    backend = self.impl
    backend.bulk_insert(table, rows)
def upgrade():
    """Create the ``office_numbers`` table and insert three office rows."""
    # Table creation was auto generated by Alembic; the seeding is manual.
    office_numbers_table = op.create_table(
        'office_numbers',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('date_created', sa.DateTime(), nullable=True),
        sa.Column('date_modified', sa.DateTime(), nullable=True),
        sa.Column('name', sa.String(length=128), nullable=False),
        sa.Column('number_prefix', sa.String(length=128), nullable=False),
        sa.Column('main_number', sa.String(length=128), nullable=False),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name'),
    )

    # (name, number_prefix, main_number) for each seeded office.
    offices = (
        ('Main Office', '555-1', '555-13211-3221'),
        ('Babel Office', '555-2', '555-21111-3221'),
        ('Office Minnows', '555-9', '555-99911-3221'),
    )
    office_rows = [
        {'name': name, 'number_prefix': prefix, 'main_number': main_number}
        for name, prefix, main_number in offices
    ]
    op.bulk_insert(office_numbers_table, office_rows)
def _seed(table_obj, file_path, tx=None):
    """Bulk-insert the rows of a comma-delimited file into ``table_obj``.

    The file is read with ``DictReader``, so each row becomes a dict.

    :param table_obj: table construct handed to ``op.bulk_insert``.
    :param file_path: path of the delimited text file to load.
    :param tx: optional value; when it is not ``None`` every row is replaced
        by ``_tx_row(row, tx)`` before insertion.
    """
    with open(file_path, 'r') as seed_file:
        # Materialize everything while the file is still open.
        records = list(DictReader(seed_file, delimiter=','))

    if tx is not None:
        records = [_tx_row(record, tx) for record in records]

    op.bulk_insert(table_obj, records)
def copy_table(session, model_cls):
    """Bulk-insert every ``model_cls`` row loaded via ``session`` into its table.

    All instances of ``model_cls`` are queried through ``session`` and
    converted to plain dictionaries for ``op.bulk_insert``.

    Only entries whose key is a column key of ``model_cls.__table__`` are
    kept.  ``vars()`` on a mapped instance also exposes SQLAlchemy's
    ``_sa_instance_state`` bookkeeping object (and any other non-column
    attribute); such keys are not valid insert parameters and break
    Alembic's script-mode rendering, which looks each key up in the table's
    columns.

    :param session: session used to load the existing rows.
    :param model_cls: mapped class whose ``__table__`` is the insert target.
    """
    target_table = model_cls.__table__
    column_keys = set(target_table.columns.keys())

    rows = [
        {key: value
         for key, value in vars(instance).items()
         if key in column_keys}
        for instance in session.query(model_cls).all()
    ]
    op.bulk_insert(target_table, rows)
def upgrade():
    """Insert a ``test_user`` row into ``users`` in test/dev environments.

    Does nothing when ``current_env`` is neither 'test' nor 'dev'.  The
    password is stored as the result of ``hash_password('test123')``.
    """
    if current_env not in ['test', 'dev']:
        return

    users_table = table(
        'users',
        sa.Column('id', postgresql.UUID,
                  server_default=sa.text('uuid_generate_v1()'),
                  primary_key=True),
        sa.Column('name', sa.Text, nullable=False, unique=True),
        sa.Column('password', sa.Text, nullable=False),
        sa.Column('modified', sa.DateTime,
                  server_default=sa.text('clock_timestamp()')),
        sa.Column('created', sa.DateTime,
                  server_default=sa.text('now()')),
    )

    # Only name and password are supplied; the remaining columns declare
    # server defaults above.
    test_user = {'name': "test_user", 'password': hash_password('test123')}
    op.bulk_insert(users_table, [test_user])
def upgrade():
    """Insert the 'Sociology' subject (id 3) into ``subject_table``."""
    sociology = {'id': 3, 'name': 'Sociology', 'tag': 'book:stax-soc'}
    op.bulk_insert(subject_table, [sociology])
def upgrade():
    """Add subjects and unqualified-exercise tracking, then seed two subjects.

    Creates ``subjects`` and ``user_unqualified_exercises``, adds
    ``subject_id`` (with a foreign key to ``subjects.id``) to both
    ``exercises`` and ``responses``, adds a ``subject`` string column to
    ``responses``, and finally inserts the 'Biology' and 'Physics' rows via
    the externally defined ``subject_table``.
    """
    # Schema changes below were auto generated by Alembic.
    op.create_table(
        'subjects',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(), nullable=True),
        sa.Column('tag', sa.String(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_table(
        'user_unqualified_exercises',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('user_id', sa.Integer(), nullable=True),
        sa.Column('exercise_id', sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(['exercise_id'], ['exercises.id']),
        sa.ForeignKeyConstraint(['user_id'], ['users.id']),
        sa.PrimaryKeyConstraint('id'),
    )

    op.add_column(
        u'exercises',
        sa.Column('subject_id', sa.Integer(), nullable=True))
    op.create_foreign_key(
        'exercises_subject_id_fkey',
        'exercises', 'subjects', ['subject_id'], ['id'])

    op.add_column(
        u'responses',
        sa.Column('subject', sa.String(), nullable=True))
    op.add_column(
        u'responses',
        sa.Column('subject_id', sa.Integer(), nullable=True))
    op.create_foreign_key(
        'responses_subject_id_fkey',
        'responses', 'subjects', ['subject_id'], ['id'])

    # Manual addition: initial subjects.
    initial_subjects = [
        {'id': 1, 'name': 'Biology', 'tag': 'apbio'},
        {'id': 2, 'name': 'Physics', 'tag': 'k12phys'},
    ]
    op.bulk_insert(subject_table, initial_subjects)
def upgrade():
    """Move per-node task state from ``distributed_query`` to a new table.

    Steps, in order:

    1. Create ``distributed_query_task``.
    2. Copy ``status``, ``retrieved`` (as ``timestamp``), ``guid`` and
       ``node_id`` of every existing ``distributed_query`` row into it, one
       task per query, in ascending query id order.
    3. Add ``description`` to ``distributed_query`` and drop the migrated
       columns together with their unique / foreign key constraints.
    4. Add ``distributed_query_task_id`` to ``distributed_query_result``,
       fill it from ``distributed_query_id``, make it non-nullable and add
       the foreign key to ``distributed_query_task.id``.
    """
    distributed_query_task = op.create_table(
        'distributed_query_task',
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('guid', sa.String(), nullable=False),
        sa.Column('status', sa.Integer(), nullable=False),
        sa.Column('timestamp', sa.DateTime(), nullable=True),
        sa.Column('distributed_query_id', sa.Integer(), nullable=False),
        sa.Column('node_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['distributed_query_id'],
                                ['distributed_query.id']),
        sa.ForeignKeyConstraint(['node_id'], ['node.id']),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('guid'),
    )

    # Wrap the statement in sa.text(): passing a bare SQL string to
    # Connection.execute() is deprecated in SQLAlchemy 1.4 and rejected in
    # 2.0, while text() works on every version.
    cursor = op.get_bind().execute(sa.text("""
        SELECT id, status, retrieved, guid, node_id
        FROM distributed_query
        ORDER BY id;"""))

    distributed_query_tasks = [
        dict(
            distributed_query_id=query_id,
            status=status,
            timestamp=retrieved,
            guid=guid,
            node_id=node_id,
        )
        for query_id, status, retrieved, guid, node_id in cursor.fetchall()
    ]
    op.bulk_insert(distributed_query_task, distributed_query_tasks)

    op.add_column(
        u'distributed_query',
        sa.Column('description', sa.String(), nullable=True))
    op.drop_constraint(
        u'distributed_query_guid_key', 'distributed_query', type_='unique')
    op.drop_constraint(
        u'distributed_query_node_id_fkey', 'distributed_query',
        type_='foreignkey')
    for migrated_column in ('status', 'retrieved', 'guid', 'node_id'):
        op.drop_column(u'distributed_query', migrated_column)

    op.add_column(
        u'distributed_query_result',
        sa.Column('distributed_query_task_id', sa.Integer(), nullable=True))

    # distributed queries and tasks were the same before,
    # so their id's will remain the same as well.
    # NOTE(review): task ids are generated by the database during the bulk
    # insert above, so this presumably holds only when the existing query
    # ids are contiguous starting at 1 -- confirm before running on data
    # with gaps.
    op.execute("""
        UPDATE distributed_query_result
        SET distributed_query_task_id = distributed_query_id;""")

    op.alter_column(
        u'distributed_query_result', 'distributed_query_task_id',
        nullable=False)
    op.create_foreign_key(
        None, 'distributed_query_result', 'distributed_query_task',
        ['distributed_query_task_id'], ['id'])
def downgrade():
    """Fold ``distributed_query_task`` back into ``distributed_query``.

    The joined query/task data is read first, before any table is touched.
    Then the task reference is removed from ``distributed_query_result``,
    ``distributed_query`` is dropped and recreated with its task columns,
    the saved rows are inserted (one per joined task row, in ascending task
    id order, with database-generated ids), and finally
    ``distributed_query_task`` is dropped.
    """
    # Wrap the statement in sa.text(): passing a bare SQL string to
    # Connection.execute() is deprecated in SQLAlchemy 1.4 and rejected in
    # 2.0, while text() works on every version.
    cursor = op.get_bind().execute(sa.text("""
        SELECT DISTINCT t.id AS task_id,
            q.id AS query_id,
            t.guid,
            t.status,
            q.sql,
            q.timestamp,
            q.not_before,
            t.timestamp AS retrieved,
            t.node_id
        FROM distributed_query q
        INNER JOIN distributed_query_task t
            ON q.id = t.distributed_query_id
        ORDER BY t.id;
    """))

    # Materialize the rows now: the source tables are dropped below.
    # task_id and query_id are selected but not carried over.
    distributed_queries = [
        dict(
            guid=guid,
            status=status,
            sql=sql,
            timestamp=timestamp,
            not_before=not_before,
            retrieved=retrieved,
            node_id=node_id,
        )
        for (_task_id, _query_id, guid, status, sql, timestamp, not_before,
             retrieved, node_id) in cursor.fetchall()
    ]

    op.drop_constraint(
        u'distributed_query_result_distributed_query_task_id_fkey',
        'distributed_query_result', type_='foreignkey')
    op.drop_column(u'distributed_query_result', 'distributed_query_task_id')

    op.drop_constraint(
        u'distributed_query_task_distributed_query_id_fkey',
        'distributed_query_task', type_='foreignkey')
    op.drop_table(u'distributed_query')

    distributed_query = op.create_table(
        'distributed_query',
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('guid', sa.String(), nullable=False),
        sa.Column('status', sa.Integer(), nullable=False),
        sa.Column('sql', sa.String(), nullable=False),
        sa.Column('timestamp', sa.DateTime(), nullable=True),
        sa.Column('not_before', sa.DateTime(), nullable=True),
        sa.Column('retrieved', sa.DateTime(), nullable=True),
        sa.Column('node_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['node_id'], ['node.id']),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('guid'),
    )

    op.bulk_insert(distributed_query, distributed_queries)
    op.drop_table('distributed_query_task')
def upgrade():
    """Upgrade the database to a newer revision.

    Creates and seeds ``ecosystems``, creates ``packages`` and ``versions``,
    and replaces the ``package`` / ``ecosystem`` / ``version`` columns of
    ``analyses`` and ``analysis_requests`` with a ``version_id`` foreign key
    to ``versions.id``.  The ``epv_index`` index on ``analysis_requests`` is
    rebuilt on ``version_id``.
    """
    backend_enum = sa.Enum(
        'none', 'npm', 'maven', 'pypi', 'rubygems', 'scm', 'crates',
        name='ecosystem_backend_enum')
    ecosystems_table = op.create_table(
        'ecosystems',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=255), nullable=True),
        sa.Column('_backend', backend_enum, nullable=True),
        sa.Column('url', sa.String(length=255), nullable=True),
        sa.Column('fetch_url', sa.String(length=255), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )

    # Seed data, one tuple per ecosystem in the field order given below.
    ecosystem_fields = ('id', 'name', '_backend', 'url', 'fetch_url')
    ecosystem_values = (
        (1, 'rubygems', 'rubygems', 'https://rubygems.org/',
         'https://rubygems.org/api/v1'),
        (2, 'npm', 'npm', 'https://www.npmjs.com/',
         'https://registry.npmjs.org/'),
        (3, 'maven', 'maven', 'https://repo1.maven.org/maven2/', None),
        (4, 'pypi', 'pypi', 'https://pypi.python.org/',
         'https://pypi.python.org/pypi'),
        (5, 'go', 'scm', None, None),
        (6, 'crates', 'crates', 'https://crates.io/', None),
    )
    op.bulk_insert(
        ecosystems_table,
        [dict(zip(ecosystem_fields, values)) for values in ecosystem_values],
    )

    op.create_table(
        'packages',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ecosystem_id', sa.Integer(), nullable=True),
        sa.Column('name', sa.String(length=255), nullable=True),
        sa.ForeignKeyConstraint(['ecosystem_id'], ['ecosystems.id']),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('ecosystem_id', 'name', name='ep_unique'),
    )
    op.create_table(
        'versions',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('package_id', sa.Integer(), nullable=True),
        sa.Column('identifier', sa.String(length=255), nullable=True),
        sa.ForeignKeyConstraint(['package_id'], ['packages.id']),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('package_id', 'identifier', name='pv_unique'),
    )

    # analyses: swap the three descriptive columns for version_id.
    op.add_column(
        'analyses', sa.Column('version_id', sa.Integer(), nullable=True))
    op.create_foreign_key(
        None, 'analyses', 'versions', ['version_id'], ['id'])
    op.drop_column('analyses', 'package')
    op.drop_column('analyses', 'ecosystem')
    op.drop_column('analyses', 'version')

    # analysis_requests: same swap, plus the rebuilt partial unique index.
    op.add_column(
        'analysis_requests',
        sa.Column('version_id', sa.Integer(), nullable=True))
    op.drop_index('epv_index', table_name='analysis_requests')
    op.create_index(
        'epv_index', 'analysis_requests', ['version_id'],
        unique=True,
        postgresql_where=sa.text('fulfilled_at IS NULL'))
    op.create_foreign_key(
        None, 'analysis_requests', 'versions', ['version_id'], ['id'])
    op.drop_column('analysis_requests', 'package')
    op.drop_column('analysis_requests', 'ecosystem')
    op.drop_column('analysis_requests', 'version')
def upgrade():
    """Create ``video_file`` and populate it from existing ``torrentfile`` rows.

    Every ``torrentfile`` row with a non-null ``file_path`` is joined to its
    episode and turned into a ``video_file`` row with a fresh UUID and
    status 3.  Width, height and duration come from
    ``video_manager.get_video_meta``; rows for which it returns ``None`` are
    skipped.  Afterwards every episode with status 1 is set to status 0.
    """
    # Table creation was auto generated by Alembic; the rest is manual.
    video_file_table = op.create_table(
        'video_file',
        sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('bangumi_id', postgresql.UUID(as_uuid=True),
                  nullable=False),
        sa.Column('episode_id', postgresql.UUID(as_uuid=True),
                  nullable=False),
        sa.Column('file_name', sa.String(), nullable=True),
        sa.Column('file_path', sa.String(), nullable=True),
        sa.Column('torrent_id', sa.String(), nullable=True),
        sa.Column('download_url', sa.String(), nullable=True),
        sa.Column('status', sa.Integer(), nullable=False),
        sa.Column('resolution_w', sa.Integer(), nullable=True),
        sa.Column('resolution_h', sa.Integer(), nullable=True),
        sa.Column('duration', sa.Integer(), nullable=True),
        sa.Column('label', sa.String(), nullable=True),
        sa.ForeignKeyConstraint(['bangumi_id'], ['bangumi.id']),
        sa.ForeignKeyConstraint(['episode_id'], ['episodes.id']),
        sa.PrimaryKeyConstraint('id'),
    )

    connection = op.get_bind()
    select_torrent_files = sa.text(
        'SELECT t.episode_id, t.torrent_id, t.file_path, eps.bangumi_id, '
        'eps.episode_no FROM torrentfile t LEFT JOIN episodes eps '
        'ON eps.id = t.episode_id WHERE file_path NOTNULL')

    video_file_list = []
    for row in connection.execute(select_torrent_files):
        episode_id, torrent_id, file_path, bangumi_id = (
            row[0], row[1], row[2], row[3])

        record = {'id': uuid4(), 'status': 3}
        # A torrent_id of -1, or one that __is_uuid4 accepts, is stored as
        # NULL.
        # NOTE(review): presumably those values are placeholders rather than
        # real torrent ids -- confirm.
        placeholder_id = torrent_id == -1 or __is_uuid4(torrent_id)
        record['torrent_id'] = None if placeholder_id else torrent_id
        record['episode_id'] = episode_id
        record['file_path'] = file_path
        record['bangumi_id'] = bangumi_id

        video_path = u'{0}/{1}/{2}'.format(
            get_base_path(), str(bangumi_id), file_path)
        meta_info = video_manager.get_video_meta(video_path)
        if meta_info is not None:
            record['resolution_w'] = meta_info.get('width')
            record['resolution_h'] = meta_info.get('height')
            record['duration'] = meta_info.get('duration')
            video_file_list.append(record)

    op.bulk_insert(video_file_table, video_file_list)

    connection.execute(
        sa.text('UPDATE episodes SET status = 0 WHERE status = 1'))