我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.Table()。
def _equities_table_schema(metadata):
    """Build the ``equities`` table and register it on ``metadata``.

    Parameters
    ----------
    metadata
        Object handed to ``sa.Table`` as the metadata the table belongs to.

    Returns
    -------
    sa.Table
        The ``equities`` table, whose primary key is the integer ``sid``.
    """
    # NOTE: When modifying this schema, update the ASSET_DB_VERSION value
    sid_column = sa.Column(
        'sid',
        sa.Integer,
        unique=True,
        nullable=False,
        primary_key=True,
    )
    columns = [
        sid_column,
        sa.Column('symbol', sa.Text),
        sa.Column('company_symbol', sa.Text, index=True),
        sa.Column('share_class_symbol', sa.Text),
        sa.Column('fuzzy_symbol', sa.Text, index=True),
        sa.Column('asset_name', sa.Text),
        sa.Column('start_date', sa.Integer, default=0, nullable=False),
        sa.Column('end_date', sa.Integer, nullable=False),
        sa.Column('first_traded', sa.Integer, nullable=False),
        sa.Column('auto_close_date', sa.Integer),
        sa.Column('exchange', sa.Text),
    ]
    return sa.Table('equities', metadata, *columns)
def _futures_root_symbols_schema(metadata):
    """Build the ``futures_root_symbols`` table and register it on ``metadata``.

    Parameters
    ----------
    metadata
        Object handed to ``sa.Table`` as the metadata the table belongs to.

    Returns
    -------
    sa.Table
        The ``futures_root_symbols`` table, keyed by the text column
        ``root_symbol``; its ``exchange`` column is a foreign key to
        ``futures_exchanges.exchange``.
    """
    # NOTE: When modifying this schema, update the ASSET_DB_VERSION value
    root_symbol_column = sa.Column(
        'root_symbol',
        sa.Text,
        unique=True,
        nullable=False,
        primary_key=True,
    )
    exchange_column = sa.Column(
        'exchange',
        sa.Text,
        sa.ForeignKey('futures_exchanges.exchange'),
    )
    columns = [
        root_symbol_column,
        sa.Column('root_symbol_id', sa.Integer),
        sa.Column('sector', sa.Text),
        sa.Column('description', sa.Text),
        exchange_column,
    ]
    return sa.Table('futures_root_symbols', metadata, *columns)
def _version_table_schema(metadata):
    """Build the ``version_info`` table and register it on ``metadata``.

    Parameters
    ----------
    metadata
        Object handed to ``sa.Table`` as the metadata the table belongs to.

    Returns
    -------
    sa.Table
        The ``version_info`` table with a unique ``id`` primary key and a
        unique, non-null integer ``version`` column.
    """
    # NOTE: When modifying this schema, update the ASSET_DB_VERSION value
    id_column = sa.Column(
        'id',
        sa.Integer,
        unique=True,
        nullable=False,
        primary_key=True,
    )
    version_column = sa.Column(
        'version',
        sa.Integer,
        unique=True,
        nullable=False,
    )
    # This constraint ensures a single entry in this table
    single_row = sa.CheckConstraint('id <= 1')
    return sa.Table(
        'version_info', metadata, id_column, version_column, single_row)
