Python sqlalchemy_utils 模块,database_exists() 实例源码
我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用sqlalchemy_utils.database_exists()。
def app(request):
test_app = ApplicationFactory.create_application('testing')
test_app.app_context().push()
if database_exists(db.engine.url):
drop_database(db.engine.url)
create_database(db.engine.url)
db.create_all()
def teardown():
db.session.expunge_all()
db.session.remove()
drop_database(db.engine.url)
db.engine.dispose()
request.addfinalizer(teardown)
return test_app
def init(config):
db_connect = config.conf['db_connect']
if not db_connect:
sys.exit("Error: database connection string not "
"found in any of the configuration files")
try:
engine = create_engine(db_connect)
except exc.NoSuchModuleError as e:
sys.exit("Error: %s" % str(e))
try:
if not database_exists(engine.url):
printv(config, "Creating database: 'openpassphrase'")
create_database(engine.url)
except exc.OperationalError as e:
sys.exit("Error: %s" % str(e))
printv(config, "Creating tables based on models")
models.Base.metadata.create_all(engine)
def setUpClass(self):
self.DBURI = "postgresql+psycopg2://@:5432/dbimport"
if database_exists(self.DBURI):
drop_database(self.DBURI)
create_database(self.DBURI)
engine = create_engine(self.DBURI)
models.bind_engine(engine)
self.references = {"1": 249250621, "2": 243199373, "3": 198022430}
self.array = "test"
# referenceset, workspace, and individual registration previously
# tested
with dbimport.DBImport(self.DBURI).getSession() as session:
self.referenceset = session.registerReferenceSet(
str(uuid.uuid4()), "testAssembly", references=self.references)
self.workspace = session.registerWorkspace(
str(uuid.uuid4()), "/test/dbimport/workspace")
self.individual = session.registerIndividual(
str(uuid.uuid4()), name="testIndividual")
def setUpClass(self):
self.TESTDB_URI = "postgresql+psycopg2://@:5432/mafend2end"
if not database_exists(self.TESTDB_URI):
create_database(self.TESTDB_URI)
engine = create_engine(self.TESTDB_URI)
models.bind_engine(engine)
self.config_path = os.path.join(os.path.realpath(
sys.argv[-1]), "utils/example_configs/icgc_config.json")
with open(self.config_path, 'r') as readFP:
config_json = json.load(readFP)
config_json["TileDBConfig"] = os.path.join(os.path.realpath(
sys.argv[-1]), "utils/example_configs/tiledb_config.json")
config_json["TileDBAssembly"] = os.path.join(
os.path.realpath(sys.argv[-1]), "utils/example_configs/hg19.json")
config_json["VariantSetMap"]["VariantConfig"] = os.path.join(
os.path.realpath(sys.argv[-1]), "utils/example_configs/icgc_variants.json")
with open(self.config_path, 'w') as writeFP:
writeFP.write(json.dumps(config_json))
config = ConfigReader(self.config_path)
config.DB_URI = self.TESTDB_URI
imp.helper.registerWithMetadb(config)
def setUpClass(self):
self.DBURI = "postgresql+psycopg2://@:5432/vcfimport"
if database_exists(self.DBURI):
drop_database(self.DBURI)
create_database(self.DBURI)
engine = create_engine(self.DBURI)
models.bind_engine(engine)
self.assembly = "testAssembly"
# test files
with open("test/data/header.vcf", "r") as f:
self.header = f.read()
# create the base config
self.config_path = os.path.abspath(
"utils/example_configs/vcf_import.config")
with open(self.config_path, 'r') as readFP:
self.config = json.load(readFP)
self.config["dburi"] = self.DBURI
def init(db_url, db_log_flag = False, recycle_time = 3600):
# ?? ?? ???
DBManager.__engine = create_engine(db_url,
pool_recycle = recycle_time,
echo = db_log_flag)
# DATABASE Not exist case
if not database_exists(DBManager.__engine.url):
# Creatre Database
create_database(DBManager.__engine.url)
DBManager.__engine = create_engine(db_url,
pool_recycle = recycle_time,
echo = db_log_flag)
DBManager.__session = scoped_session(sessionmaker(autocommit = False,
autoflush = False,
bind = DBManager.__engine))
# ?? ??? ??
global dao
dao = DBManager.__session
def write(self, if_exists:str=None):
def _create_database():
return self._engine.dialect.dbapi.create_database(
user=self._username, password=self._password, host=self._server, database=self.uuid,
page_size=self._pagesize
)
from sqlalchemy_utils import database_exists
from sqlalchemy import exc as dbexceptions
try:
if not database_exists(self._engine.url):
a = _create_database()
except dbexceptions.DatabaseError:
a = _create_database()
super().write(if_exists=if_exists)
def init():
"""Create database."""
click.gecho(f'Creating database {_db.engine.url}')
if not database_exists(str(_db.engine.url)):
create_database(str(_db.engine.url))
def handle(self, **options):
"""Create test databases for all db connections."""
# Create test db
for key in settings.DATABASES:
test_db = settings.DATABASES[key]
test_db['database'] = 'test_' + test_db['database']
engine = get_engine(key)
if not database_exists(engine.url):
create_database(engine.url)
create_tables(settings.INSTALLED_APPS, warn=False)
pytest.main(options['unknown'])
def create():
"""
Create database and all tables
"""
if not database_exists(db.engine.url):
app.logger.debug('Creating a fresh database to work with')
create_database(db.engine.url)
alembic_stamp()
db.create_all()
app.logger.debug('Database already exists, please drop the database and try again!')
def drop():
"""
Drop the database if it exists
:return:
"""
app.logger.debug('Dropping the database!')
if database_exists(db.engine.url):
drop_database(db.engine.url)
app.logger.error('Database does not exists!')
def createdb():
"""
Create DB
"""
if not database_exists(app.config['SQLALCHEMY_DATABASE_URI']):
create_database(app.config['SQLALCHEMY_DATABASE_URI'])
print("DB created.")
def db_connection(db_connection_string):
"""
Create one test database for all database tests.
"""
engine = create_engine(db_connection_string)
if not database_exists(engine.url):
create_database(engine.url)
connection = engine.connect()
yield connection
connection.close()
engine.dispose()
drop_database(engine.url)
def init_db(app):
app.db_engine = create_engine(
app.config['DATABASE_URL'], convert_unicode=True
)
if not database_exists(app.db_engine.url):
create_database(app.db_engine.url)
app.db_session = scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=app.db_engine)
)
Base.query = app.db_session.query_property()
init_query_logging(app)
def restore(config): # pragma: no cover
db_connect = config.conf['db_connect']
if not db_connect:
sys.exit("Error: database connection string not "
"found in any of the configuration files")
try:
engine = create_engine(db_connect)
except exc.NoSuchModuleError as e:
sys.exit("Error: %s" % str(e))
if database_exists(engine.url):
sys.exit("Error: database already exists, will not overwrite!")
try:
create_database(engine.url)
except exc.OperationalError as e:
sys.exit("Error: %s" % str(e))
models.Base.metadata.create_all(engine)
conn = engine.connect()
print("Restoring database from backup file: opp.db.tz")
code, out, err = utils.execute("tar zxf opp.db.tz", propagate=False)
if code != 0:
sys.exit("Error extracting database archive file: %s" % err)
for table in models.Base.metadata.sorted_tables:
with open("%s.pickle" % table, "rb") as f:
while True:
try:
ins = table.insert().values(load(f))
conn.execute(ins)
except EOFError:
break
files = ["%s.pickle" % x.name for x in models.Base.metadata.sorted_tables]
code, out, err = utils.execute("rm %s" % " ".join(files))
if code != 0:
print("Unable to remove .pickle files: %s" % err)
def setUpClass(self):
self.DBURI = "postgresql+psycopg2://@:5432/dbimport"
if database_exists(self.DBURI):
drop_database(self.DBURI)
create_database(self.DBURI)
engine = create_engine(self.DBURI)
models.bind_engine(engine)
self.assembly = "testAssembly"
self.workspace = "/test/dbimport/workspace"
def setUpClass(self):
self.DBURI = "postgresql+psycopg2://@:5432/dbimport"
if database_exists(self.DBURI):
drop_database(self.DBURI)
create_database(self.DBURI)
engine = create_engine(self.DBURI)
models.bind_engine(engine)
# all these function have been previously tested
with dbimport.DBImport(self.DBURI).getSession() as session:
self.referenceset = session.registerReferenceSet(
str(uuid.uuid4()), "testAssembly")
self.workspace = session.registerWorkspace(
str(uuid.uuid4()), "/test/dbimport/workspace")
self.array = session.registerDBArray(
str(uuid.uuid4()), self.referenceset.id, self.workspace.id, "test")
self.array2 = session.registerDBArray(
str(uuid.uuid4()), self.referenceset.id, self.workspace.id, "test2")
self.variantset = session.registerVariantSet(
str(uuid.uuid4()), self.referenceset.id, "Dataset")
self.variantset2 = session.registerVariantSet(
str(uuid.uuid4()), self.referenceset.id, "Dataset2")
self.variantset3 = session.registerVariantSet(
str(uuid.uuid4()), self.referenceset.id, "Dataset3")
self.variantset4 = session.registerVariantSet(
str(uuid.uuid4()), self.referenceset.id, "Dataset4")
self.individual = session.registerIndividual(
str(uuid.uuid4()), name="testIndividual")
self.source = session.registerSample(
str(uuid.uuid4()), self.individual.guid, name="source")
self.target = session.registerSample(
str(uuid.uuid4()), self.individual.guid, name="target")
def create_all():
from zou.app import db
engine = create_engine(get_db_uri())
if not database_exists(engine.url):
create_database(engine.url)
db.create_all()
def __init__(self, problem=None, database="featurehub"):
"""Create the ORMManager and connect to DB.
If problem name is given, load it.
Parameters
----------
problem : str, optional (default=None)
Name of problem
database : str, optional (default="featurehub")
Name of database within DBMS.
"""
self.__orm = ORMManager(database)
if not database_exists(self.__orm.engine.url):
print("database {} does not seem to exist.".format(database))
print("You might want to create it by calling set_up method")
elif problem:
try:
with self.__orm.session_scope() as session:
problem = session.query(Problem)\
.filter(Problem.name == problem).one()
self.__problemid = problem.id
except NoResultFound:
print("WARNING: Problem {} does not exist!".format(problem))
print("You might want to create it by calling create_problem"
" method")
def set_up(self, drop=False):
"""Create a new DB and create the initial scheme.
If the database exists and drop=True, the existing database is dropped
and recreated. Regardless, any tables defined by the schema that do not
exist are created.
Parameters
----------
drop : bool, optional (default=False)
Drop database if it already exists.
"""
# todo extract database name from engine url and report for brevity
engine = self.__orm.engine
if database_exists(engine.url):
print("Database {} already exists.".format(engine.url))
if drop:
print("Dropping old database {}".format(engine.url))
drop_database(engine.url)
with possibly_talking_action("Re-creating database..."):
create_database(engine.url)
else:
with possibly_talking_action("Creating database..."):
create_database(engine.url)
with possibly_talking_action("Creating tables..."):
Base.metadata.create_all(engine)
print("Database {} created successfully".format(engine.url))
def init(self):
if self._local_path:
dirname = os.path.dirname(self._local_path)
if dirname and not os.path.isdir(dirname):
os.makedirs(dirname)
if not database_exists(self.db.url):
create_database(self.db.url)
# More on connection strings for sqlalchemy:
# http://docs.sqlalchemy.org/en/latest/core/engines.html
self.metadata.bind = self.db
self.metadata.create_all()
def init_db():
"""
Create database if doesn't exist and
create all tables.
"""
if not database_exists(engine.url):
create_database(engine.url)
BaseModel.metadata.create_all(bind=engine)
def test_create_and_drop(self, dsn):
assert not database_exists(dsn)
create_database(dsn)
assert database_exists(dsn)
drop_database(dsn)
assert not database_exists(dsn)
def test_exists_memory(self, dsn):
assert database_exists(dsn)
def test_exists_memory_none_database(self, sqlite_none_database_dsn):
assert database_exists(sqlite_none_database_dsn)
def write(self, if_exists:str=None):
jp.java.lang.Class.forName("com.mysql.jdbc.Driver",
True, jp.java.lang.ClassLoader.getSystemClassLoader())
from sqlalchemy_utils import database_exists, create_database
if not database_exists(self._engine.url):
create_database(self._engine.url)
super().write(if_exists=if_exists)
def init_db():
if sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.drop_database(engine.url)
sqlalchemy_utils.create_database(engine.url)
print("DB ??? ??")
def init_db():
if sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.drop_database(engine.url)
sqlalchemy_utils.create_database(engine.url)
print("DB ??? ??")
def connect(self): # noqa
if not database_exists(self.engine.url):
create_database(self.engine.url)
self.session = sessionmaker(bind=self.engine)()
_Base.metadata.create_all(self.engine)
def connect(self): # noqa
if not database_exists(self.engine.url):
create_database(self.engine.url)
self.session = sessionmaker(bind=self.engine)()
_Base.metadata.create_all(self.engine)
def destroy_database(uri):
"""Destroy the database at ``uri``, if it exists. """
if database_exists(uri):
drop_database(uri)
def create_app(config_name):
global user_datastore
app = Flask(__name__)
app.config.from_object(app_config[config_name])
csrf = CSRFProtect()
csrf.init_app(app)
assets = Environment(app)
create_assets(assets)
via = Via()
via.init_app(app)
# Code for desmostration the flask upload in several models - - - -
from user import user_photo
from restaurant import restaurant_photo
from food import food_photo
configure_uploads(app, (restaurant_photo, food_photo, user_photo))
engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
if not database_exists(engine.url):
create_database(engine.url)
security = Security(app, user_datastore, register_form=SecurityRegisterForm)
create_security_admin(app=app, path=os.path.join(os.path.dirname(__file__)))
with app.app_context():
db.init_app(app)
db.create_all()
user_datastore.find_or_create_role(name='admin', description='Administrator')
db.session.commit()
user_datastore.find_or_create_role(name='end-user', description='End user')
db.session.commit()
@app.route('/', methods=['GET'])
@app.route('/home', methods=['GET'])
def index():
return render_template('index.html')
@app.errorhandler(403)
def forbidden(error):
return render_template('error/403.html', title='Forbidden'), 403
@app.errorhandler(404)
def page_not_found(error):
return render_template('error/404.html', title='Page Not Found'), 404
@app.errorhandler(500)
def internal_server_error(error):
db.session.rollback()
return render_template('error/500.html', title='Server Error'), 500
return app