我有所有运行的BigQuery连接器,但是我希望在Cloud Composer而不是App Engine Flexible上计划在Docker容器中有一些现有脚本。
我有以下脚本似乎遵循了我可以找到的示例:
import datetime from airflow import DAG from airflow import models from airflow.operators.docker_operator import DockerOperator yesterday = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()) default_args = { # Setting start date as yesterday starts the DAG immediately 'start_date': yesterday, # If a task fails, retry it once after waiting at least 5 minutes 'retries': 1, 'retry_delay': datetime.timedelta(minutes=5), } schedule_interval = '45 09 * * *' dag = DAG('xxx-merge', default_args=default_args, schedule_interval=schedule_interval) hfan = DockerOperator( task_id = 'hfan', image = 'gcr.io/yyyyy/xxxx' )
…但是当尝试运行时,它在网络用户界面中告诉我:
Broken DAG: [/home/airflow/gcs/dags/xxxx.py] No module named docker
也许Docker未配置为在Cloud Composer运行的Kubernetes集群内工作吗?还是我只是缺少语法中的某些内容?
我通过在composer的PyPI部分中安装docker-py == 1.10.6来解决它。
但是,要使DockerOperator正常工作,需要付出更多的努力,因为作曲家的工作人员无法访问Docker守护程序。转到GCP控制台并执行以下步骤;获取群集凭据后)。
kubectl get deployment airflow-worker -o yaml --export > airflow-worker- config.yaml
编辑airflow-worker-config.yaml(示例链接)以挂载docker.sock和docker,授予对airflow-worker的特权访问以运行docker命令
应用部署设置
kubectl apply -f airflow-worker-config.yaml