def _show_create_table(self, connection, table, charset=None, full_name=None): """Run SHOW CREATE TABLE for a ``Table``.""" if full_name is None: full_name = self.identifier_preparer.format_table(table) st = "SHOW CREATE TABLE %s" % full_name rp = None try: rp = connection.execution_options( skip_user_error_events=True).execute(st) except exc.DBAPIError as e: if self._extract_error_code(e.orig) == 1146: raise exc.NoSuchTableError(full_name) else: raise row = self._compat_first(rp, charset=charset) if not row: raise exc.NoSuchTableError(full_name) return row[1].strip() return sql
def _describe_table(self, connection, table, charset=None, full_name=None): """Run DESCRIBE for a ``Table`` and return processed rows.""" if full_name is None: full_name = self.identifier_preparer.format_table(table) st = "DESCRIBE %s" % full_name rp, rows = None, None try: try: rp = connection.execution_options( skip_user_error_events=True).execute(st) except exc.DBAPIError as e: if self._extract_error_code(e.orig) == 1146: raise exc.NoSuchTableError(full_name) else: raise rows = self._compat_fetchall(rp, charset=charset) finally: if rp: rp.close() return rows
def __init__(self, element, on=None, bind=None,
             include_foreign_key_constraints=None):
    """Create a :class:`.CreateTable` construct.

    :param element: a :class:`.Table` that's the subject
     of the CREATE
    :param on: See the description for 'on' in :class:`.DDL`.
    :param bind: See the description for 'bind' in :class:`.DDL`.
    :param include_foreign_key_constraints: optional sequence of
     :class:`.ForeignKeyConstraint` objects that will be included
     inline within the CREATE construct; if omitted, all foreign key
     constraints that do not specify use_alter=True are included.

     .. versionadded:: 1.0.0

    """
    super(CreateTable, self).__init__(element, on=on, bind=bind)

    # Wrap every column of the table in a CreateColumn construct.
    wrapped_columns = []
    for table_column in element.columns:
        wrapped_columns.append(CreateColumn(table_column))
    self.columns = wrapped_columns

    self.include_foreign_key_constraints = include_foreign_key_constraints
def bind(self):
    """Return the current "bind".

    In online mode, this is an instance of
    :class:`sqlalchemy.engine.Connection`, and is suitable
    for ad-hoc execution of any kind of usage described
    in :ref:`sqlexpression_toplevel` as well as
    for usage with the :meth:`sqlalchemy.schema.Table.create`
    and :meth:`sqlalchemy.schema.MetaData.create_all` methods
    of :class:`~sqlalchemy.schema.Table`,
    :class:`~sqlalchemy.schema.MetaData`.

    Note that when "standard output" mode is enabled,
    this bind will be a "mock" connection handler that cannot
    return results and is only appropriate for a very limited
    subset of commands.

    The value returned is simply ``self.connection``.
    """
    current_bind = self.connection
    return current_bind
def get_columns_from_etl_table(self):
    """Reflect the ETL table and return the number of columns it has.

    The table named ``self.sql_table_name`` (in ``self.schema``, or no
    schema when that is falsy) is autoloaded through ``self.local_engine``.
    Note that, despite the name, the return value is a column *count*.

    :raises Exception: with a fixed "table doesn't seem to exist" message
     whenever building the metadata or reflecting the table fails.
    """
    try:
        extra = {}
        metadata_kwargs = extra.get('metadata_params', {})
        meta = MetaData(**metadata_kwargs)
        reflected = Table(
            self.sql_table_name,
            meta,
            schema=self.schema or None,
            autoload=True,
            autoload_with=self.local_engine,
        )
    except Exception:
        raise Exception(
            "Table doesn't seem to exist in the specified database, "
            "couldn't fetch column information")
    column_count = len(reflected.columns)
    return column_count
def _gen_sa_table(sectype, metadata=None):
    """Generate SQLAlchemy Table object by sectype.

    :param sectype: used as the table name.
    :param metadata: metadata to attach the table to; a fresh ``MetaData()``
     is created when omitted.
    :return: a ``Table`` whose composite primary key is
     (Symbol, DataType, BarSize, TickerTime), followed by the price,
     volume, bar-count and average columns.
    """
    if metadata is None:
        metadata = MetaData()

    def _price(name):
        # Each price-like column gets its own Float(10, 2) instance.
        return Column(name, Float(10, 2))

    def _unsigned(name):
        return Column(name, mysqlINTEGER(unsigned=True))

    key_columns = [
        Column('Symbol', String(20), primary_key=True),
        Column('DataType', String(20), primary_key=True),
        Column('BarSize', String(10), primary_key=True),
        Column('TickerTime', DateTime(), primary_key=True),
    ]
    value_columns = [
        _price('opening'),
        _price('high'),
        _price('low'),
        _price('closing'),
        _unsigned('volume'),
        _unsigned('barcount'),
        _price('average'),
    ]
    return Table(sectype, metadata, *(key_columns + value_columns))
