我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用alembic.context.get_current_revision()。
def get_current_revision(self): """Return the current revision, usually that which is present in the ``alembic_version`` table in the database. This method intends to be used only for a migration stream that does not contain unmerged branches in the target database; if there are multiple branches present, an exception is raised. The :meth:`.MigrationContext.get_current_heads` should be preferred over this method going forward in order to be compatible with branch migration support. If this :class:`.MigrationContext` was configured in "offline" mode, that is with ``as_sql=True``, the ``starting_rev`` parameter is returned instead, if any. """ heads = self.get_current_heads() if len(heads) == 0: return None elif len(heads) > 1: raise util.CommandError( "Version table '%s' has more than one head present; " "please use get_current_heads()" % self.version_table) else: return heads[0]
def get_current_revision(self): """Return the current revision, usually that which is present in the ``alembic_version`` table in the database. If this :class:`.MigrationContext` was configured in "offline" mode, that is with ``as_sql=True``, the ``starting_rev`` parameter is returned instead, if any. """ if self.as_sql: return self._start_from_rev else: if self._start_from_rev: raise util.CommandError( "Can't specify current_rev to context " "when using a database connection") self._version.create(self.connection, checkfirst=True) return self.connection.scalar(self._version.select())
def get_current_heads(self): """Return a tuple of the current 'head versions' that are represented in the target database. For a migration stream without branches, this will be a single value, synonymous with that of :meth:`.MigrationContext.get_current_revision`. However when multiple unmerged branches exist within the target database, the returned tuple will contain a value for each head. If this :class:`.MigrationContext` was configured in "offline" mode, that is with ``as_sql=True``, the ``starting_rev`` parameter is returned in a one-length tuple. If no version table is present, or if there are no revisions present, an empty tuple is returned. .. versionadded:: 0.7.0 """ if self.as_sql: start_from_rev = self._start_from_rev if start_from_rev is not None and self.script: start_from_rev = \ self.script.get_revision(start_from_rev).revision return util.to_tuple(start_from_rev, default=()) else: if self._start_from_rev: raise util.CommandError( "Can't specify current_rev to context " "when using a database connection") if not self._has_version_table(): return () return tuple( row[0] for row in self.connection.execute(self._version.select()) )
def get_current_heads(self): """Return a tuple of the current 'head versions' that are represented in the target database. For a migration stream without branches, this will be a single value, synonymous with that of :meth:`.MigrationContext.get_current_revision`. However when multiple unmerged branches exist within the target database, the returned tuple will contain a value for each head. If this :class:`.MigrationContext` was configured in "offline" mode, that is with ``as_sql=True``, the ``starting_rev`` parameter is returned in a one-length tuple. If no version table is present, or if there are no revisions present, an empty tuple is returned. .. versionadded:: 0.7.0 """ if self.as_sql: start_from_rev = self._start_from_rev if start_from_rev == 'base': start_from_rev = None elif start_from_rev is not None and self.script: start_from_rev = \ self.script.get_revision(start_from_rev).revision return util.to_tuple(start_from_rev, default=()) else: if self._start_from_rev: raise util.CommandError( "Can't specify current_rev to context " "when using a database connection") if not self._has_version_table(): return () return tuple( row[0] for row in self.connection.execute(self._version.select()) )
def run_migrations(self, **kw): """Run the migration scripts established for this :class:`.MigrationContext`, if any. The commands in :mod:`alembic.command` will set up a function that is ultimately passed to the :class:`.MigrationContext` as the ``fn`` argument. This function represents the "work" that will be done when :meth:`.MigrationContext.run_migrations` is called, typically from within the ``env.py`` script of the migration environment. The "work function" then provides an iterable of version callables and other version information which in the case of the ``upgrade`` or ``downgrade`` commands are the list of version scripts to invoke. Other commands yield nothing, in the case that a command wants to run some other operation against the database such as the ``current`` or ``stamp`` commands. :param \**kw: keyword arguments here will be passed to each migration callable, that is the ``upgrade()`` or ``downgrade()`` method within revision scripts. """ current_rev = rev = False self.impl.start_migrations() for change, prev_rev, rev, doc in self._migrations_fn( self.get_current_revision(), self): if current_rev is False: current_rev = prev_rev if self.as_sql and not current_rev: self._version.create(self.connection) if doc: log.info("Running %s %s -> %s, %s", change.__name__, prev_rev, rev, doc) else: log.info("Running %s %s -> %s", change.__name__, prev_rev, rev) if self.as_sql: self.impl.static_output( "-- Running %s %s -> %s" % (change.__name__, prev_rev, rev) ) change(**kw) if not self.impl.transactional_ddl: self._update_current_rev(prev_rev, rev) prev_rev = rev if rev is not False: if self.impl.transactional_ddl: self._update_current_rev(current_rev, rev) if self.as_sql and not rev: self._version.drop(self.connection)