我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用sqlalchemy.event.listen()。
def _listen(cls, event_key, propagate=True, **kw):
    """Register the listener on the instrumentation factory, filtered by class.

    The listener function taken from ``event_key`` is wrapped so that it only
    fires for the class held by the original dispatch target (or for its
    subclasses when ``propagate`` is true).  The class is held through a weak
    reference; when that reference dies, the wrapper is removed from the
    factory's dispatch collection again.

    :param event_key: event key carrying the dispatch target, the event
        identifier and the listener function.
    :param propagate: when true, fire for subclasses of the target class as
        well; when false, fire only for that exact class.
    :param kw: passed through unchanged to ``base_listen()``.
    """
    holder = event_key.dispatch_target
    identifier = event_key.identifier
    fn = event_key._listen_fn

    def class_filtered(target_cls, *arg):
        # Dereference lazily; ``class_ref`` is bound below, before this
        # wrapper can ever be invoked.
        listen_cls = class_ref()
        if propagate:
            applies = issubclass(target_cls, listen_cls)
        else:
            applies = target_cls is listen_cls
        if applies:
            return fn(target_cls, *arg)

    def discard_listener(ref):
        # Weakref callback: build a key describing the wrapper and remove it
        # from the factory's collection for this event.
        factory = instrumentation._instrumentation_factory
        stale_key = event.registry._EventKey(
            None, identifier, class_filtered, factory)
        getattr(factory.dispatch, identifier).remove(stale_key)

    class_ref = weakref.ref(holder.class_, discard_listener)

    factory_key = event_key.with_dispatch_target(
        instrumentation._instrumentation_factory)
    factory_key.with_wrapper(class_filtered).base_listen(**kw)
def _listen(cls, event_key, raw=False, propagate=False, **kw):
    """Record a listener in the target's ``all_holds`` and optionally propagate.

    The ``(event_key, raw, propagate)`` triple is stored under
    ``event_key._key`` in the per-class collection of
    ``target.all_holds``.  When ``propagate`` is true, every direct and
    indirect subclass of ``target.class_`` is visited breadth-first, and the
    listener is attached to each subclass for which ``target.resolve()``
    returns something other than ``None``.

    :param event_key: event key carrying the dispatch target.
    :param raw: stored with the held listener and forwarded to ``listen()``.
    :param propagate: whether to also walk the subclasses of the target class.
    :param kw: forwarded to ``listen()`` for each resolved subclass.
    """
    # identifier / fn are read exactly as the original did, though unused here.
    holder, _identifier, _fn = (
        event_key.dispatch_target,
        event_key.identifier,
        event_key.fn,
    )

    if holder.class_ not in holder.all_holds:
        holder.all_holds[holder.class_] = {}
    held = holder.all_holds[holder.class_]

    event.registry._stored_in_collection(event_key, holder)
    held[event_key._key] = (event_key, raw, propagate)

    if not propagate:
        return

    pending = list(holder.class_.__subclasses__())
    while pending:
        subclass = pending.pop(0)
        pending.extend(subclass.__subclasses__())
        subject = holder.resolve(subclass)
        if subject is None:
            continue
        # The walk over __subclasses__() already covers the whole hierarchy,
        # so the generic propagate flag stays False for each registration.
        subclass_key = event_key.with_dispatch_target(subject)
        subclass_key.listen(raw=raw, propagate=False, **kw)
def _accept_with(cls, target):
    """Map ``target`` to the object session events should be attached to.

    * A ``scoped_session`` instance is replaced by its ``session_factory``,
      which must be a ``sessionmaker`` or a ``Session`` subclass.
    * A ``sessionmaker`` yields its ``class_``.
    * The ``scoped_session`` class (or a subclass) yields ``Session``.
    * A ``Session`` subclass or a ``Session`` instance is returned as is.
    * Anything else yields ``None``.

    :raises exc.ArgumentError: if a ``scoped_session`` wraps a creation
        callable that is neither a ``sessionmaker`` nor a ``Session`` class.
    """
    if isinstance(target, scoped_session):
        target = target.session_factory
        if not (
            isinstance(target, sessionmaker)
            or (isinstance(target, type) and issubclass(target, Session))
        ):
            raise exc.ArgumentError(
                "Session event listen on a scoped_session "
                "requires that its creation callable "
                "is associated with the Session class.")

    if isinstance(target, sessionmaker):
        return target.class_

    if isinstance(target, type):
        if issubclass(target, scoped_session):
            return Session
        if issubclass(target, Session):
            return target
        # Some unrelated class: not an acceptable target.
        return None

    if isinstance(target, Session):
        return target

    return None
