我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 sqlalchemy.orm.Session()。
def get_or_create(session, model, defaults=None, **kwargs):
    """Fetch the first ``model`` row matching ``kwargs`` or stage a new one.

    :param Session session: session used for the lookup and for ``add``.
    :param model: class that is queried and, when nothing matches, called
        to build the new instance.
    :param defaults: optional mapping of extra constructor values; applied
        only when a new instance is built, and they override ``kwargs``.
    :param kwargs: ``filter_by`` criteria. Values that are not
        ``ClauseElement`` instances are also passed to the constructor.
    :return: ``(instance, created)``; ``created`` is ``True`` only when a new
        instance was added to the session. Nothing is committed here.
    """
    existing = session.query(model).filter_by(**kwargs).first()
    if existing:
        return existing, False

    # SQL expressions are valid filter criteria but not constructor values.
    constructor_args = {
        name: value
        for name, value in kwargs.items()
        if not isinstance(value, ClauseElement)
    }
    constructor_args.update(defaults or {})

    fresh = model(**constructor_args)
    session.add(fresh)
    return fresh, True
def __init__(self, session, model_class, key_name, key_value, property_name):
    """Build a Storage that is tied to one column of one mapped entity.

    Args:
        session: An instance of :class:`sqlalchemy.orm.Session`.
        model_class: SQLAlchemy declarative mapping.
        key_name: string, name of the key identifying the entity that
            holds the credentials.
        key_value: value of that key for the entity that holds the
            credentials.
        property_name: string naming the property on ``model_class`` in
            which the credentials are stored. This property must be a
            :class:`CredentialsType` column.
    """
    super(Storage, self).__init__()

    # Where to look ...
    self.session = session
    self.model_class = model_class

    # ... which row ...
    self.key_name = key_name
    self.key_value = key_value

    # ... and which attribute on that row.
    self.property_name = property_name
def sessions_scope(local_session, commit=False):
    """Provide a transactional scope around a series of operations.

    Yields ``local_session`` to the caller. When the caller's block finishes
    without error and ``commit`` is true, the session is committed. On any
    ``Exception`` the session is rolled back and the exception is re-raised.
    In every case ``local_session.remove()`` is called last.

    NOTE(review): this is a bare generator as seen here; presumably it is
    wrapped by ``contextlib.contextmanager`` outside this view -- confirm.

    :param local_session: session object exposing ``commit``, ``rollback``
        and ``remove``.
    :param commit: commit automatically after a successful block.
    """
    try:
        yield local_session
        if commit:
            local_session.commit()
            logger.debug('DB session auto-committed as requested')
    except Exception:
        # We log the exception before re-raising it, in case the rollback
        # also fails.
        logger.exception('Exception during scoped worker transaction, '
                         'rolling back.')
        # This rollback is potentially redundant with the remove call below,
        # depending on how the scoped session is configured, but we'll be
        # explicit here.
        local_session.rollback()
        # Bare ``raise`` re-raises the active exception without the need for
        # a named binding.
        raise
    finally:
        local_session.remove()
        logger.debug('Session complete, db session closed')
def session(engine, tables):
    """Yield a Session bound to a connection whose transaction is rolled back.

    Everything done through the yielded session happens inside one outer
    transaction on a dedicated connection; teardown closes the session,
    rolls that transaction back and closes the connection.

    :param engine: engine to take the connection from.
    :param tables: not used in the body; listed so that it is resolved
        before this generator runs (presumably a fixture dependency).
    """
    connection = engine.connect()
    # begin the nested transaction
    transaction = connection.begin()
    # use the connection with the already started transaction
    session = Session(bind=connection)
    try:
        yield session
    finally:
        # Teardown must run even if the generator is closed early, and the
        # connection must go back to the pool even if an earlier step fails.
        try:
            session.close()
            # roll back the broader transaction
            transaction.rollback()
        finally:
            # put back the connection to the connection pool
            connection.close()


