DMS Airflow:阿里云企业级数据工作流平台。整合DMS服务,实现高效调度、智能管理与安全监控,助力数据团队。
原文标题:DMS Airflow:企业级数据工作流编排平台的专业实践
原文作者:阿里云开发者
冷月清谈:
怜星夜思:
2、DMS Airflow强调与阿里云现有DMS服务的深度集成。对于已经使用其他第三方调度工具(比如Jenkins、Azkaban)的企业来说,迁移到DMS Airflow主要的驱动力会是什么?可能面临哪些技术或管理上的挑战?
3、文章里提到了数据感知调度(Dataset),能根据数据更新自动触发DAG。这个概念很好,但真实世界的数据管道往往非常复杂,涉及多个数据源和中间结果。如何高效地定义和管理这些Dataset,才能避免把“数据觉察”变成又一个维护噩梦?
原文内容
DMS Airflow 是基于 Apache Airflow 构建的企业级数据工作流编排平台,通过深度集成阿里云 DMS(Data Management Service)系统的各项能力,为数据团队提供了强大的工作流调度、监控和管理能力。本文将从 Airflow 的高级编排能力、DMS 集成的特殊能力,以及 DMS Airflow 的使用示例三个方面,全面介绍 DMS Airflow 的技术架构与实践应用。
一、Airflow 提供的高级编排能力
1.1 DAG(有向无环图)定义
Airflow 的核心是 DAG(Directed Acyclic Graph),它定义了任务之间的依赖关系和执行顺序。
核心特性:
-
Python 代码定义:DAG 以 Python 代码形式定义,支持版本控制和代码审查;
-
动态生成:支持根据配置或数据动态生成 DAG;
-
模板化:支持 Jinja2 模板,实现参数化配置;
示例:
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime, timedeltadefault_args = {
‘owner’: ‘data-team’,
‘depends_on_past’: False,
‘email_on_failure’: True,
‘email_on_retry’: False,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5)
}dag = DAG(
‘complex_etl_pipeline’,
default_args=default_args,
description=‘复杂ETL数据管道’,
schedule_interval=‘@daily’,
start_date=datetime(2024, 1, 1),
catchup=False,
tags=[‘etl’, ‘production’]
)定义任务
extract_task = BashOperator(
task_id=‘extract_data’,
bash_command=‘python /scripts/extract.py --date {{ ds }}’,
dag=dag
)transform_task = PythonOperator(
task_id=‘transform_data’,
python_callable=transform_function,
op_kwargs={‘date’: ‘{{ ds }}’},
dag=dag
)load_task = BashOperator(
task_id=‘load_data’,
bash_command=‘python /scripts/load.py --date {{ ds }}’,
dag=dag
)定义依赖关系
extract_task >> transform_task >> load_task
1.2 任务依赖管理
Airflow 提供了灵活的任务依赖管理机制,支持复杂的任务编排场景。
依赖操作符:
-
>> 和 <<:设置任务执行顺序
-
set_upstream() 和 set_downstream():显式设置上下游关系
-
cross_downstream():批量设置下游依赖
-
chain():链式依赖设置
复杂依赖示例:
from airflow.models import DAG from airflow.operators.dummy import DummyOperator from airflow.utils.helpers import chain, cross_downstream分支任务
branch_task = DummyOperator(task_id=‘branch’, dag=dag)
并行任务组
task_a = DummyOperator(task_id=‘task_a’, dag=dag)
task_b = DummyOperator(task_id=‘task_b’, dag=dag)
task_c = DummyOperator(task_id=‘task_c’, dag=dag)合并任务
merge_task = DummyOperator(task_id=‘merge’, dag=dag)
设置依赖:branch -> [task_a, task_b, task_c] -> merge
branch_task >> [task_a, task_b, task_c] >> merge_task
使用 chain 函数
chain(
extract_task,
[transform_task_1, transform_task_2],
load_task
)
1.3 调度和时间触发
Airflow 提供了强大的调度功能,支持多种时间触发方式。
调度类型:
-
Cron 表达式:schedule_interval='0 0 *'(每天零点执行)
-
预设值:@daily、@hourly、@weekly 等
-
时间间隔:timedelta(hours=2)(每2小时执行)
-
None:手动触发,不自动调度
时间模板变量:
-
{{ ds }}:执行日期(YYYY-MM-DD)
-
{{ ds_nodash }}:执行日期(YYYYMMDD)
-
{{ ts }}:执行时间戳
-
{{ yesterday_ds }}:前一天日期
-
{{ next_ds }}:下一次执行日期
示例:
dag = DAG( 'scheduled_pipeline', schedule_interval='0 */6 * * *', # 每6小时执行一次 start_date=datetime(2024, 1, 1), catchup=True, # 补跑历史数据 max_active_runs=1 # 最多同时运行1个实例 )
task = PythonOperator(
task_id=‘process_data’,
python_callable=process_function,
op_kwargs={
‘execution_date’: ‘{{ ds }}’,
‘next_execution_date’: ‘{{ next_ds }}’
},
dag=dag
)
1.4 任务状态管理
Airflow 提供了完善的任务状态管理机制,支持任务重试、失败处理和状态转换。
任务状态:
-
None:未调度
-
Scheduled:已调度,等待执行
-
Queued:已排队,等待资源
-
Running:正在执行
-
Success:执行成功
-
Failed:执行失败
-
Skipped:跳过执行
-
Retry:重试中
-
Up for retry:等待重试
重试机制:
task = PythonOperator(
task_id='unreliable_task',
python_callable=unreliable_function,
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True, # 指数退避
max_retry_delay=timedelta(hours=1),
dag=dag
)
1.5 数据感知调度(Dataset)
Airflow 2.4+ 引入了 Dataset 概念,支持基于数据可用性的调度。
核心概念:
-
Dataset:表示数据的抽象概念
-
Dataset Producer:产生数据的任务
-
Dataset Consumer:消费数据的任务
-
调度触发:当 Dataset 更新时,自动触发依赖的 DAG
示例:
from airflow import Dataset from airflow.operators.python import PythonOperator定义 Dataset
raw_data = Dataset(“s3://bucket/raw-data/”)
processed_data = Dataset(“s3://bucket/processed-data/”)Producer 任务
produce_task = PythonOperator(
task_id=‘produce_data’,
outlets=[raw_data], # 标记产生的数据集
python_callable=produce_function,
dag=dag
)Consumer 任务
consume_task = PythonOperator(
task_id=‘consume_data’,
inlets=[raw_data], # 依赖的数据集
outlets=[processed_data],
python_callable=consume_function,
dag=another_dag # 可以跨 DAG
)
1.6 动态任务生成
Airflow 支持在运行时动态生成任务,实现灵活的编排逻辑。
应用场景:
-
根据配置文件生成任务
-
根据数据库查询结果生成任务
-
根据文件列表生成处理任务
示例:
def generate_tasks(): """根据配置动态生成任务""" configs = [ {'table': 'users', 'database': 'db1'}, {'table': 'orders', 'database': 'db1'}, {'table': 'products', 'database': 'db2'} ] tasks = [] for config in configs: task = PythonOperator( task_id=f"process_{config['table']}", python_callable=process_table, op_kwargs=config, dag=dag ) tasks.append(task) return tasks动态生成的任务
dynamic_tasks = generate_tasks()
1.7 任务组和子 DAG
Airflow 支持任务组(TaskGroup)和子 DAG(SubDAG),用于组织复杂的任务结构。
TaskGroup 示例:
from airflow.utils.task_group import TaskGroupwith TaskGroup(‘etl_group’) as etl_group:
extract_task = BashOperator(task_id=‘extract’, …)
transform_task = PythonOperator(task_id=‘transform’, …)
load_task = BashOperator(task_id=‘load’, …)
extract_task >> transform_task >> load_taskTaskGroup 可以像普通任务一样使用
start_task >> etl_group >> end_task
1.8 XCom 数据传递
Airflow 的 XCom(Cross-Communication)机制支持任务间数据传递。
使用示例:
def extract_function(**context): data = {'records': 1000, 'size': '10MB'} return datadef transform_function(**context):
# 获取上游任务的数据
ti = context[‘ti’]
data = ti.xcom_pull(task_ids=‘extract’)
records = data[‘records’]
# 处理数据
processed = records * 2
return processedextract_task = PythonOperator(
task_id=‘extract’,
python_callable=extract_function,
dag=dag
)transform_task = PythonOperator(
task_id=‘transform’,
python_callable=transform_function,
dag=dag
)
extract_task >> transform_task
二、DMS 集成的 Airflow 特殊能力
2.1 与 DMS 系统的深度集成
2.1.1 统一认证与授权
DMS Airflow 通过 DmsAuthManager 实现了与 DMS UC Center 的统一认证,用户无需单独管理 Airflow 账号,直接使用 DMS 账号登录。
核心优势:
-
单点登录:一次登录,全平台访问
-
权限统一:权限管理与 DMS 系统保持一致
-
角色映射:自动映射 DMS 角色到 Airflow 角色(Public、Viewer、User、Operator、Admin)
2.1.2 DMS 服务集成
DMS Airflow 通过内部代理机制,实现了与 DMS 各种服务的无缝集成。
集成服务:
-
DMS Enterprise API:SQL 执行、任务管理
-
AnalyticDB API:Spark 任务提交、资源管理
-
DTS API:数据同步任务控制
-
Notebook API:Notebook 资源管理
-
UC Center:用户认证和权限管理
2.2 企业级通知能力
DMS Airflow 提供了三种通知方式,满足不同场景的告警需求。
2.2.1 多通道通知
DMS Notification:
-
直接集成到 DMS 系统通知中心
-
支持任务状态、错误信息、执行结果等
-
与 DMS 工作流系统联动
SLS Notification:
-
集中式日志管理
-
支持日志查询和分析
-
可与日志分析工具集成
CloudMonitor Notification:
-
实时监控指标
-
支持自定义告警规则
-
与云监控告警系统集成
2.3 智能资源管理
2.3.1 自动扩缩容服务
DMS Airflow 的自动扩缩容服务基于任务负载动态调整 Worker 数量,实现资源的智能化管理。
核心特性:
-
负载监控:实时监控队列中等待和执行的任务数量;
-
智能计算:根据任务数量和 Worker 并发度计算目标 Worker 数;
-
平滑处理:使用滑动窗口和 Kalman 滤波算法平滑负载波动;
-
边界约束:支持最小和最大 Worker 数量限制;
-
K8s 集成:通过 API 调用调整 Kubernetes 副本数;
配置示例:
# airflow.cfg
[scale]
queue_length = 15 # 滑动窗口长度
worker_num_min = 2 # 最小 Worker 数
worker_num_max = 20 # 最大 Worker 数
polling_interval = 30 # 轮询间隔(秒)
2.3.2 资源组管理
DMS Airflow 支持 AnalyticDB 的资源组管理,可以指定任务在特定的资源组中执行,实现资源隔离和优先级控制。
资源组类型:
-
Interactive 资源组:交互式查询,低延迟;
-
Batch 资源组:批处理任务,高吞吐;
-
Warehouse 资源组:数据仓库查询;
2.4 DAG 动态刷新
DMS Airflow 提供了 DAG 刷新插件(dags_refresh_plugin),支持通过 API 触发 DAG 文件重新加载,无需重启 Airflow 服务。
核心特性:
-
API 触发:通过 HTTP API 触发刷新;
-
安全认证:基于 POP 签名算法的安全认证;
-
批量刷新:支持批量刷新多个 DAG;
使用场景:
-
代码更新后快速生效
-
配置变更后立即应用
-
开发调试时的快速迭代
2.5 日志优化
DMS Airflow 实现了日志栈过滤(no_stack_filter),自动过滤异常堆栈信息,使日志更加简洁易读。
优势:
-
减少日志体积
-
提高日志可读性
-
加快日志传输速度
2.6 实例名称到 Cluster ID 映射
DMS Airflow 支持通过 DMS 实例名称(dblink)自动解析 AnalyticDB Cluster ID,简化配置管理。
使用场景:
# 方式1:直接使用 cluster_id spark_task = DMSAnalyticDBSparkSqlOperator( task_id='spark_task', cluster_id='adb-cluster-001', resource_group='interactive-spark', sql='SELECT * FROM table', dag=dag )方式2:使用 instance 名称(自动解析)
spark_task = DMSAnalyticDBSparkSqlOperator(
task_id=‘spark_task’,
instance=‘production-adb-dblink’, # DMS 中的 dblink 名称
resource_group=‘interactive-spark’,
sql=‘SELECT * FROM table’,
dag=dag
)
2.7 企业级监控与可观测性
DMS Airflow 集成了多种监控和可观测性工具,提供全方位的任务执行监控。
监控维度:
-
任务执行监控:任务状态、执行时间、重试次数;
-
资源使用监控:Worker 数量、队列长度、资源组使用率;
-
业务指标监控:通过 CloudMonitor 发送自定义业务指标;
-
日志分析:通过 SLS 进行集中日志管理和分析;
2.8 安全特性
DMS Airflow 实现了多层安全机制,确保系统安全可靠。
安全机制:
-
POP 签名认证:API 调用使用 POP 签名算法验证;
-
Token 管理:自动刷新 DMS Token,保证长期任务的稳定性;
-
权限控制:基于角色的细粒度权限控制;
-
连接加密:所有 API 调用通过加密通道传输。
三、DMS Airflow 使用示例
3.1 SQL 任务执行示例
DMSSqlOperator 用于执行 DMS SQL 任务,支持异步执行和状态监控。
核心特性:
-
异步执行,避免长时间阻塞
-
自动轮询任务状态
-
支持多条 SQL 语句顺序执行
-
支持任务完成回调
使用示例:
from airflow import DAG from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator from datetime import datetimedag = DAG(
‘dms_sql_example’,
default_args={‘start_date’: datetime(2024, 1, 1)},
schedule_interval=‘@daily’
)
sql_task = DMSSqlOperator(
task_id=‘execute_sql’,
instance=‘production_db’,
database=‘analytics’,
sql=‘’’
SELECT COUNT(*) as total_records
FROM user_behavior_log
WHERE date = ‘{{ ds }}’
‘’',
polling_interval=10,
callback=lambda result: print(f"SQL执行完成: {result}"),
dag=dag
)
3.2 Spark 计算任务示例
DMSAnalyticDBSparkOperator 用于执行 AnalyticDB MySQL 3.0 Data Lakehouse 的 Spark 任务,支持两种资源组类型:Job 资源组和 Warehouse 资源组。
核心特性:
-
支持 SparkWarehouse 和传统 Spark Job 两种执行引擎
-
自动识别资源组类型
-
支持 Spark 配置参数自定义
-
自动获取 Spark Web UI 地址
-
支持执行时间限制
使用示例:
from airflow import DAG from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import( DMSAnalyticDBSparkSqlOperator, DMSAnalyticDBSparkOperator ) from datetime import datetimedag = DAG(
‘spark_analysis_example’,
default_args={‘start_date’: datetime(2024, 1, 1)},
schedule_interval=‘@daily’
)Spark SQL 执行(Warehouse模式)
spark_sql_task = DMSAnalyticDBSparkSqlOperator(
task_id=‘spark_sql_analysis’,
cluster_id=‘adb-cluster-001’,
resource_group=‘interactive-spark’,
sql=‘’’
SELECT
user_id,
COUNT(*) as action_count,
SUM(amount) as total_amount
FROM user_events
WHERE date = ‘{{ ds }}’
GROUP BY user_id
‘’',
schema=‘analytics’,
conf={‘spark.sql.shuffle.partitions’: 200},
execute_time_limit_in_seconds=3600,
dag=dag
)Spark Job 执行(传统模式)
spark_job_task = DMSAnalyticDBSparkOperator(
task_id=‘spark_batch_job’,
cluster_id=‘adb-cluster-001’,
resource_group=‘batch-job’,
sql=‘your_spark_sql_here’,
app_type=‘SQL’,
app_name=‘daily_etl_job’,
dag=dag
)
3.3 数据同步任务示例
DTSLakeInjectionOperator 用于控制阿里云 DTS(Data Transmission Service)数据同步任务,支持数据库到数据湖的同步场景。
核心特性:
-
自动构建 DTS 任务
-
实时监控同步任务状态
-
自动处理预检查失败场景
使用示例:
from airflow import DAG from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator from datetime import datetimedag = DAG(
‘dts_sync_example’,
default_args={‘start_date’: datetime(2024, 1, 1)},
schedule_interval=‘@daily’
)
dts_task = DTSLakeInjectionOperator(
task_id=‘sync_to_data_lake’,
source_instance=‘source_rds’,
source_database=‘production_db’,
target_instance=‘target_oss’,
bucket_name=‘data-lake-bucket’,
reserve={
‘table_filter’: [‘user_', 'order_’],
‘sync_mode’: ‘full’
},
db_list={
‘include’: [‘analytics’, ‘reporting’]
},
polling_interval=10,
dag=dag
)
3.4 Notebook 任务执行示例
DMSNotebookOperator 支持执行 Jupyter Notebook 文件,适合数据科学和机器学习工作流。
核心特性:
-
自动创建或获取 Notebook 实例
-
支持运行时参数注入
-
实时获取任务执行进度
-
支持任务超时配置
-
自动获取并输出 Notebook 执行日志
使用示例:
from airflow import DAG from airflow.providers.alibaba_dms.cloud.operators.dms_notebook import DMSNotebookOperator from datetime import datetimedag = DAG(
‘notebook_example’,
default_args={‘start_date’: datetime(2024, 1, 1)},
schedule_interval=‘@daily’
)
notebook_task = DMSNotebookOperator(
task_id=‘run_ml_training’,
file_path=‘notebooks/model_training.ipynb’,
profile_name=‘ml-profile’,
cluster_name=‘ml-cluster’,
cluster_type=‘spark’,
spec=‘large’,
runtime_name=‘python3.9’,
run_params={
‘training_date’: ‘{{ ds }}’,
‘model_version’: ‘v2.0’
},
timeout=7200,
polling_interval=10,
dag=dag
)
3.5 通知器使用示例
DMS Airflow 提供了三种通知器,满足不同场景的告警需求。
3.5.1 基础通知示例
from airflow import DAG from airflow.providers.alibaba_dms.cloud.notifications.sls_notification import SLSNotifier from airflow.providers.alibaba_dms.cloud.notifications.cloudmonitor_notification import CloudMonitorNotifier from datetime import datetime定义通知回调
def notify_on_failure(context):
# SLS 通知
sls_notifier = SLSNotifier(
sls_conn_id=‘sls_default’,
project=‘airflow-logs’,
logstore=‘task-alerts’,
success=False,
message=f"Task {context[‘task_instance’].task_id} failed"
)
sls_notifier.notify(context)
# CloudMonitor 通知
cms_notifier = CloudMonitorNotifier(
cms_conn_id=‘cms_default’,
region=‘cn-hangzhou’,
metric_name=‘TaskFailure’,
event_name=‘TaskFailedEvent’,
success=False,
message=f"Task {context[‘task_instance’].task_id} failed"
)
cms_notifier.notify(context)
dag = DAG(
‘example_with_notifications’,
default_args={
‘start_date’: datetime(2024, 1, 1),
‘on_failure_callback’: notify_on_failure
},
schedule_interval=‘@daily’
)
3.6 完整 ETL 工作流示例
以下是一个完整的 ETL 工作流示例,展示了如何组合使用多个 DMS Airflow 操作器:
from airflow import DAG from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import DMSAnalyticDBSparkSqlOperator from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator from airflow.providers.alibaba_dms.cloud.notifications.sls_notification import SLSNotifier from datetime import datetime, timedeltadefault_args = {
‘owner’: ‘data-team’,
‘depends_on_past’: False,
‘email_on_failure’: True,
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5),
‘on_failure_callback’: lambda context: SLSNotifier(
project=‘airflow-alerts’,
logstore=‘task-failures’,
success=False,
message=f"DAG {context[‘dag’].dag_id} failed"
).notify(context)
}dag = DAG(
‘complete_etl_pipeline’,
default_args=default_args,
description=‘完整ETL数据管道’,
schedule_interval=‘@daily’,
start_date=datetime(2024, 1, 1),
catchup=False,
tags=[‘etl’, ‘production’]
)步骤1:数据同步(从源库同步到数据湖)
sync_task = DTSLakeInjectionOperator(
task_id=‘sync_source_data’,
source_instance=‘production_rds’,
source_database=‘production_db’,
target_instance=‘data_lake_oss’,
bucket_name=‘raw-data-bucket’,
reserve={
‘table_filter’: [‘user_', 'order_’],
‘sync_mode’: ‘incremental’
},
polling_interval=10,
dag=dag
)步骤2:执行 SQL 验证数据
validate_task = DMSSqlOperator(
task_id=‘validate_data’,
instance=‘analytics_db’,
database=‘staging’,
sql=‘’’
SELECT
COUNT(*) as total_records,
COUNT(DISTINCT user_id) as unique_users
FROM raw_user_data
WHERE date = ‘{{ ds }}’
‘’',
polling_interval=10,
dag=dag
)步骤3:Spark 数据处理和分析
spark_transform_task = DMSAnalyticDBSparkSqlOperator(
task_id=‘spark_data_transform’,
cluster_id=‘adb-cluster-001’,
resource_group=‘batch-processing’,
sql=‘’’
INSERT INTO analytics.user_daily_summary
SELECT
user_id,
date,
COUNT(*) as event_count,
SUM(amount) as total_amount,
AVG(amount) as avg_amount
FROM staging.raw_user_data
WHERE date = ‘{{ ds }}’
GROUP BY user_id, date
‘’',
schema=‘analytics’,
conf={‘spark.sql.shuffle.partitions’: 200},
execute_time_limit_in_seconds=3600,
dag=dag
)步骤4:生成报表
report_task = DMSSqlOperator(
task_id=‘generate_report’,
instance=‘analytics_db’,
database=‘analytics’,
sql=‘’’
INSERT INTO daily_reports
SELECT
date,
COUNT(DISTINCT user_id) as daily_active_users,
SUM(total_amount) as daily_revenue
FROM user_daily_summary
WHERE date = ‘{{ ds }}’
GROUP BY date
‘’',
polling_interval=10,
dag=dag
)定义依赖关系
sync_task >> validate_task >> spark_transform_task >> report_task
四、总结
DMS Airflow 作为企业级数据工作流编排平台,通过深度集成 DMS 系统的各项能力,为数据团队提供了强大的工作流调度、监控和管理能力。
核心优势总结:
适用场景:
-
数据 ETL 工作流
-
数据分析和报表生成
-
机器学习模型训练和部署
-
数据同步和迁移
-
定时任务调度
DMS Airflow 将持续演进,为数据团队提供更加高效、稳定、易用的工作流编排能力。
附录:相关资源
欢迎点击阅读原文查看 DMS Airflow 文档:https://help.aliyun.com/zh/dms/create-and-manage-an-airflow-instance
Apache Airflow 官方文档:https://airflow.apache.org/docs/
欢迎钉钉加入DMS数据管理用户交流5群(群号:67215001618)。
