一尘不染

Kafka JDBC Connect查询导致ORA-00933:SQL命令未正确结束

sql

我有这个Oracle SQL查询:

SELECT * FROM 
    (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,
    DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK 
    FROM TSY940) 
WHERE ORDER_RANK=1;

在SQL Developer中运行时,它将返回所需的结果。

由于某些原因,当我在kafka-connect-jdbc属性中使用此查询时,得到

ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940) WHERE ORDER_RANK=1', topicPrefix='TSY940', timestampColumn='SYS_NO', incrementingColumn='null'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended

        at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
        at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
        at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1059)
        at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:522)
        at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:257)
        at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:587)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:225)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:53)
        at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:774)
        at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:925)
        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1111)
        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:4798)
        at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:4845)
        at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1501)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
        at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

这是我的属性文件:

name=poc-oracle-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.password = ********
connection.url = jdbc:oracle:thin:@***.***.***.**:****/******
connection.user = ***********
table.types=TABLE
query=SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940) WHERE ORDER_RANK=1
mode=timestamp
timestamp.column.name=SYS_NO
topic.prefix=TSY940
batch.max.rows = 500
poll.interval.ms=60000

transforms=createKey,extract
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=SO_ORDER_KEY
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extract.field=SO_ORDER_KEY

我使用ojdbc7驱动程序。

WHERE子句似乎是个问题,因为当我将query属性替换为时,没有出现异常

query=SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940)

阅读 263

收藏
2021-05-16

共1个答案

一尘不染

然后,您可以尝试执行此查询,从而完全消除 ORDER_RANK

SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO
FROM (
 SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO 
 FROM (SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO,
      DENSE_RANK() OVER(PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) AS ORDER_RANK 
      FROM TSY940) sub
 WHERE sub.ORDER_RANK=1
)

查看导致问题的原因的最佳方法是启用10046跟踪,并查看发送到数据库且导致该问题的确切查询ORA-00933

2021-05-16