一尘不染

为什么 SQL Merge (Upsert) 的相同代码在 Microsoft SQL Server 控制台中有效,但在 Python 中无效?

sql

我的主 Python 文件中有一个函数,它被 main() 调用并使用来自不同文件和函数的 pyodbc 执行 SQL Merge (Upsert) 语句。具体来说,SQL 语句通过不同的事务日期时间遍历具有事务详细信息的源表,并将客户合并到一个单独的目标表中。下面附上执行语句的函数和返回完成的 SQL 语句的函数。

当我运行我的 Python 脚本时,它没有按预期工作,并且仅将大约 70 行(有时是 69、71 或 72)插入到目标客户表中。但是,当我使用相同的 SQL 语句并在 Microsoft SQL Server Management Studio 控制台(附在下面)中执行它时,它可以正常工作并插入 4302 行(如预期的那样)。

我不确定出了什么问题..非常感谢任何帮助!

Python 主文件中的 SQL 语句执行器:

    def stage_to_dim(connection, cursor, now):
        log(f"Filling {cfg.dim_customer} and {cfg.dim_product}")
        try:
            cursor.execute(sql_statements.stage_to_dim_statement(now))
            connection.commit()
        except Exception as e:
            log(f"Error in stage_to_dim: {e}" )
            sys.exit(1) 
        log("Stage2Dimensions complete.")

Python中的SQL语句公式器:

    def stage_to_dim_statement(now):
        return f"""
        DECLARE @dates table(id INT IDENTITY(1,1), date DATETIME)
        INSERT INTO @dates (date)
            SELECT DISTINCT TransactionDateTime FROM {cfg.stage_table} ORDER BY TransactionDateTime;

        DECLARE @i INT;
        DECLARE @cnt INT;
        DECLARE @date DATETIME;

        SELECT @i = MIN(id) - 1, @cnt = MAX(id) FROM @dates;
        WHILE @i < @cnt
        BEGIN
            SET @i = @i + 1
            SET @date = (SELECT date FROM @dates WHERE id = @i)
            MERGE {cfg.dim_customer} AS Target
            USING (SELECT * FROM {cfg.stage_table} WHERE TransactionDateTime = @date) AS Source 
                ON Target.CustomerCodeNK = Source.CustomerID
            WHEN MATCHED THEN
                UPDATE SET Target.AquiredDate = Source.AcquisitionDate, Target.AquiredSource = Source.AcquisitionSource,
            Target.ZipCode = Source.Zipcode, Target.LoadDate = CONVERT(DATETIME, '{now}'), Target.LoadSource = '{cfg.ingest_file_path}'
            WHEN NOT MATCHED THEN
                INSERT (CustomerCodeNK, AquiredDate, AquiredSource, ZipCode, LoadDate, LoadSource) VALUES (Source.CustomerID, 
            Source.AcquisitionDate, Source.AcquisitionSource, Source.Zipcode, CONVERT(DATETIME,'{now}'), '{cfg.ingest_file_path}');
        END
    """

来自 MS SQL Server 控制台的 SQL 语句:

    DECLARE @dates table(id INT IDENTITY(1,1), date DATETIME)
    INSERT INTO @dates (date)
        SELECT DISTINCT TransactionDateTime FROM dbo.STG_CustomerTransactions ORDER BY TransactionDateTime;
    DECLARE @i INT;
    DECLARE @cnt INT;
    DECLARE @date DATETIME;
    SELECT @i = MIN(id) - 1, @cnt = MAX(id) FROM @dates;
    WHILE @i < @cnt
    BEGIN
        SET @i = @i + 1
        SET @date = (SELECT date FROM @dates WHERE id = @i)
        MERGE dbo.DIM_CustomerDup AS Target
        USING (SELECT * FROM dbo.STG_CustomerTransactions WHERE TransactionDateTime = @date) AS Source 
            ON Target.CustomerCodeNK = Source.CustomerID

        WHEN MATCHED THEN
           UPDATE SET Target.AquiredDate = Source.AcquisitionDate, Target.AquiredSource = Source.AcquisitionSource,
           Target.ZipCode = Source.Zipcode, Target.LoadDate = CONVERT(DATETIME,'6/30/2022 11:53:05'), Target.LoadSource = '../csv/cleaned_original_data.csv'

        WHEN NOT MATCHED THEN
            INSERT (CustomerCodeNK, AquiredDate, AquiredSource, ZipCode, LoadDate, LoadSource) VALUES (Source.CustomerID, Source.AcquisitionDate, 
            Source.AcquisitionSource, Source.Zipcode, CONVERT(DATETIME,'6/30/2022 11:53:05'), '../csv/cleaned_original_data.csv');
    END

阅读 98

收藏
2022-07-22

共1个答案

一尘不染

如果您仔细考虑最终结果的结果,您实际上只是为每个客户获取最新的行(按日期)。因此,您可以使用标准行号方法过滤源。

Python 代码不能正常工作的确切原因尚不清楚,但下面的查询可能会更好。你也在做 SQL 注入,这很危险,也可能导致正确性问题。

此外,您应该始终使用明确的日期格式。

MERGE dbo.DIM_CustomerDup AS t
USING (
    SELECT *
    FROM (
        SELECT *,
          rn = ROW_NUMBER() OVER (PARTITION BY s.CustomerID ORDER BY s.TransactionDateTime DESC)
        FROM dbo.STG_CustomerTransactions s
    ) AS s
    WHERE s.rn = 1
) AS s
ON t.CustomerCodeNK = s.CustomerID

WHEN MATCHED THEN
    UPDATE SET
      AquiredDate = s.AcquisitionDate,
      AquiredSource = s.AcquisitionSource,
      ZipCode = s.Zipcode,
      LoadDate = SYSDATETIME(),
      LoadSource = '../csv/cleaned_original_data.csv'

WHEN NOT MATCHED THEN
    INSERT (CustomerCodeNK, AquiredDate, AquiredSource, ZipCode, LoadDate, LoadSource)
    VALUES (s.CustomerID, s.AcquisitionDate, s.AcquisitionSource, s.Zipcode, SYSDATETIME(), '../csv/cleaned_original_data.csv')
;
2022-07-22