有dag文件从另一个位置的另一个python文件中导入了dicts的python列表,并根据列表的dict值创建了dag,气流存在奇怪的问题,当我手动运行dag文件时,它看上去会有所不同。像…
... environ["PROJECT_HOME"] = "/path/to/some/project/files" # import certain project files sys.path.append(environ["PROJECT_HOME"]) import tables as tt tables = tt.tables for table in tables: print table assert isinstance(table, dict) <create some dag task 1> <create some dag task 2> ...
从~/airflow/dag/dir手动运行py文件时,没有引发任何错误,并且for循环将输出命令,但是气流显然会在Web服务器和运行时看到不同的内容airflow list_dags。运行airflow list_dags我得到错误
~/airflow/dag/
airflow list_dags
assert isinstance(table, dict) AssertionError
而且不知道如何测试这是什么原因,因为再次从dag位置手动运行py文件时,没有问题,并且print语句显示dicts,并且Web服务器UI也不显示其他错误消息。
有人知道这里会发生什么吗?也许关于进口如何运作?
在从导入的python模块调用函数时,看到了更多的怪异之处,当手动运行dag文件时,一切运行正常,但是airflow list_dags说…
AttributeError:“模块”对象没有属性“ my_func”
使我更加怀疑某些导入异常,即使这与我在另一个dag文件中使用的完全相同的过程(即,设置一些environ值并附加到sys.path)为该dag导入模块也没有任何问题。
environ
sys.path
问题似乎是(在错误的断言处打印各种sys.path,environ和module.__all__信息之后), 正在导入的名称相似的模块来自另一个 我执行了相同步骤的项目。就是 有另一个文件…
module.__all__
并且该项目主页正被用来下载一个类似命名的模块,该模块也具有一个obj,该obj的名称与我所期望的相同(即使在我将project文件夹插入之前sys.path)。除了制作打包的dag之外,还有一种方法可以防止气流组合不同dag的所有environ和sys.path值(因为我在各种bash和python任务脚本中使用$ PROJECT_HOME)?
为了从其他路径引入特定模块,我在这里使用解决方案通过指定其他python模块的绝对文件路径来导入它们。
为了使用不同的python解释器将各种python脚本作为气流任务运行,我做了类似…
do_stuff_a = BashOperator( task_id='my_task_a', bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py'), execution_timeout=timedelta(minutes=30), dag=dag)