####################################################
# tests
def init_DB():
    """Open the database connection once and publish it through globals.

    On the first call this reflects the database behind
    ``param_all.DBENGINE`` with automap, stores the mapped classes by name
    in ``TABLES``, stores a new ``Session`` in ``DB`` and sets ``running``.
    Later calls return immediately while ``running`` is true.
    """
    global running, DB, TABLES

    if running:
        return

    automap = automap_base()
    db_engine = create_engine(param_all.DBENGINE)
    automap.prepare(db_engine, reflect=True)

    mapped = automap.classes
    TABLES = {name: mapped[name] for name in mapped.keys()}
    DB = Session(db_engine)
    running = True
def setUp(self) -> None:
    """
    Build mock collaborators, register ``ServiceDetail`` at ``/`` on a fresh
    Flask app, and push a test request context
    """
    self.session = mock.MagicMock(spec=Session)  # type: Session
    self.request = mock.MagicMock(spec=Request)  # type: Request
    self.service_list = mock.MagicMock(spec=ServiceList)  # type: ServiceList

    self._app = Flask(__name__)
    detail_view = ServiceDetail.as_view(ServiceDetail.__name__, self.session)
    self._app.add_url_rule('/', view_func=detail_view)

    self._context = self._app.test_request_context()
    self._context.push()
def setUp(self) -> None:
    """
    Create a fake DB session and a fake request, hand both to the concrete
    endpoint, then register that endpoint's view at ``/`` and push a test
    request context
    """
    endpoint_class = self.ConcreteGetEndpoint

    self.session = mock.MagicMock(spec=Session)  # type: Session
    self.request = mock.MagicMock(spec=Request)  # type: Request
    self.endpoint = endpoint_class(self.session, self.request)

    self.app = Flask(__name__)
    endpoint_view = endpoint_class.as_view(endpoint_class.__name__)
    self.app.add_url_rule('/', view_func=endpoint_view)

    self._context = self.app.test_request_context()
    self._context.push()