def init_tables():
    """Declare the ``instance`` and ``event`` tables and rebuild the schema.

    Both tables are registered on ``dbmeta``; afterwards everything known to
    ``dbmeta`` is dropped from ``dbengine`` and created again.
    """
    instance_columns = [
        Column("id", Integer, primary_key=True),
        Column("ip", String(20)),
        Column("name", String(90)),
        Column("uuid", String(40)),
        Column("state", String(90)),
        Column("dns_domain", String(90)),
        Column("created_at", DateTime),
        Column("updated_at", DateTime),
        Column("deleted_at", DateTime),
    ]
    Table("instance", dbmeta, *instance_columns)

    # will be used for synchronization
    event_columns = [
        Column("id", Integer, primary_key=True),
        Column("fk_instance_id", Integer, ForeignKey("instance.id")),
        Column("type", String(24)),  # enum
        Column("description", String(90)),
        Column("date", DateTime),
    ]
    Table("event", dbmeta, *event_columns)

    dbmeta.drop_all(dbengine)
    dbmeta.create_all(dbengine)
def _show_create_table(self, connection, table, charset=None, full_name=None): """Run SHOW CREATE TABLE for a ``Table``.""" if full_name is None: full_name = self.identifier_preparer.format_table(table) st = "SHOW CREATE TABLE %s" % full_name rp = None try: rp = connection.execute(st) except exc.DBAPIError as e: if self._extract_error_code(e.orig) == 1146: raise exc.NoSuchTableError(full_name) else: raise row = self._compat_first(rp, charset=charset) if not row: raise exc.NoSuchTableError(full_name) return row[1].strip() return sql
def _describe_table(self, connection, table, charset=None, full_name=None): """Run DESCRIBE for a ``Table`` and return processed rows.""" if full_name is None: full_name = self.identifier_preparer.format_table(table) st = "DESCRIBE %s" % full_name rp, rows = None, None try: try: rp = connection.execute(st) except exc.DBAPIError as e: if self._extract_error_code(e.orig) == 1146: raise exc.NoSuchTableError(full_name) else: raise rows = self._compat_fetchall(rp, charset=charset) finally: if rp: rp.close() return rows
def setup_class(cls):
    """Prepare the spatial and harvest tables needed by this test class.

    The whole class is skipped when the engine is SQLite, because PostGIS
    is required.
    """
    if engine_is_sqlite():
        raise SkipTest("PostGIS is required for this test")

    # This will create the PostGIS tables (geometry_columns and
    # spatial_ref_sys) which were deleted when rebuilding the database
    spatial_ref_sys = Table('spatial_ref_sys', meta.metadata)
    postgis_ready = spatial_ref_sys.exists()
    if not postgis_ready:
        create_postgis_tables()

    # When running the tests with the --reset-db option for some
    # reason the metadata holds a reference to the `package_extent`
    # table after being deleted, causing an InvalidRequestError
    # exception when trying to recreate it further on
    registered_tables = meta.metadata.tables
    if 'package_extent' in registered_tables:
        meta.metadata.remove(registered_tables['package_extent'])

    spatial_db_setup()

    # Setup the harvest tables
    harvest_model_setup()
def upgrade(migrate_engine):
    """Create the ``job`` table on ``migrate_engine``.

    If creation fails, the failure is logged through ``LOG.error`` and the
    original exception is re-raised.
    """
    meta = sqlalchemy.MetaData()
    meta.bind = migrate_engine

    column = sqlalchemy.Column
    job_columns = (
        column('id', sqlalchemy.String(50), primary_key=True,
               nullable=False),
        column('scheduler_id', sqlalchemy.String(36), nullable=False),
        column('job_type', sqlalchemy.String(10), nullable=False),
        column('parameters', types.Dict),
        column('created_at', sqlalchemy.DateTime),
        column('updated_at', sqlalchemy.DateTime),
    )
    table_options = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8',
    }
    job = sqlalchemy.Table('job', meta, *job_columns, **table_options)

    try:
        job.create()
    except Exception:
        LOG.error("Table |%s| not created!", repr(job))
        raise
