一尘不染

使用sqlalchemy orm从查询创建临时表

sql

我可以这样创建一个临时表:

session.execute("CREATE TABLE temptable SELECT existingtable.id, "
    "existingtable.column2 FROM existingtable WHERE existingtable.id<100000")

但是新表不可读,因为它说它没有主键。 existingtable.id是exisitingtable的主键,因此我希望它在temp表中得到相同的处理。

但是,无论如何,我宁愿找到某种ORM方式来执行此操作。鉴于:

temp_table = Table('temptable', metadata, 
    Column('id', Integer, primary_key=True),
    Column('column2', Integer),
    useexisting=True )
class TempTable(object):
    pass
mapper(TempTable, temp_table)
temp_table.create(bind=session.bind, checkfirst=True)
if session.query(TempTable).delete(): #make sure it's empty
    session.commit()

如何temp_tableexistingtable不执行100000session.query.add(TempTable(...))命令的情况下填充一些选定的内容?还是有一种方法可以通过类似于上面的普通SQL版本的查询来创建表?


阅读 224

收藏
2021-05-05

共1个答案

一尘不染

它不完全是ORM,但为了最初创建表,我将克隆表结构(请参见cloneTable下面的示例)。为了复制数据,我将使用InsertFromSelect示例

编辑: 自版本0.8.3起,SqlAlchemy支持
开箱即用的Insert.from_select()
。因此,下面的示例中的InsertFromSelect类和相应的访问者可以直接替换,不再需要。由于历史原因,我将原始示例保持不变。

这是一个有效的例子

from sqlalchemy import Table
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import UpdateBase

class InsertFromSelect(UpdateBase):
    def __init__(self, table, select):
        self.table = table
        self.select = select

@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
    return "INSERT INTO %s %s" % (
        compiler.process(element.table, asfrom=True),
        compiler.process(element.select)
    )

def cloneTable(name, table, metadata):
    cols = [c.copy() for c in table.columns]
    constraints = [c.copy() for c in table.constraints]
    return Table(name, metadata, *(cols + constraints))

# test data
from sqlalchemy import MetaData, Column, Integer
from sqlalchemy.engine import create_engine
e = create_engine('sqlite://')
m = MetaData(e)
t = Table('t', m, Column('id', Integer, primary_key=True),
          Column('number', Integer))
t.create()
e.execute(t.insert().values(id=1, number=3))
e.execute(t.insert().values(id=9, number=-3))

# create temp table
temp = cloneTable('temp', t, m)
temp.create()

# copy data
ins = InsertFromSelect(temp, t.select().where(t.c.id>5))
e.execute(ins)

# print result
for r in e.execute(temp.select()):
    print(r)
2021-05-05