def new(
        cls,
        name: str,
        description: str,
        registration_schema: JSON,
        result_schema: JSON,
        database_session: Session
) -> 'Service':
    """
    Create a new service and persist its details.

    This version always raises ``NotImplementedError``.

    :param name: The name of the newly-created service
    :param description: A human-readable description of what the service
        does
    :param registration_schema: A JSON schema describing the set of all
        JSON objects that are allowed as parameters for this service's jobs
    :param result_schema: A JSON schema describing the set of all JSON
        objects that are allowed as results for this service's jobs
    :param database_session: The SQLAlchemy session to which the database
        model is to be written
    :return: The newly-created service
    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def __init__(
        self,
        session: Session,
        flask_request: Request=request,
        job_list_model: Optional[JobListInterface]=None
) -> None:
    """
    :param session: The session to use
    :param flask_request: The Flask request that this endpoint needs to
        process
    :param job_list_model: The job list to use. If ``None``, a
        ``JobListModel`` is built from ``self.database_session``
    """
    # ``super(self.__class__, self)`` resolves to the *runtime* class, so a
    # subclass calling this constructor would recurse forever. Zero-argument
    # ``super()`` always refers to the class this method is defined in.
    super().__init__(session, flask_request)

    if job_list_model is None:
        self.job_list = JobListModel(self.database_session)
    else:
        self.job_list = job_list_model
def __init__(
        self,
        session: Session,
        flask_request: Request=request,
        metadata: APIMetadataModelInterface=MetadataModel()
) -> None:
    """
    :param session: The SQLAlchemy ORM Session used to talk to the
        database. ``AbstractEndpoint`` requires it in order to perform its
        transaction management
    :param flask_request: The request to process. Defaults to flask's
        ``request`` variable
    :param metadata: The metadata to serialize. Defaults to an instance of
        ``MetadataModel``, which pulls all of its values from the
        ``config`` script
    """
    # NOTE(review): the ``MetadataModel()`` default is evaluated once, when
    # this function is defined, so every call relying on the default shares
    # that one instance.
    super(APIMetadata, self).__init__(session, request=flask_request)
    self._api_metadata = metadata
def __init__(
        self,
        session: Session,
        flask_request: Request=request,
        service_list: Optional[ServiceListInterface]=None
) -> None:
    """
    Create an endpoint that can get services from the DB

    :param session: The database session to use
    :param flask_request: The Flask request to process
    :param service_list: The service list to use. If ``None``, a
        ``ServiceListModel`` is built from ``session``
    """
    # ``super(self.__class__, self)`` resolves to the *runtime* class, so a
    # subclass calling this constructor would recurse forever. Zero-argument
    # ``super()`` always refers to the class this method is defined in.
    super().__init__(session, flask_request)

    if service_list is not None:
        self.service_list = service_list
    else:
        self.service_list = ServiceListModel(session)
def create_logged_in_user(dbsession: Session, registry: Registry,
                          web_server: str, browser: DriverAPI,
                          admin: bool = False, email: str = EMAIL,
                          password: str = PASSWORD):
    """Create a new user and log it in through the given test browser.

    The user is created inside ``transaction.manager``; the browser then
    visits ``<web_server>/login``, fills in the form and submits it. The
    presence of the logout link is asserted as proof the login worked.
    """
    # Catch some common argument misordering issues
    assert isinstance(registry, Registry)
    assert isinstance(web_server, str)

    with transaction.manager:
        create_user(dbsession, registry, admin=admin, email=email,
                    password=password)

    login_url = "{}/{}".format(web_server, "login")
    browser.visit(login_url)
    assert browser.is_element_present_by_css("#login-form")

    browser.fill("username", email)
    browser.fill("password", password)
    browser.find_by_name("login_email").click()

    # After login the log out link is shown, confirming login has succeeded
    assert browser.is_element_present_by_css("#nav-logout")
def make_dummy_request(dbsession: Session, registry: Registry) -> IRequest:
    """Create a non-functional HTTP request carrying a registry and dbsession.

    The returned object is an empty class declared to implement
    ``IRequest``, with ``dbsession``, ``registry`` and ``user`` (``None``)
    set as plain attributes. Useful for crafting requests with custom
    settings.

    See also :func:`make_routable_request`.
    """

    @implementer(IRequest)
    class DummyRequest:
        pass

    dummy = DummyRequest()
    dummy.dbsession = dbsession
    dummy.user = None
    dummy.registry = registry
    return dummy
def dbsession(engine, db_extension, db_composite_type, tables):
    """Yield an sqlalchemy session and tear everything down afterwards.

    The session is bound to a dedicated connection with an outer
    transaction already begun. After the consumer is done, ``db_cleanup``
    runs on the session, the session is closed, the outer transaction is
    rolled back and the connection is closed.

    :param engine: engine to take the connection from.
    :param db_extension: not used in the body (presumably a fixture
        dependency).
    :param db_composite_type: not used in the body (presumably a fixture
        dependency).
    :param tables: not used in the body (presumably a fixture dependency).
    """
    connection = engine.connect()
    # begin the nested transaction
    transaction = connection.begin()
    # use the connection with the already started transaction
    session = Session(bind=connection)
    try:
        yield session
        db_cleanup(session)
    finally:
        # Teardown must run even if cleanup fails or the generator is closed
        # early, and the connection must always go back to the pool.
        try:
            session.close()
            # roll back the broader transaction
            transaction.rollback()
        finally:
            # put back the connection to the connection pool
            connection.close()
def foo_session(foo_engine, request):
    """Return a Session whose work is rolled back by a registered finalizer.

    :param foo_engine: engine to take the connection from.
    :param request: object exposing ``addfinalizer``; the teardown callback
        is registered on it.
    """
    # connect to the database and begin a non-ORM transaction on it
    db_connection = foo_engine.connect()
    outer_transaction = db_connection.begin()

    # bind an individual Session to the connection
    bound_session = Session(bind=db_connection)

    def teardown():
        bound_session.close()
        # rollback - everything that happened with the Session above
        # (including calls to commit()) is rolled back.
        outer_transaction.rollback()
        # return connection to the Engine
        db_connection.close()

    request.addfinalizer(teardown)
    return bound_session
def test_session(test_engine, request):
    """Return a Session whose work is rolled back by a registered finalizer.

    :param test_engine: engine to take the connection from.
    :param request: object exposing ``addfinalizer``; the teardown callback
        is registered on it.
    """
    # engine.echo=True
    # connect to the database and begin a non-ORM transaction on it
    db_connection = test_engine.connect()
    outer_transaction = db_connection.begin()

    # bind an individual Session to the connection
    bound_session = Session(bind=db_connection)

    def teardown():
        bound_session.close()
        # rollback - everything that happened with the Session above
        # (including calls to commit()) is rolled back.
        outer_transaction.rollback()
        # return connection to the Engine
        db_connection.close()

    request.addfinalizer(teardown)
    return bound_session
def __wrapped_call__(self, model, session, *values, create=True,
                     **named_values_and_defaults):
    """Return the object for the given filter values, via a per-key cache.

    The positional/named values are first run through
    ``self.apply_filters``. On a cache miss the decorated getter is asked
    for the object (only when a session is given); if it yields ``None`` a
    new ``model`` instance is built when ``create`` is true. A truthy result
    is added to the session when one is available.

    NOTE(review): the snippet was recovered from whitespace-collapsed text;
    the create/add steps are assumed to belong to the cache-miss branch --
    confirm against the original file.
    """
    session_types = (Session, scoped_session)
    assert session is None or isinstance(session, session_types), \
        'If provided, session should be an sqlalchemy session object.'

    # compute filter values (includes call to related models)
    values, defaults = self.apply_filters(
        self.filters, values,
        create=create, session=session,
        **named_values_and_defaults
    )

    # get cache dictionary and key
    cache = _get_cache_dict(model, session)
    cache_key = (model,) + tuple(values.items())

    # if no cache, delegate to real (decorated) Getter method
    if cache_key not in cache:
        # wrapped method call
        if session:
            cache[cache_key] = self.getter(model, session, values)
        else:
            cache[cache_key] = None

        # store cache
        if cache[cache_key] is None:
            if create:
                cache[cache_key] = model(**values, **defaults)
            else:
                cache[cache_key] = None

        # tie object to session, if available
        if cache[cache_key] and session:
            session.add(cache[cache_key])

    return cache[cache_key]
def rowmethod(f):
    """Decorate ``f`` so that its second argument must be a session.

    The wrapper raises ``ValueError`` unless ``session`` is a ``Session`` or
    ``scoped_session`` instance, and otherwise forwards every argument to
    ``f`` unchanged.
    """
    accepted_types = (Session, scoped_session)

    @functools.wraps(f)
    def checked(cls_or_self, session, *args, **kwargs):
        if isinstance(session, accepted_types):
            return f(cls_or_self, session, *args, **kwargs)
        raise ValueError('Model methods must take a session as second argument, got {}.'.format(repr(session)))

    return checked
def insert_db(self, new):
    """Create missing tables, then add and commit ``new``.

    :param new: mapped object to add to a freshly created session.
    :return: ``True`` when the commit succeeded, ``False`` when any
        ``Exception`` was raised along the way (the error is logged).
    """
    import logging

    try:
        session = sessionmaker(bind=engine)()
        try:
            Base.metadata.create_all(engine)
            session.add(new)
            session.commit()
        finally:
            # Release the session and the engine's pooled connections on
            # failure as well, not only after a successful commit.
            session.close()
            engine.dispose()
        return True
    except Exception:
        # Keep the best-effort boolean contract, but leave a trace of what
        # went wrong instead of discarding the error silently.
        logging.getLogger(__name__).exception('insert_db failed')
        return False
def insert_db(self, new):
    """Create missing tables, then add and commit ``new``.

    Uses ``self.engine`` and ``self.Base``.

    :param new: mapped object to add to a freshly created session.
    :return: ``True`` when the commit succeeded, ``False`` when any
        ``Exception`` was raised along the way (the error is logged).
    """
    import logging

    try:
        session = sessionmaker(bind=self.engine)()
        try:
            self.Base.metadata.create_all(self.engine)
            session.add(new)
            session.commit()
        finally:
            # Release the session and the engine's pooled connections on
            # failure as well, not only after a successful commit.
            session.close()
            self.engine.dispose()
        return True
    except Exception:
        # Keep the best-effort boolean contract, but leave a trace of what
        # went wrong instead of discarding the error silently.
        logging.getLogger(__name__).exception('insert_db failed')
        return False
def test_invocation(self):
    """Run the steps twice: once recording DBAPI traffic, once replaying it.

    Pass one builds an engine whose connections come from
    ``dbapi_session.recorder`` wrapping the configured creator and runs the
    steps under ``self._dummy_ctx``. Pass two builds an engine whose
    connections come from ``dbapi_session.player`` and runs the steps under
    ``profiling.count_functions``.
    """
    dbapi_session = ReplayableSession()
    creator = config.db.pool._creator

    # --- pass 1: record ---
    recorder = lambda: dbapi_session.recorder(creator())
    record_engine = create_engine(
        config.db.url, creator=recorder, use_native_hstore=False)
    self.metadata = MetaData(record_engine)
    self.engine = record_engine
    self.session = Session(record_engine)

    self.setup_engine()
    try:
        self._run_steps(ctx=self._dummy_ctx)
    finally:
        self.teardown_engine()
        record_engine.dispose()

    # --- pass 2: replay ---
    player = lambda: dbapi_session.player()
    replay_engine = create_engine(
        config.db.url, creator=player, use_native_hstore=False)
    self.metadata = MetaData(replay_engine)
    self.engine = replay_engine
    self.session = Session(replay_engine)

    self.setup_engine()
    try:
        self._run_steps(ctx=profiling.count_functions)
    finally:
        self.session.close()
        replay_engine.dispose()
def safe_session(self):
    """Context manager for database session.

    Yields a new ``Session`` bound to ``self.engine``. The session is
    committed when the caller's block completes, rolled back (and the error
    re-raised) when anything is raised, and closed in every case.

    NOTE(review): this is a bare generator as seen here; presumably it is
    wrapped by ``contextlib.contextmanager`` outside this view -- confirm.
    """
    db_session = Session(bind=self.engine)
    try:
        yield db_session
        db_session.commit()
    except BaseException:
        # Deliberately as broad as a bare ``except``: whatever goes wrong,
        # do not commit -- roll back and let the error propagate.
        db_session.rollback()
        raise
    finally:
        db_session.close()
def session(request, engine):
    """Return a Session on ``engine`` with all ``Base`` tables created.

    A finalizer registered on ``request`` closes the session and then drops
    the tables again.

    :param request: object exposing ``addfinalizer``.
    :param engine: engine the session is bound to and the tables are
        created on.
    """
    from sqlalchemy.orm import Session

    session = Session(engine)
    Base.metadata.create_all(engine)

    def fin():
        # Close the session *before* dropping: a session left in an open
        # transaction can hold table locks and make ``drop_all`` block.
        try:
            session.close()
        finally:
            Base.metadata.drop_all(engine)

    request.addfinalizer(fin)
    return session
def __aenter__(self):
    """Return ``self`` as the value bound by ``async with ... as``.

    Convenience for ``async with ctx.typing(), db.Session() as session``.

    NOTE(review): this is a plain ``def`` that returns ``self`` directly, so
    ``async with`` will await the returned object -- confirm the class is
    awaitable.
    """
    return self
def test_eval_none_flag_orm(self):
    """Check how ``None`` is stored in ``data`` vs ``nulldata`` via the ORM.

    Row ``d1`` is written through a regular flush and row ``d2`` through
    ``bulk_insert_mappings``. For both rows, ``data`` cast to a string is
    expected to read back as ``"null"`` and ``nulldata`` as ``None``.
    """
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session

    Base = declarative_base()

    class Data(Base):
        __table__ = self.tables.data_table

    s = Session(testing.db)

    # d1: unit-of-work insert
    s.add(Data(name='d1', data=None, nulldata=None))
    s.commit()

    # d2: bulk insert
    s.bulk_insert_mappings(
        Data, [{"name": "d2", "data": None, "nulldata": None}]
    )

    for row_name in ('d1', 'd2'):
        stored = s.query(
            cast(self.tables.data_table.c.data,
                 String(convert_unicode="force")),
            cast(self.tables.data_table.c.nulldata, String)
        ).filter(self.tables.data_table.c.name == row_name).first()
        eq_(stored, ("null", None))
def setUpClass(cls) -> None:
    """
    Add a service to the DB

    Stores the service name, description and both dumped JSON schemas on the
    class, creates the service through a ``ServiceList`` and commits the
    session.
    """
    AcceptanceTestCase.setUpClass()

    cls.service_name = 'Testing Service'
    cls.description = 'Description for the Testing Service'

    registration_dumper = JSONSchema(
        title='Job Registration Schema',
        description='Must be fulfilled for an experiment'
    )
    cls.job_registration_schema = registration_dumper.dump(
        cls.JobRegistrationSchema()
    )

    # NOTE(review): the result schema is also dumped from
    # ``JobRegistrationSchema`` -- confirm that this is intended.
    result_dumper = JSONSchema(
        title='Job Result Schema',
        description='Must be fulfilled to post results'
    )
    cls.job_result_schema = result_dumper.dump(cls.JobRegistrationSchema())

    session = Session(bind=APP_FACTORY.engine, expire_on_commit=False)
    cls.service = ServiceList(session).new(
        cls.service_name,
        cls.description,
        cls.job_registration_schema,
        cls.job_result_schema
    )
    session.commit()
def tearDownClass(cls) -> None:
    """
    Delete the service from the DB

    The deletion is skipped when the class has no ``service`` attribute.
    The parent class teardown runs afterwards.
    """
    service_was_created = hasattr(cls, 'service')
    if service_was_created:
        session = Session(bind=APP_FACTORY.engine)
        services = ServiceList(session)
        del services[cls.service.id]

    AcceptanceTestCase.tearDownClass()
def setUp(self) -> None:
    """
    Run the ``TestAPI`` set-up, create the mock collaborators, and register
    ``JobsForServiceEndpoint`` under ``/test_url/<service_id>``
    """
    TestAPI.setUp(self)

    self.session = mock.MagicMock(spec=Session)
    self.request = mock.MagicMock(spec=Request)
    self.service_list = mock.MagicMock(spec=ServiceList)

    endpoint_view = JobsForServiceEndpoint.as_view(
        JobsForServiceEndpoint.__name__
    )
    self.testing_app.add_url_rule(
        '/test_url/<service_id>', view_func=endpoint_view
    )
def setUp(self) -> None:
    """
    Create a mock session and request, register ``JSONSchemaValidator`` at
    ``/`` on a fresh Flask app, and push a test request context
    """
    self.session = mock.MagicMock(spec=Session)
    self.request = mock.MagicMock(spec=Request)

    flask_app = Flask(__name__)
    validator_view = JSONSchemaValidator.as_view(JSONSchemaValidator.__name__)
    flask_app.add_url_rule('/', view_func=validator_view)

    self.context = flask_app.test_request_context()
    self.context.push()
def setUp(self) -> None:
    """
    Set up the test: mock session and request, ``JobDetail`` registered at
    ``/`` on a fresh Flask app, and a pushed test request context
    """
    self.session = mock.MagicMock(spec=Session)
    self.request = mock.MagicMock(spec=Request)

    flask_app = Flask(__name__)
    detail_view = JobDetail.as_view(JobDetail.__name__)
    flask_app.add_url_rule('/', view_func=detail_view)

    self.context = flask_app.test_request_context()
    self.context.push()
def setUp(self) -> None:
    """
    Run the ``TestAPI`` set-up, then create a mock DB session and a mock
    request for the service list endpoint tests
    """
    TestAPI.setUp(self)

    session_double = mock.MagicMock(spec=Session)
    request_double = mock.MagicMock(spec=Request)

    self.session = session_double  # type: Session
    self.request = request_double  # type: Request
def setUp(self) -> None:
    """
    Create a mock Flask request and a mock DB session, register
    ``JobsList`` at ``/`` on a fresh Flask app, and push a test request
    context
    """
    self.request = mock.MagicMock(spec=Request)
    self.session = mock.MagicMock(spec=Session)

    self._app = Flask(__name__)
    list_view = JobsList.as_view(JobsList.__name__)
    self._app.add_url_rule('/', view_func=list_view)

    self._context = self._app.test_request_context()
    self._context.push()
def setUp(self) -> None:
    """
    Create a mock session and request, register ``NextJob`` at ``/`` on a
    fresh Flask app, and push a test request context
    """
    self.session = mock.MagicMock(spec=Session)  # type: Session
    self.request = mock.MagicMock(spec=Request)  # type: Request

    flask_app = Flask(__name__)
    next_job_view = NextJob.as_view(NextJob.__name__)
    flask_app.add_url_rule('/', view_func=next_job_view)

    self.context = flask_app.test_request_context()
    self.context.push()
def setUp(self) -> None:
    """
    Create a mock DB session, build the service list on top of it, and
    create a mock service id
    """
    session_double = MagicMock(spec=Session)

    self.db_session = session_double  # type: Session
    self.service_list = ServiceList(session_double)
    self.service_id = MagicMock(spec=UUID)  # type: UUID
def setUp(self) -> None:
    """
    Set up mock parameters for the query, the session, and the mock job
    list.
    """
    session_double = mock.MagicMock(spec=Session)
    query_double = mock.MagicMock(spec=Query)

    self.session = session_double  # type: Session
    self.root_query = query_double  # type: Query
    self.job_list = self.MockJobListFromQuery(session_double, query_double)
def __init__(self, db_session: Session, root_job_query: Query):
    """
    :param db_session: The session handed on to the parent constructor
    :param root_job_query: The query stored as this object's root query
    """
    # ``super(self.__class__, self)`` resolves to the *runtime* class, so a
    # subclass calling this constructor would recurse forever. Zero-argument
    # ``super()`` always refers to the class this method is defined in.
    super().__init__(db_session)
    self._root_query = root_job_query
def new(
        self,
        name: str,
        description: str,
        registration_schema: JSON,
        result_schema: JSON
) -> ServiceInterface:
    """
    Build a service by delegating to ``MockService.new`` with a mock DB
    session

    :param name: The name of the service
    :param description: The description of the service
    :param registration_schema: The schema passed on as the registration
        schema
    :param result_schema: The schema passed on as the result schema
    :return: Whatever ``MockService.new`` returns
    """
    session_double = mock.MagicMock(spec=Session)  # type: Session
    return MockService.new(
        name, description, registration_schema, result_schema,
        session_double
    )
def _write_service(cls, service: Service, session: Session) -> None:
    """
    Add the service to the session, then commit through
    ``cls._safely_commit_session``

    :param service: The service to add
    :param session: The session to add it to and to commit
    """
    session.add(service)
    commit = cls._safely_commit_session
    commit(session)