def define_user_notification_table():
    """Define the user-notification table and map its model class to it.

    The table is stored in the module-level ``user_notification_table`` and
    ``ckanextUserNotification`` is mapped onto it.
    """
    global user_notification_table

    schema_items = [
        Column('id', types.UnicodeText, primary_key=True,
               default=make_uuid),
        Column('package_maintainer_id', types.UnicodeText, nullable=False),
        Column('seen', types.Boolean, default=False),
        Index('ckanext_requestdata_user_notification_id_idx', 'id'),
    ]
    user_notification_table = Table(
        'ckanext_requestdata_user_notification', metadata, *schema_items)

    mapper(ckanextUserNotification, user_notification_table)
def define_maintainers_table():
    """Define the maintainers table and map its model class to it.

    The table is stored in the module-level ``maintainers_table`` and
    ``ckanextMaintainers`` is mapped onto it. ``request_data_id`` is a
    foreign key to ``ckanext_requestdata_requests.id``.
    """
    global maintainers_table

    request_fk = ForeignKey('ckanext_requestdata_requests.id')
    schema_items = [
        Column('id', types.UnicodeText, primary_key=True,
               default=make_uuid),
        Column('request_data_id', types.UnicodeText, request_fk),
        Column('maintainer_id', types.UnicodeText),
        Column('email', types.UnicodeText),
        Index('ckanext_requestdata_maintainers_id_idx', 'id'),
    ]
    maintainers_table = Table(
        'ckanext_requestdata_maintainers', metadata, *schema_items)

    mapper(ckanextMaintainers, maintainers_table)
