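My main Python file has a function, called from main(), that uses pyodbc to execute a SQL MERGE (upsert) statement; the statement itself is built by a function in a separate file. Specifically, the SQL iterates over the distinct transaction datetimes in a source table of transaction details and merges the customers into a separate target table. The function that executes the statement and the function that returns the completed SQL statement are attached below.
When I run my Python script, it does not work as expected and inserts only about 70 rows (sometimes 69, 71, or 72) into the target customer table. However, when I take the same SQL statement and execute it in the Microsoft SQL Server Management Studio console (attached below), it works correctly and inserts 4302 rows, as expected.
I'm not sure what is going wrong. Any help is greatly appreciated!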
SQL statement executor in the main Python file:
def stage_to_dim(connection, cursor, now):
    log(f"Filling {cfg.dim_customer} and {cfg.dim_product}")
    try:
        cursor.execute(sql_statements.stage_to_dim_statement(now))
        connection.commit()
    except Exception as e:
        log(f"Error in stage_to_dim: {e}")
        sys.exit(1)
    log("Stage2Dimensions complete.")
SQL statement builder in Python:
def stage_to_dim_statement(now):
    return f"""
    DECLARE @dates table(id INT IDENTITY(1,1), date DATETIME)
    INSERT INTO @dates (date)
    SELECT DISTINCT TransactionDateTime
    FROM {cfg.stage_table}
    ORDER BY TransactionDateTime;

    DECLARE @i INT;
    DECLARE @cnt INT;
    DECLARE @date DATETIME;
    SELECT @i = MIN(id) - 1, @cnt = MAX(id) FROM @dates;

    WHILE @i < @cnt
    BEGIN
        SET @i = @i + 1
        SET @date = (SELECT date FROM @dates WHERE id = @i)

        MERGE {cfg.dim_customer} AS Target
        USING (SELECT * FROM {cfg.stage_table} WHERE TransactionDateTime = @date) AS Source
        ON Target.CustomerCodeNK = Source.CustomerID
        WHEN MATCHED THEN
            UPDATE SET
                Target.AquiredDate = Source.AcquisitionDate,
                Target.AquiredSource = Source.AcquisitionSource,
                Target.ZipCode = Source.Zipcode,
                Target.LoadDate = CONVERT(DATETIME, '{now}'),
                Target.LoadSource = '{cfg.ingest_file_path}'
        WHEN NOT MATCHED THEN
            INSERT (CustomerCodeNK, AquiredDate, AquiredSource, ZipCode, LoadDate, LoadSource)
            VALUES (Source.CustomerID, Source.AcquisitionDate, Source.AcquisitionSource, Source.Zipcode, CONVERT(DATETIME,'{now}'), '{cfg.ingest_file_path}');
    END
    """
SQL statement as run from the MS SQL Server console:
DECLARE @dates table(id INT IDENTITY(1,1), date DATETIME)
INSERT INTO @dates (date)
SELECT DISTINCT TransactionDateTime
FROM dbo.STG_CustomerTransactions
ORDER BY TransactionDateTime;

DECLARE @i INT;
DECLARE @cnt INT;
DECLARE @date DATETIME;
SELECT @i = MIN(id) - 1, @cnt = MAX(id) FROM @dates;

WHILE @i < @cnt
BEGIN
    SET @i = @i + 1
    SET @date = (SELECT date FROM @dates WHERE id = @i)

    MERGE dbo.DIM_CustomerDup AS Target
    USING (SELECT * FROM dbo.STG_CustomerTransactions WHERE TransactionDateTime = @date) AS Source
    ON Target.CustomerCodeNK = Source.CustomerID
    WHEN MATCHED THEN
        UPDATE SET
            Target.AquiredDate = Source.AcquisitionDate,
            Target.AquiredSource = Source.AcquisitionSource,
            Target.ZipCode = Source.Zipcode,
            Target.LoadDate = CONVERT(DATETIME,'6/30/2022 11:53:05'),
            Target.LoadSource = '../csv/cleaned_original_data.csv'
    WHEN NOT MATCHED THEN
        INSERT (CustomerCodeNK, AquiredDate, AquiredSource, ZipCode, LoadDate, LoadSource)
        VALUES (Source.CustomerID, Source.AcquisitionDate, Source.AcquisitionSource, Source.Zipcode, CONVERT(DATETIME,'6/30/2022 11:53:05'), '../csv/cleaned_original_data.csv');
END
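If you think carefully about what the end result is, you are really just taking the latest row (by date) for each customer. So you can drop the loop and filter the source with the standard row-number approach.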
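To see why the two approaches agree, here is a small Python sketch that mimics both on in-memory data. The rows and the helper names `merge_per_date` and `latest_row_per_customer` are made up for illustration, and the sketch assumes each customer has at most one staging row per timestamp.

```python
from datetime import datetime

# Hypothetical staging rows: (CustomerID, TransactionDateTime, Zipcode)
stage_rows = [
    ("C1", datetime(2022, 1, 1, 9, 0), "10001"),
    ("C2", datetime(2022, 1, 1, 9, 0), "20002"),
    ("C1", datetime(2022, 3, 5, 14, 30), "10099"),
    ("C2", datetime(2022, 2, 2, 8, 15), "20055"),
]


def merge_per_date(rows):
    """Mimic the WHILE loop: upsert the rows of each distinct date, oldest first."""
    dim = {}
    for date in sorted({ts for _, ts, _ in rows}):
        for customer_id, ts, zipcode in rows:
            if ts == date:
                # Insert if missing, overwrite if present (the MERGE behaviour).
                dim[customer_id] = zipcode
    return dim


def latest_row_per_customer(rows):
    """Mimic ROW_NUMBER() OVER (PARTITION BY CustomerID ORDER BY date DESC) = 1."""
    latest = {}
    for customer_id, ts, zipcode in rows:
        if customer_id not in latest or ts > latest[customer_id][0]:
            latest[customer_id] = (ts, zipcode)
    return {cid: zipcode for cid, (_, zipcode) in latest.items()}


print(merge_per_date(stage_rows))
print(latest_row_per_customer(stage_rows))
```

Both functions end with the same dictionary, because each later date simply overwrites what an earlier date wrote for the same customer.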
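It's not clear exactly why the Python code doesn't work properly, but the query below will probably work better. You are also injecting values straight into the SQL text (the f-string interpolation of {now} and {cfg.ingest_file_path}), which is SQL injection: it is dangerous and can also cause correctness problems.
Also, you should always use unambiguous date formats. A literal such as '6/30/2022 11:53:05' is interpreted according to the session's language and date-format settings.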
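As a rough illustration of both points, the sketch below builds a statement with `?` placeholders (the parameter marker style pyodbc uses) and formats the timestamp as ISO 8601. The function names and the trivial SELECT are hypothetical stand-ins, not part of the original code.

```python
from datetime import datetime


def iso_datetime(value):
    """Format a datetime as YYYY-MM-DDThh:mm:ss, which SQL Server parses the
    same way regardless of language or DATEFORMAT settings."""
    return value.strftime("%Y-%m-%dT%H:%M:%S")


def load_stamp_query(now, load_source):
    """Return (sql, params) using ? placeholders instead of f-string interpolation.

    The SELECT is only a harmless placeholder statement for illustration.
    """
    sql = "SELECT CONVERT(DATETIME, ?, 126) AS LoadDate, ? AS LoadSource;"
    return sql, (iso_datetime(now), load_source)


sql, params = load_stamp_query(
    datetime(2022, 6, 30, 11, 53, 5), "../csv/cleaned_original_data.csv"
)
print(sql)
print(params)
# With a real pyodbc cursor this would be run as: cursor.execute(sql, params)
```

Table names such as {cfg.stage_table} cannot be passed as parameters, so those would still need to come from trusted configuration. pyodbc also accepts a `datetime` object directly as a parameter, which avoids string formatting altogether.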
MERGE dbo.DIM_CustomerDup AS t
USING (
    SELECT *
    FROM (
        SELECT *,
            rn = ROW_NUMBER() OVER (PARTITION BY s.CustomerID ORDER BY s.TransactionDateTime DESC)
        FROM dbo.STG_CustomerTransactions s
    ) AS s
    WHERE s.rn = 1
) AS s
ON t.CustomerCodeNK = s.CustomerID
WHEN MATCHED THEN
    UPDATE SET
        AquiredDate = s.AcquisitionDate,
        AquiredSource = s.AcquisitionSource,
        ZipCode = s.Zipcode,
        LoadDate = SYSDATETIME(),
        LoadSource = '../csv/cleaned_original_data.csv'
WHEN NOT MATCHED THEN
    INSERT (CustomerCodeNK, AquiredDate, AquiredSource, ZipCode, LoadDate, LoadSource)
    VALUES (s.CustomerID, s.AcquisitionDate, s.AcquisitionSource, s.Zipcode, SYSDATETIME(), '../csv/cleaned_original_data.csv')
;
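As for the row count from Python: one commonly reported cause with multi-statement batches in pyodbc, not confirmed for this case, is that each statement emits a row-count result and the batch is cut short if those results are never consumed before the commit. A minimal sketch of a workaround under that assumption follows; `run_batch` is a hypothetical helper name.

```python
def run_batch(connection, cursor, sql, params=()):
    """Execute a T-SQL batch and commit only after every result has been consumed.

    Assumes a pyodbc-style connection and cursor.
    """
    # SET NOCOUNT ON suppresses the per-statement "rows affected" messages.
    cursor.execute("SET NOCOUNT ON;\n" + sql, *params)
    # Step through any remaining result sets so the whole batch is processed.
    while cursor.nextset():
        pass
    connection.commit()
```

With the single MERGE above there is only one statement, so this matters less, but it is a cheap safeguard if the loop version has to stay.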