一尘不染

如何在自定义的编译表达式中使用bindparam()?

python

我的代码基于@zzzeeek对这个问题)的回答。我对其进行了扩展,因此考虑了PostgreSQL的NULL和ARRAY。

class values(FromClause):
    named_with_column = True

    def __init__(self, columns, *args, **kw):
        self._column_args = columns
        self.list = args
        self.alias_name = self.name = kw.pop('alias_name', None)

    def _populate_column_collection(self):
        # self._columns.update((col.name, col) for col in self._column_args)
        for c in self._column_args:
        c._make_proxy(self, c.name)


@compiles(values)
def compile_values(element, compiler, asfrom=False, **kw):
    columns = element.columns
    v = "VALUES %s" % ", ".join(
    "(%s)" % ", ".join(
        ((compiler.visit_array(elem)+'::'+str(column.type)) if isinstance(column.type, ARRAY) else
         compiler.render_literal_value(elem, column.type))
        if elem is not None else compiler.render_literal_value(elem, NULLTYPE)
        for elem, column in zip(tup, columns))
    for tup in element.list
    )
    if asfrom:
        if element.alias_name:
            v = "(%s) AS %s (%s)" % (v, element.alias_name, (", ".join(c.name for c in element.columns)))
        else:
            v = "(%s)" % v
    return v

一切工作正常,直到结果证明我无法在此VALUES子句中插入带有“%”符号的值-它们插入到结果语句中,这似乎导致绑定问题

我想如果不是render_literal_value()我们使用的话,我们bindparam()可以避免这样的错误。但是下面的所有内容都@compiles应该返回纯文本,对吗?我如何修改它以获得基于参数的查询?


阅读 139

收藏
2021-01-20

共1个答案

一尘不染

我知道了。bindparams的实际值保存在一个称为的对象中,该对象SQLCompiler通常是特定于方言的。
这里(链接到GitHub)是在SQLCompiler查询编译过程中将bindparams存储在实例中的位置

因此,我的代码段的最终版本如下所示:

class values(FromClause):
    named_with_column = True

    def __init__(self, columns, *args, **kw):
        self._column_args = columns
        self.list = args
        self.alias_name = self.name = kw.pop('alias_name', None)

    def _populate_column_collection(self):
        # self._columns.update((col.name, col) for col in self._column_args)
        for c in self._column_args:
            c._make_proxy(self, c.name)


@compiles(values)
def compile_values(clause, compiler, asfrom=False, **kw):
    def decide(value, column):
        add_type_hint = False
        if isinstance(value, array) and not value.clauses:  # for empty array literals
            add_type_hint = True

        if isinstance(value, ClauseElement):
            intermediate = compiler.process(value)
            if add_type_hint:
                intermediate += '::' + str(column.type)
            return intermediate

        elif value is None:
            return compiler.render_literal_value(
                value,
                NULLTYPE
            ) + '::' + str(column.type)
        else:
            return compiler.process(
                bindparam(
                    None,
                    value=compiler.render_literal_value(
                        value,
                        column.type
                    ).strip("'")
                )
            ) + '::' + str(column.type)

    columns = clause.columns
    v = "VALUES %s" % ", ".join(
        "(%s)" % ", ".join(
            decide(elem, column)
            for elem, column in zip(tup, columns))
        for tup in clause.list
    )
    if asfrom:
        if clause.alias_name:
            v = "(%s) AS %s (%s)" % (v, clause.alias_name, (", ".join(c.name for c in clause.columns)))
        else:
            v = "(%s)" % v
    return v
2021-01-20