def define_views(cls, metadata, schema):
    """Attach view creation/removal DDL to ``metadata``.

    For each of the ``users`` and ``email_addresses`` tables, a
    ``CREATE VIEW <table>_v`` statement is registered for the
    ``after_create`` event and the matching ``DROP VIEW`` statement for the
    ``before_drop`` event.

    :param metadata: object the DDL listeners are attached to.
    :param schema: optional schema name; when truthy, table and view names
        are qualified as ``<schema>.<name>``.
    """
    for table_name in ('users', 'email_addresses'):
        qualified = "%s.%s" % (schema, table_name) if schema else table_name
        view_name = qualified + '_v'

        create_view = DDL(
            "CREATE VIEW %s AS SELECT * FROM %s" % (view_name, qualified))
        event.listen(metadata, "after_create", create_view)

        drop_view = DDL("DROP VIEW %s" % view_name)
        event.listen(metadata, "before_drop", drop_view)
def __init__(self, tracer, service, engine):
    """Bind a tracer to ``engine`` and subscribe to its cursor events.

    :param tracer: tracer that receives the service info and is pinned onto
        the engine.
    :param service: service name; falls back to the normalized vendor name
        when falsy.
    :param engine: engine whose ``name`` determines the vendor and on which
        the cursor/error listeners are registered.
    """
    self.tracer = tracer
    self.engine = engine
    self.vendor = sqlx.normalize_vendor(engine.name)
    self.service = service or self.vendor
    self.name = "%s.query" % self.vendor

    # The service info and the Pin share the same descriptive fields.
    service_info = {
        'service': self.service,
        'app': self.vendor,
        'app_type': sqlx.APP_TYPE,
    }
    self.tracer.set_service_info(**service_info)

    # Attach the Pin to the engine.
    Pin(tracer=tracer, **service_info).onto(engine)

    hooks = (
        ('before_cursor_execute', self._before_cur_exec),
        ('after_cursor_execute', self._after_cur_exec),
        ('dbapi_error', self._dbapi_error),
    )
    for event_name, handler in hooks:
        listen(engine, event_name, handler)
def _setup(self):
    """Create the engine, register engine event listeners, build the factory.

    ``use_native_unicode`` is only passed when the URL's driver name is
    ``postgresql``, and ``pool_size`` only when one was configured.  Each
    ``(name, listener)`` pair in ``self._engine_events`` is registered on the
    new engine, and a session factory bound to that engine is stored on
    ``self._factory``.
    """
    engine_options = {}
    uses_postgresql = self._database_url.get_driver_name() == 'postgresql'
    if uses_postgresql:
        engine_options['use_native_unicode'] = self._use_native_unicode
    if self._pool_size is not None:
        engine_options['pool_size'] = self._pool_size

    self._engine = create_engine(self._database_url, **engine_options)

    # A falsy (None or empty) event list means there is nothing to register.
    for event_name, listener in self._engine_events or ():
        event.listen(self._engine, event_name, listener)

    self._factory = sessionmaker()
    self._factory.configure(bind=self._engine)
def get_engine(connection_string):
    """
    Get a SQLAlchemy engine that allows us to connect to a database.

    :param connection_string: database URL string handed to ``create_engine``.
    :return: the newly created engine.  When ``connection_string`` is the
        default database string and points at SQLite, a ``connect`` listener
        is registered and ``START_PRAGMAS`` is executed once on a throwaway
        connection before the engine is returned.
    """
    is_sqlite = connection_string.startswith('sqlite')

    # Set echo to False, as otherwise we get full SQL Query outputted, which
    # can overwhelm the terminal.
    engine = create_engine(
        connection_string,
        echo=False,
        # check_same_thread is an argument of sqlite3.connect() only; other
        # DBAPI drivers reject unknown connect arguments, so pass it solely
        # for SQLite URLs.
        connect_args={'check_same_thread': False} if is_sqlite else {},
        poolclass=NullPool,
        convert_unicode=True,
    )

    if connection_string == get_default_db_string() and is_sqlite:
        event.listen(engine, "connect", set_sqlite_connection_pragma)
        # The context manager closes the connection even if executing the
        # pragmas raises, so no connection is leaked on failure.
        with engine.connect() as connection:
            connection.execute(START_PRAGMAS)

    return engine
