参考:
https://blog.csdn.net/z928727991/article/details/102744611
1、超时中断处理
在 default_args 参数中加入 ‘execution_timeout’: timedelta(hours=1) ,表示如果一小时还没有运行完,就置为失败
# encoding: utf-8
import airflow
import time
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from airflow.operators.dagrun_operator import TriggerDagRunOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'execution_timeout': timedelta(hours=1),
'start_date': airflow.utils.dates.days_ago(1),
2、流程中断retry
retries,retry_delay
retries 表示重试的次数,重试多少次后跳过此 task 。retry_delay 参数表示失败多久后进行重试,次数设置的是1分钟
de4c65baf941f8f2ed775339fd0c733a = BashOperator(
task_id ='de4c65baf941f8f2ed775339fd0c733a',
retries = 5,
retry_delay = timedelta(minutes=1),
bash_command =command,
params={"table_name":"test"},
trigger_rule='all_done',
dag=dag)
参考:
https://www.cnblogs.com/zmc940317/p/13293685.html
3、邮件预警
需要先设置:
airflow.cfg中设置Email SMTP服务器
args = {
'owner': 'airflow',
'start_date': start_date,
'email': ['[email protected]'],
'email_on_failure': True, #失败之后再次发送
'email_on_retry': True,
'depends_on_past': True,
}