# Test: declares `simple_items` with ENUM, BOOLEAN and
# NUMERIC(10, asdecimal=False) columns and asserts that generate_code()
# renders them as Enum, Boolean and Numeric in a plain Table definition.
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_fancy_coltypes(self): Table( 'simple_items', self.metadata, Column('enum', ENUM('A', 'B', name='blah')), Column('bool', BOOLEAN), Column('number', NUMERIC(10, asdecimal=False)), ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Boolean, Column, Enum, MetaData, Numeric, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('enum', Enum('A', 'B', name='blah')), Column('bool', Boolean), Column('number', Numeric(10, asdecimal=False)) ) """
# Test: INTEGER, SMALLINT and TINYINT columns that each carry a
# `IN (0, 1)` check constraint are expected to be rendered by
# generate_code() as Boolean columns, with no CheckConstraint in the output.
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_boolean_detection(self): Table( 'simple_items', self.metadata, Column('bool1', INTEGER), Column('bool2', SMALLINT), Column('bool3', TINYINT), CheckConstraint('simple_items.bool1 IN (0, 1)'), CheckConstraint('simple_items.bool2 IN (0, 1)'), CheckConstraint('simple_items.bool3 IN (0, 1)') ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Boolean, Column, MetaData, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('bool1', Boolean), Column('bool2', Boolean), Column('bool3', Boolean) ) """
# Test: a VARCHAR(255) column constrained by `IN ('A', '\'B', 'C')` is
# expected to be rendered by generate_code() as an Enum column listing those
# values (the second one containing an escaped quote).
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_enum_detection(self): Table( 'simple_items', self.metadata, Column('enum', VARCHAR(255)), CheckConstraint(r"simple_items.enum IN ('A', '\'B', 'C')") ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Column, Enum, MetaData, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('enum', Enum('A', "\\\\'B", 'C')) ) """
# Test: BIGINT and DOUBLE_PRECISION columns are expected to be rendered by
# generate_code() as BigInteger and Float.
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_column_adaptation(self): Table( 'simple_items', self.metadata, Column('id', BIGINT), Column('length', DOUBLE_PRECISION) ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import BigInteger, Column, Float, MetaData, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('id', BigInteger), Column('length', Float) ) """
# Test: mysql.INTEGER and mysql.VARCHAR(255) columns are expected to be
# rendered by generate_code() as Integer and String(255).
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_mysql_column_types(self): Table( 'simple_items', self.metadata, Column('id', mysql.INTEGER), Column('name', mysql.VARCHAR(255)) ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Column, Integer, MetaData, String, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('id', Integer), Column('name', String(255)) ) """
# Test: an index `idx_number` is added to the table, then
# generate_code(noindexes=True) is expected to emit the Table with its
# CheckConstraint but without any Index.
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_noindexes_table(self): simple_items = Table( 'simple_items', self.metadata, Column('number', INTEGER), CheckConstraint('number > 2') ) simple_items.indexes.add(Index('idx_number', simple_items.c.number)) assert self.generate_code(noindexes=True) == """\ # coding: utf-8 from sqlalchemy import CheckConstraint, Column, Integer, MetaData, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('number', Integer), CheckConstraint('number > 2') ) """
# Test: with generate_code(noinflect=True) the declarative class generated
# for table `simple_items` is expected to be named `SimpleItems`.
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_no_inflect(self): Table( 'simple_items', self.metadata, Column('id', INTEGER, primary_key=True) ) assert self.generate_code(noinflect=True) == """\ # coding: utf-8 from sqlalchemy import Column, Integer from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() metadata = Base.metadata class SimpleItems(Base): __tablename__ = 'simple_items' id = Column(Integer, primary_key=True) """
# Test: a table with a primary key and schema='testschema' is expected to be
# rendered by generate_code() as a declarative class whose __table_args__
# is {'schema': 'testschema'}.
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_table_kwargs(self): Table( 'simple_items', self.metadata, Column('id', INTEGER, primary_key=True), schema='testschema' ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Column, Integer from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() metadata = Base.metadata class SimpleItem(Base): __tablename__ = 'simple_items' __table_args__ = {'schema': 'testschema'} id = Column(Integer, primary_key=True) """
# Test: a table without a primary key and with schema='testschema' is
# expected to be rendered by generate_code() as a plain Table that keeps the
# schema keyword.
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_schema_table(self): Table( 'simple_items', self.metadata, Column('name', VARCHAR), schema='testschema' ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Column, MetaData, String, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('name', String), schema='testschema' ) """
# Test: an INTEGER column whose `IN (0, 1)` check constraint uses the
# schema-qualified column name is still expected to be rendered by
# generate_code() as Boolean.
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_schema_boolean(self): Table( 'simple_items', self.metadata, Column('bool1', INTEGER), CheckConstraint('testschema.simple_items.bool1 IN (0, 1)'), schema='testschema' ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Boolean, Column, MetaData, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('bool1', Boolean), schema='testschema' ) """
# Test: the ForeignKey options ondelete, onupdate, deferrable and initially
# are expected to appear unchanged in the code emitted by generate_code().
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# (and the backslash line continuation) inside the expected-output literal
# cannot be recovered from here -- restore the original layout from the
# upstream test before running it.
def test_foreign_key_options(self): Table( 'simple_items', self.metadata, Column('name', VARCHAR, ForeignKey('simple_items.name', ondelete='CASCADE', onupdate='CASCADE', deferrable=True, initially='DEFERRED')) ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Column, ForeignKey, MetaData, String, Table metadata = MetaData() t_simple_items = Table( 'simple_items', metadata, Column('name', String, ForeignKey('simple_items.name', ondelete='CASCADE', onupdate='CASCADE', \ deferrable=True, initially='DEFERRED')) ) """
# Test: a primary-key column with server_default=text('uuid_generate_v4()')
# is expected to keep that server default in the declarative class emitted
# by generate_code().
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_pk_default(self): Table( 'simple_items', self.metadata, Column('id', INTEGER, primary_key=True, server_default=text('uuid_generate_v4()')) ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Column, Integer, text from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() metadata = Base.metadata class SimpleItem(Base): __tablename__ = 'simple_items' id = Column(Integer, primary_key=True, server_default=text("uuid_generate_v4()")) """
# Test: a table named `CustomerAPIPreference` is expected to yield a
# declarative class with that same name in the output of generate_code().
# NOTE(review): this snippet is collapsed onto one line, so the line breaks
# inside the expected-output literal cannot be recovered from here -- restore
# the original layout from the upstream test before running it.
def test_pascal(self): Table( 'CustomerAPIPreference', self.metadata, Column('id', INTEGER, primary_key=True) ) assert self.generate_code() == """\ # coding: utf-8 from sqlalchemy import Column, Integer from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() metadata = Base.metadata class CustomerAPIPreference(Base): __tablename__ = 'CustomerAPIPreference' id = Column(Integer, primary_key=True) """
def __init__(self, *args, **kwargs):
    """Initialise connector state from keyword options.

    All positional and keyword arguments are first forwarded to the parent
    initialiser. The following keyword arguments are then read:

    :param table: stored as ``self.table``.
    :param schema: stored as ``self.schema``.
    :param dbparams: stored as ``self.dbparams`` (default ``{}``).
    :param uri: passed through ``self.adjustDBUri`` and stored as
     ``self.databaseUri``.
    :param idletime: seconds after which a connection is considered idle
     (default 300).
    :param abandontime: seconds after which a connection will be abandoned
     (default: five times the idle time).
    """
    super(SQLAlchemyConnector, self).__init__(*args, **kwargs)
    option = kwargs.get

    self.table = option('table')
    self.schema = option('schema')
    self.dbEngine = None
    self.sessions = {}
    # dbparams can include values in http://www.postgresql.org/docs/
    #   current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
    self.dbparams = option('dbparams', {})
    self.databaseUri = self.adjustDBUri(option('uri'))

    # Additional parameters:
    #  idletime: seconds after which a connection is considered idle
    #  abandontime: seconds after which a connection will be abandoned
    idle_seconds = float(option('idletime', 300))
    self.dbIdleTime = idle_seconds
    self.dbAbandonTime = float(option('abandontime', idle_seconds * 5))

    self.databaseOperators = DatabaseOperators
    self.fields = None
    self.allowFieldFunctions = True
    self.allowSortFunctions = True
    self.allowFilterFunctions = True
    self.initialized = True

    # Collect every attribute of the sqlalchemy package that is a
    # VisitableType, keyed by attribute name.
    visitable_types = {}
    for attr_name in dir(sqlalchemy):
        candidate = getattr(sqlalchemy, attr_name)
        if isinstance(candidate, sqlalchemy.sql.visitors.VisitableType):
            visitable_types[attr_name] = candidate
    self.types = visitable_types

    class Table(object):
        """
        This is used to handle table properties from SQLAlchemy.
        """

    self.tableClass = Table
    self._allowedFunctions = dict(cast=True, count=True, distinct=True)
def check_version_info(version_table, expected_version):
    """
    Verify that the version stored in the version table is the expected one.

    Parameters
    ----------
    version_table : sa.Table
        The version table of the asset database
    expected_version : int
        The expected version of the asset database

    Raises
    ------
    AssetDBVersionError
        If the stored version (a missing one counts as 0) is not equal to
        ``expected_version``.
    """
    # Read the version out of the table
    version_query = sa.select((version_table.c.version,))
    stored_version = version_query.scalar()

    # A db without a version is considered v0
    db_version = 0 if stored_version is None else stored_version

    # Raise an error if the versions do not match
    if db_version != expected_version:
        raise AssetDBVersionError(
            db_version=db_version,
            expected_version=expected_version,
        )
def write_version_info(version_table, version_value):
    """
    Insert a row holding ``version_value`` into the version table.

    Parameters
    ----------
    version_table : sa.Table
        The version table of the asset database
    version_value : int
        The version to write in to the database
    """
    row_values = {'version': version_value}
    insert_statement = sa.insert(version_table, values=row_values)
    insert_statement.execute()
def _futures_exchanges_schema(metadata):
    """Build the ``futures_exchanges`` table and register it on ``metadata``.

    Parameters
    ----------
    metadata
        Object handed to ``sa.Table`` as the metadata the table belongs to.

    Returns
    -------
    sa.Table
        The ``futures_exchanges`` table, keyed by the text column
        ``exchange`` and carrying a text ``timezone`` column.
    """
    # NOTE: When modifying this schema, update the ASSET_DB_VERSION value
    exchange_column = sa.Column(
        'exchange',
        sa.Text,
        unique=True,
        nullable=False,
        primary_key=True,
    )
    timezone_column = sa.Column('timezone', sa.Text)
    return sa.Table(
        'futures_exchanges', metadata, exchange_column, timezone_column)
def _asset_router_schema(metadata):
    """Build the ``asset_router`` table and register it on ``metadata``.

    Parameters
    ----------
    metadata
        Object handed to ``sa.Table`` as the metadata the table belongs to.

    Returns
    -------
    sa.Table
        The ``asset_router`` table, keyed by the integer ``sid`` and
        carrying a text ``asset_type`` column.
    """
    # NOTE: When modifying this schema, update the ASSET_DB_VERSION value
    sid_column = sa.Column(
        'sid',
        sa.Integer,
        unique=True,
        nullable=False,
        primary_key=True,
    )
    asset_type_column = sa.Column('asset_type', sa.Text)
    return sa.Table('asset_router', metadata, sid_column, asset_type_column)
def get_column_specification(self, column, **kwargs):
    """Return the DDL fragment describing ``column``.

    The fragment is the formatted column name and compiled type, followed
    by ``NULL``/``NOT NULL`` when ``column.nullable`` is set, and then one
    of: an ``IDENTITY(start,increment)`` clause for a ``Sequence`` default,
    ``IDENTITY(1,1)`` for the table's autoincrement column, or a
    ``DEFAULT`` clause when a default string is available.

    :raises CompileError: if the column is not bound to a table.
    """
    rendered_name = self.preparer.format_column(column)
    rendered_type = self.dialect.type_compiler.process(
        column.type, type_expression=column)
    pieces = [rendered_name, " ", rendered_type]

    if column.nullable is not None:
        # Primary keys and sequence-backed columns are never nullable.
        forced_not_null = (
            not column.nullable
            or column.primary_key
            or isinstance(column.default, sa_schema.Sequence)
        )
        pieces.append(" NOT NULL" if forced_not_null else " NULL")

    if column.table is None:
        raise exc.CompileError(
            "mssql requires Table-bound columns "
            "in order to generate DDL")

    # install an IDENTITY Sequence if we either a sequence or an implicit
    # IDENTITY column
    if isinstance(column.default, sa_schema.Sequence):
        sequence = column.default
        # An explicit start of 0 is kept; any other falsy start becomes 1.
        start = 0 if sequence.start == 0 else (sequence.start or 1)
        pieces.append(
            " IDENTITY(%s,%s)" % (start, sequence.increment or 1))
    elif column is column.table._autoincrement_column:
        pieces.append(" IDENTITY(1,1)")
    else:
        default = self.get_column_default_string(column)
        if default is not None:
            pieces.append(" DEFAULT " + default)

    return "".join(pieces)
# The docstring is a raw string so that the reST escape ``\**kw`` is not
# parsed as an (invalid) Python escape sequence.
def before_create(self, target, connection, **kw):
    r"""Called before CREATE statements are emitted.

    :param target: the :class:`.MetaData` or :class:`.Table`
     object which is the target of the event.
    :param connection: the :class:`.Connection` where the
     CREATE statement or statements will be emitted.
    :param \**kw: additional keyword arguments relevant
     to the event.  The contents of this dictionary
     may vary across releases, and include the
     list of tables being generated for a metadata-level
     event, the checkfirst flag, and other
     elements used by internal events.

    """
# The docstring is a raw string so that the reST escape ``\**kw`` is not
# parsed as an (invalid) Python escape sequence.
def after_create(self, target, connection, **kw):
    r"""Called after CREATE statements are emitted.

    :param target: the :class:`.MetaData` or :class:`.Table`
     object which is the target of the event.
    :param connection: the :class:`.Connection` where the
     CREATE statement or statements have been emitted.
    :param \**kw: additional keyword arguments relevant
     to the event.  The contents of this dictionary
     may vary across releases, and include the
     list of tables being generated for a metadata-level
     event, the checkfirst flag, and other
     elements used by internal events.

    """