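This article first walks through my trial and error with DockerOperator. The KubernetesPodOperator approach described at the end is by far the most convenient one.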
1. Error on the Connections page
Clicking Admin -> Connections produced the following error:
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/connection.py", line 238, in get_password
return fernet.decrypt(bytes(self._password, 'utf-8')).decode()
File "/home/airflow/.local/lib/python3.7/site-packages/cryptography/fernet.py", line 194, in decrypt
raise InvalidToken
cryptography.fernet.InvalidToken
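Cause: when we re-ran install.sh, we noticed the command below for retrieving the Fernet key. The Fernet key is regenerated on every redeployment, so passwords encrypted with the previous key can no longer be decrypted.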
[root@sha-216 airflow-plus]# echo Fernet Key: $(kubectl get secret --namespace airflow-plus airflow-fernet-key -o jsonpath="{.data.fernet-key}" | base64 --decode)
Fernet Key: ekc3MHl6VmSSSNnY0pjSSSSZk94WkVxl5T3E2OUtXT0s=
Fix: set fixed keys in install.sh so they are not regenerated on every install:
--set webserverSecretKey=ekc3MHl6VmSSSNnY0pjSSSSZk94WkVxl5T3E2OUtXT0s=
--set fernetKey=ekc3MHl6VmNnY0pjZk94WkVxRVRmUkl5T3E2OUtXT0s=
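If you would rather choose your own fixed key than copy the one the chart generated, the sketch below shows the expected format. It assumes the standard Fernet key layout: 32 random bytes encoded with URL-safe base64 (44 characters), which is what `cryptography.fernet.Fernet.generate_key()` returns. Here it is reproduced with the standard library only, together with a check you can run on a value before pasting it into install.sh. The function names are hypothetical. Note that `webserverSecretKey` is a separate setting (the web server's Flask session secret) and does not have to be a Fernet key.

```python
import base64
import os


def generate_fernet_key() -> str:
    # Same format as cryptography.fernet.Fernet.generate_key():
    # 32 random bytes encoded with URL-safe base64 (44 characters)
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


def is_valid_fernet_key(key: str) -> bool:
    # A Fernet key must decode (URL-safe base64) to exactly 32 bytes.
    # ValueError also covers binascii.Error and UnicodeEncodeError.
    try:
        raw = base64.urlsafe_b64decode(key.encode("ascii"))
    except ValueError:
        return False
    return len(raw) == 32


if __name__ == "__main__":
    # Print a ready-to-paste Helm flag with a freshly generated key
    print(f"--set fernetKey={generate_fernet_key()}")
```

Generate the key once and keep it with your deployment scripts. Running the generator on every install would recreate exactly the problem described above.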
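Next, connect to the PostgreSQL database (the airflow-plus database), back up the manually added rows in the `connection` table so they can be re-entered later, and then empty the table. The rest of the system data is not affected. Then run install.sh again.
Finally, re-enter the manually configured connections. After this, uninstalling and reinstalling will no longer trigger the error.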
2. Configure the Docker connection under Connections
Set the Conn ID to docker_repo (used later) and fill in the private registry address, username and password.
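If you prefer to inject this connection as an environment variable instead of filling in the UI form, Airflow also reads connections from variables named `AIRFLOW_CONN_<CONN_ID>` (conn id in upper case) holding a URI of the form `conn_type://login:password@host:port`. The sketch below builds such a value for a Docker connection. The helper name, registry host, port and credentials are hypothetical placeholders, and you should confirm how your deployment injects environment variables into the Airflow pods.

```python
from typing import Optional, Tuple
from urllib.parse import quote


def docker_conn_env(conn_id: str, host: str, login: str, password: str,
                    port: Optional[int] = None) -> Tuple[str, str]:
    # Airflow reads connections from variables named AIRFLOW_CONN_<CONN_ID in upper case>
    env_name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # Percent-encode credentials so characters such as '@' or ':' survive URI parsing
    netloc = f"{quote(login, safe='')}:{quote(password, safe='')}@{host}"
    if port is not None:
        netloc += f":{port}"
    return env_name, f"docker://{netloc}"


# Hypothetical registry and credentials, for illustration only
name, uri = docker_conn_env("docker_repo", "registry.example.com", "admin", "p@ss:word", 8082)
print(f"{name}={uri}")
# prints AIRFLOW_CONN_DOCKER_REPO=docker://admin:p%40ss%3Aword@registry.example.com:8082
```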
3. Write a DAG that runs the image
from airflow.providers.docker.operators.docker import DockerOperator
import pendulum
from airflow import DAG

with DAG(
    dag_id="comm_add_dag",
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 5, 8, tz="Asia/Shanghai"),
    catchup=True,
    description="Automated ingestion of residential communities",
) as dag:
    t1 = DockerOperator(
        # The Conn ID defined above
        docker_conn_id="docker_repo",
        # Docker Engine API version (e.g. "1.41"), not the Docker provider version
        # shown under Admin -> Providers. "auto" detects it from the daemon, which
        # fails while the daemon socket is unreachable (see section 5).
        api_version="auto",
        image="comm-add:1.0.1",
        # The city (e.g. Linyi) is passed in as a parameter; relevant code in the image:
        # @click.command()
        # @click.option('--city', default="all", help='City to compute')
        command='python main.py --city {{ dag_run.conf["city"] if "city" in dag_run.conf else "all" }}',
        task_id="comm_add",
    )
    t1
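The `command` string above is a Jinja template that Airflow renders from `dag_run.conf` when the task runs. To make the fallback behaviour explicit, here is the same conditional written in plain Python. `build_command` is a hypothetical helper for illustration only; Airflow never calls it.

```python
from typing import Optional


def build_command(conf: Optional[dict]) -> str:
    # Treat a missing conf as empty (an assumption of this sketch)
    conf = conf or {}
    # Same logic as the Jinja expression: use "city" if given, otherwise "all"
    city = conf["city"] if "city" in conf else "all"
    return f"python main.py --city {city}"


print(build_command({"city": "linyi"}))  # python main.py --city linyi
print(build_command({}))                 # python main.py --city all
```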
4. Trigger with parameters
Enter the run configuration as a JSON string (for example with a "city" key) and click Trigger.
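The same run can also be triggered without the UI through Airflow 2's stable REST API (`POST /api/v1/dags/{dag_id}/dagRuns` with a `conf` object in the body). The sketch below only builds the request. It assumes the API is reachable at a hypothetical `http://localhost:8080` and that basic authentication is enabled for the API, which depends on your `[api] auth_backends` setting. The credentials are placeholders.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url: str, dag_id: str, conf: dict,
                          username: str, password: str) -> urllib.request.Request:
    # Stable REST API endpoint for creating a DAG run
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    # The run parameters go into the "conf" field, like the JSON entered in the UI
    body = json.dumps({"conf": conf}).encode("utf-8")
    # Basic auth header (assumes basic auth is enabled for the API)
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )


req = build_trigger_request("http://localhost:8080", "comm_add_dag", {"city": "linyi"},
                            "admin", "admin")
# Send it with urllib.request.urlopen(req) once the URL and credentials are real
```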
5. Errors and fixes
requests.exceptions.ConnectionError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
[2023-05-10, 01:46:33 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2023-05-10, 01:46:33 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
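The FileNotFoundError means the worker pod has no Docker daemon socket: DockerOperator talks to the local daemon through /var/run/docker.sock by default, and that file does not exist inside the pod. So we changed the worker section of the values.yaml mentioned earlier to mount the host's docker.sock: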
workers:
  resources:
    #limits:
    #  cpu: 4000m
    #  memory: 12288Mi
    requests:
      cpu: 100m
      memory: 1024Mi
  extraVolumes:
    - name: dockersock
      hostPath:
        path: /var/run/docker.sock
  extraVolumeMounts:
    - name: dockersock
      mountPath: /var/run/docker.sock
After mounting the socket and reinstalling, a permission error appeared:
File "/home/airflow/.local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 30, in connect
sock.connect(self.unix_socket)
PermissionError: [Errno 13] Permission denied
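A likely cause, not verified in this setup: the official Airflow image runs as the `airflow` user (UID 50000), while the host's docker.sock is usually owned by root with group `docker` and mode 660. A process that is neither root nor a member of the socket's group is refused. The stdlib sketch below, run inside the worker pod (for example via `kubectl exec`), prints the socket's owner and mode next to the process's UID and groups so you can confirm or rule out that mismatch. `describe_socket_access` is a hypothetical helper. If the groups do not match, one direction to explore is adding the socket's GID to the pod's `supplementalGroups`.

```python
import os
import stat


def describe_socket_access(path: str = "/var/run/docker.sock") -> dict:
    # Compare the file's ownership/mode with the identity of the current process
    st = os.stat(path)
    return {
        "is_socket": stat.S_ISSOCK(st.st_mode),
        "mode": oct(stat.S_IMODE(st.st_mode)),
        "owner_uid": st.st_uid,
        "owner_gid": st.st_gid,
        "process_uid": os.getuid(),
        # Supplementary groups plus the primary group of this process
        "process_groups": sorted(set(os.getgroups()) | {os.getgid()}),
        # True only if this process may both read and write the path
        "can_read_write": os.access(path, os.R_OK | os.W_OK),
    }


if __name__ == "__main__" and os.path.exists("/var/run/docker.sock"):
    for key, value in describe_socket_access().items():
        print(f"{key}: {value}")
```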
After a lot of tinkering I still have no fix for this. Readers with experience here are welcome to share.
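6. Running Docker images with KubernetesPodOperator
This approach is very convenient and takes only a few minutes to set up.
image_pull_secrets refers to a Secret configured in Kubernetes that grants access to images in the private registry; it is not covered in detail here.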
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s
import pendulum

with DAG(
    dag_id="comm_add_dag",
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 5, 8, tz="Asia/Shanghai"),
    catchup=True,
    description="Automated ingestion",
) as dag:
    production_task = KubernetesPodOperator(
        namespace="airflow-plus",
        image="comm-add:1.0.1",
        # Kubernetes Secret holding the private registry credentials
        image_pull_secrets=[k8s.V1LocalObjectReference("nexus-registry-secret")],
        cmds=["python", "main.py"],
        # arguments is a templated field, so dag_run.conf is rendered at run time
        arguments=["--city", '{{ dag_run.conf["city"] if "city" in dag_run.conf else "all" }}'],
        name="comm_add",
        task_id="comm_add",
        get_logs=True,
    )
    production_task
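For reference, an image pull secret such as `nexus-registry-secret` is a Secret of type `kubernetes.io/dockerconfigjson`. It is usually created with `kubectl create secret docker-registry nexus-registry-secret --docker-server=<registry> --docker-username=<user> --docker-password=<password> -n airflow-plus`. The sketch below builds the equivalent manifest as a Python dict, which shows what that command stores. The registry host and credentials are hypothetical placeholders.

```python
import base64
import json


def docker_config_json(registry: str, username: str, password: str) -> str:
    # The "auth" field is base64("username:password"), as in ~/.docker/config.json
    auth = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return json.dumps({
        "auths": {registry: {"username": username, "password": password, "auth": auth}}
    })


def pull_secret_manifest(name: str, namespace: str, registry: str,
                         username: str, password: str) -> dict:
    # Secret data values are base64-encoded; the key must be ".dockerconfigjson"
    encoded = base64.b64encode(
        docker_config_json(registry, username, password).encode("utf-8")
    ).decode("ascii")
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "type": "kubernetes.io/dockerconfigjson",
        "data": {".dockerconfigjson": encoded},
    }


manifest = pull_secret_manifest("nexus-registry-secret", "airflow-plus",
                                "nexus.example.com:8082", "admin", "admin")
# kubectl also accepts JSON, e.g. pipe this output into: kubectl apply -f -
print(json.dumps(manifest, indent=2))
```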
Follow the WeChat official account 算法小生 for more.