一尘不染

在dag之间使用单独的environ和sys.path

python

  • TLDR :该问题最初基于问题,后来被确定为该问题的标题更新。跳至“更新2”以获取最相关的问题详细信息。

有dag文件从另一个位置的另一个python文件中导入了dicts的python列表,并根据列表的dict值创建了dag,气流存在奇怪的问题,当我手动运行dag文件时,它看上去会有所不同。像…

...
environ["PROJECT_HOME"] = "/path/to/some/project/files"
# import certain project files
sys.path.append(environ["PROJECT_HOME"])
import tables as tt

tables = tt.tables

for table in tables:
    print table
    assert isinstance(table, dict)
    <create some dag task 1>
    <create some dag task 2>
    ...

~/airflow/dag/dir手动运行py文件时,没有引发任何错误,并且for循环将输出命令,但是气流显然会在Web服务器和运行时看到不同的内容airflow list_dags。运行airflow list_dags我得到错误

    assert isinstance(table, dict)
AssertionError

而且不知道如何测试这是什么原因,因为再次从dag位置手动运行py文件时,没有问题,并且print语句显示dicts,并且Web服务器UI也不显示其他错误消息。

有人知道这里会发生什么吗?也许关于进口如何运作?

  • 更新1

在从导入的python模块调用函数时,看到了更多的怪异之处,当手动运行dag文件时,一切运行正常,但是airflow list_dags说…

AttributeError:“模块”对象没有属性“ my_func”

使我更加怀疑某些导入异常,即使这与我在另一个dag文件中使用的完全相同的过程(即,设置一些environ值并附加到sys.path)为该dag导入模块也没有任何问题。

  • 更新2

问题似乎是(在错误的断言处打印各种sys.pathenvironmodule.__all__信息之后),
正在导入的名称相似的模块来自另一个 我执行了相同步骤的项目。就是 有另一个文件…

...
environ["PROJECT_HOME"] = "/path/to/some/project/files"
# import certain project files
sys.path.append(environ["PROJECT_HOME"])
import tables as tt

tables = tt.tables

for table in tables:
    print table
    assert isinstance(table, dict)
    <create some dag task 1>
    <create some dag task 2>
    ...

并且该项目主页正被用来下载一个类似命名的模块,该模块也具有一个obj,该obj的名称与我所期望的相同(即使在我将project文件夹插入之前sys.path)。除了制作打包的dag之外,还有一种方法可以防止气流组合不同dag的所有environsys.path值(因为我在各种bash和python任务脚本中使用$
PROJECT_HOME)?


阅读 190

收藏
2021-01-20

共1个答案

一尘不染

为了从其他路径引入特定模块,我在这里使用解决方案通过指定其他python模块的绝对文件路径来导入它们。

为了使用不同的python解释器将各种python脚本作为气流任务运行,我做了类似…

do_stuff_a = BashOperator(
        task_id='my_task_a',
        bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)
2021-01-20