blaire

👩🏻‍💻星洲小课堂 SinClass

hello world

# -*- coding: utf-8 -*-

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import timedelta

#-------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization

default_args = {
    'owner': 'jifeng.si',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['779844881@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

#-------------------------------------------------------------------------------
# dag

dag = DAG(
    'example_hello_world_dag',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=timedelta(days=1))

#-------------------------------------------------------------------------------
# first operator

date_operator = BashOperator(
    task_id='date_task',
    bash_command='date',
    dag=dag)

#-------------------------------------------------------------------------------
# second operator

sleep_operator = BashOperator(
    task_id='sleep_task',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)

#-------------------------------------------------------------------------------
# third operator

def print_hello():
    return 'Hello world!'

hello_operator = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag)

#-------------------------------------------------------------------------------
# dependencies

sleep_operator.set_upstream(date_operator)
hello_operator.set_upstream(date_operator)
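
The two `set_upstream` calls above make `date_task` the upstream of both `sleep_task` and `hello_task`. As a rough illustration of the run order this implies, here is a small sketch using only the standard-library `graphlib` module (Python 3.9+). It is not how Airflow's scheduler is implemented, and the name `upstream_of` is made up for this example.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks that must finish before it (its upstream).
# Task names come from the DAG above; `upstream_of` is a hypothetical name.
upstream_of = {
    "date_task": set(),
    "sleep_task": {"date_task"},
    "hello_task": {"date_task"},
}

sorter = TopologicalSorter(upstream_of)
sorter.prepare()

# Collect tasks in "waves": every task in a wave has all of its upstreams done.
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # sorted only to keep the output stable
    waves.append(ready)
    sorter.done(*ready)

print(waves)  # [['date_task'], ['hello_task', 'sleep_task']]
```

In Airflow itself, the same relationship can also be written with the bitshift syntax: `date_operator >> [sleep_operator, hello_operator]`.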

# ~/airflow/dags [19:59:51]
➜ python hello_world.py

airflow tasks list example_hello_world_dag

➜ airflow tasks list example_hello_world_dag
[2021-01-24 20:00:02,162] {dagbag.py:440} INFO - Filling up the DagBag from /Users/blair/airflow/dags
[2021-01-24 20:00:02,200] {example_kubernetes_executor_config.py:174} WARNING - Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
[2021-01-24 20:00:02,200] {example_kubernetes_executor_config.py:175} WARNING - Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']

date_task
hello_task
sleep_task

2.1 Test date_task

➜ airflow tasks test example_hello_world_dag date_task 20210124

2.2 Test hello_task

AIRFLOW_CTX_DAG_EMAIL=779844881@qq.com
AIRFLOW_CTX_DAG_OWNER=blair.chan
AIRFLOW_CTX_DAG_ID=example_hello_world_dag
AIRFLOW_CTX_TASK_ID=hello_task
AIRFLOW_CTX_EXECUTION_DATE=2021-01-24T00:00:00+00:00
[2021-01-24 20:05:03,473] {python.py:118} INFO - Done. Returned value was: Hello world!
[2021-01-24 20:05:03,480] {taskinstance.py:1142} INFO - Marking task as SUCCESS. dag_id=example_hello_world_dag, task_id=hello_task, execution_date=20210124T000000, start_date=20210124T120503, end_date=20210124T120503

2.3 Test sleep_task

 /var/folders/58/kj1q13hs3j52hwvj3t2g2plh0000gp/T
[2021-01-24 20:04:30,923] {bash.py:158} INFO - Running command: sleep 5
[2021-01-24 20:04:30,929] {bash.py:169} INFO - Output:
[2021-01-24 20:04:35,942] {bash.py:177} INFO - Command exited with return code 0
[2021-01-24 20:04:35,972] {taskinstance.py:1142} INFO - Marking task as SUCCESS. dag_id=example_hello_world_dag, task_id=sleep_task, execution_date=20210124T000000, start_date=20210124T120430, end_date=20210124T120435
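
One detail in these logs can look odd: the bracketed timestamp reads 20:04:30, while `start_date` in the final line reads `20210124T120430`. The eight-hour gap is consistent with the log prefix being in local time and `start_date` being in UTC. The sketch below checks that arithmetic, assuming (this is an inference from the offset, not something the logs state) that the machine runs at UTC+8.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the machine that produced the logs runs at UTC+8.
local_tz = timezone(timedelta(hours=8))

# Local timestamp taken from the sleep_task log prefix above.
local_time = datetime(2021, 1, 24, 20, 4, 30, tzinfo=local_tz)

# Convert to UTC and format it the way start_date appears in the log.
utc_stamp = local_time.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%S")
print(utc_stamp)  # 20210124T120430
```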

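How do you set up dependencies between one DAG and another in Airflow?

  1. Write your own script that checks the state of the parent DAG, and use it to enforce the dependency between the DAGs.
  2. Use ExternalTaskSensor, an advanced feature that acts as a supervisor watching the other DAG.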
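
The first option amounts to a polling loop. Below is a minimal sketch of that idea in plain Python, not a definitive implementation: `wait_for_parent` and `get_state` are hypothetical names, and how the parent DAG's state is actually looked up is deliberately left to the caller. Airflow's built-in `ExternalTaskSensor` covers the same need by waiting on a task or DAG run in another DAG.

```python
import time
from typing import Callable


def wait_for_parent(get_state: Callable[[], str],
                    poke_interval: float = 60.0,
                    timeout: float = 3600.0) -> bool:
    """Poll until the parent DAG run succeeds; return False on failure or timeout.

    `get_state` is a hypothetical callable supplied by the caller that returns
    the parent run's state as a string such as "running", "success" or "failed".
    """
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == "success":
            return True
        if state == "failed":
            return False
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


# Usage with a fake state source standing in for a real lookup.
states = iter(["running", "running", "success"])
print(wait_for_parent(lambda: next(states), poke_interval=0.01, timeout=5))
```

A child DAG could call such a function in its first task and only proceed when it returns `True`; the short `poke_interval` here is just to keep the demo fast.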