def _listen(cls, target, identifier, fn, propagate=True):
    """Register ``fn`` on the instrumentation factory, filtered by class.

    ``fn`` is wrapped so that it is only called for the class held by
    ``target`` (or for its subclasses when ``propagate`` is true).  The class
    is held through a weak reference whose callback removes the wrapper from
    the factory again.

    :param target: object exposing the listened-to class as ``class_``.
    :param identifier: name of the event being listened for.
    :param fn: listener, called as ``fn(target_cls, *arg)``.
    :param propagate: when true, fire for subclasses as well; when false,
        fire only for the exact class.
    """
    def class_filtered(target_cls, *arg):
        # ``class_ref`` is bound below, before this wrapper can be invoked.
        listen_cls = class_ref()
        if propagate:
            applies = issubclass(target_cls, listen_cls)
        else:
            applies = target_cls is listen_cls
        if applies:
            return fn(target_cls, *arg)

    def discard_listener(ref):
        # Weakref callback: unregister the wrapper from the factory.
        event.Events._remove(
            orm.instrumentation._instrumentation_factory,
            identifier,
            class_filtered,
        )

    class_ref = weakref.ref(target.class_, discard_listener)

    event.Events._listen(
        orm.instrumentation._instrumentation_factory,
        identifier,
        class_filtered,
    )
def _accept_with(cls, target):
    """Map ``target`` to the object session events should be attached to.

    * An ``orm.scoped_session`` instance is replaced by its
      ``session_factory``, which must be an ``orm.sessionmaker`` or an
      ``orm.Session`` subclass.
    * An ``orm.sessionmaker`` yields its ``class_``.
    * The ``orm.scoped_session`` class (or a subclass) yields ``orm.Session``.
    * An ``orm.Session`` subclass or instance is returned as is.
    * Anything else yields ``None``.

    :raises exc.ArgumentError: if a scoped session wraps a creation callable
        that is neither a ``sessionmaker`` nor a ``Session`` class.
    """
    if isinstance(target, orm.scoped_session):
        target = target.session_factory
        if not (
            isinstance(target, orm.sessionmaker)
            or (isinstance(target, type) and issubclass(target, orm.Session))
        ):
            raise exc.ArgumentError(
                "Session event listen on a scoped_session "
                "requires that its creation callable "
                "is associated with the Session class.")

    if isinstance(target, orm.sessionmaker):
        return target.class_

    if isinstance(target, type):
        if issubclass(target, orm.scoped_session):
            return orm.Session
        if issubclass(target, orm.Session):
            return target
        # Some unrelated class: not an acceptable target.
        return None

    if isinstance(target, orm.Session):
        return target

    return None
def listens_for(target, identifier, *args, **kw):
    """Return a decorator that registers a function as an event listener.

    The decorated function is passed to ``listen()`` together with
    ``target``, ``identifier`` and any extra positional/keyword arguments,
    and is then returned unchanged.

    Example::

        from sqlalchemy import event
        from sqlalchemy.schema import UniqueConstraint

        @event.listens_for(UniqueConstraint, "after_parent_attach")
        def unique_constraint_name(const, table):
            const.name = "uq_%s_%s" % (
                table.name,
                list(const.columns)[0].name
            )
    """
    def register(fn):
        listen(target, identifier, fn, *args, **kw)
        # Hand the original function back so the decorated name still refers
        # to it.
        return fn

    return register
def init_event_listeners(db_session):
    """
    Register all SQLAlchemy event listeners.

    Two listeners are set up: ``handle_trans_amount_change`` for ``set``
    events on ``Transaction.actual_amount``, and ``handle_before_flush`` for
    ``before_flush`` events on the given session.

    See http://docs.sqlalchemy.org/en/latest/orm/events.html

    :param db_session: the Database Session
    :type db_session: sqlalchemy.orm.session.Session
    """
    logger.debug('Setting up DB model event listeners')

    # Options for the attribute-level listener, passed as keyword arguments.
    amount_listener_options = {'active_history': True, 'named': True}
    event.listen(
        Transaction.actual_amount,
        'set',
        handle_trans_amount_change,
        **amount_listener_options
    )

    event.listen(db_session, 'before_flush', handle_before_flush)
def configure_db(app):
    """Initialise the database extensions and install an ``init`` listener.

    ``alembic``, ``db`` and ``nplusone`` are each initialised with ``app``.
    A mapper-level ``init`` listener is then registered that assigns every
    column default to the new instance; callable defaults are invoked with
    the instance as their argument.

    :param app: application object passed to each extension's ``init_app``.
    """
    from sqlalchemy import event
    from sqlalchemy.orm import mapper
    from sqlalchemy.inspection import inspect

    alembic.init_app(app)
    db.init_app(app)
    nplusone.init_app(app)

    # The decorator already registers the listener.  The original code also
    # called event.listen() with the same function afterwards, which
    # registered it a second time; that redundant call has been removed.
    @event.listens_for(mapper, "init")
    def instant_defaults_listener(target, args, kwargs):
        for key, column in inspect(type(target)).columns.items():
            default = column.default
            if default is None:
                continue
            if callable(default.arg):
                setattr(target, key, default.arg(target))
            else:
                setattr(target, key, default.arg)