Airflow 1.10.11: Three Ways to Run a DAG

2020-07-27 20:00

1, Prepare the script

[root@do-airflow ~]# vi test.b.script.sh
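#!/bin/bash
# Usage: test.b.script.sh <tag> <seq> [day]
# Prints a timestamp once per second, 10 times, and logs each one
# to /root/<day>.<tag>.<seq>.log

# The third argument is the day (YYYYMMDD); default to today when it is missing
S_DAY=$3
if [ -z "$S_DAY" ]; then
    S_DAY=$(date '+%Y%m%d')
fi

S_FILE="/root/$S_DAY.$1.$2.log"

# Start from an empty log file
rm -f "$S_FILE"

I=0
while true; do
    S_MSG=$(date "+%Y-%m-%d %H:%M:%S")
    echo "$S_MSG"
    echo "$S_MSG" >> "$S_FILE"
    ((I=I+1))
    if [[ $I == 10 ]]; then
        break
    fi
    sleep 1
done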
[root@do-airflow ~]#
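
The file naming rule in the script can be restated as a minimal Python sketch. The function name log_path is hypothetical and only illustrates the rule: the log goes to /root/<day>.<tag>.<seq>.log, and an empty or missing day falls back to today's date.

```python
from datetime import date
from typing import Optional


def log_path(tag: str, seq: str, day: Optional[str] = None) -> str:
    """Mirror the script's file naming: /root/<day>.<tag>.<seq>.log."""
    # An empty or missing day falls back to today's date as YYYYMMDD,
    # like the `-z` check in the shell script.
    if not day:
        day = date.today().strftime("%Y%m%d")
    return f"/root/{day}.{tag}.{seq}.log"


print(log_path("a", "1", "20200401"))  # prints /root/20200401.a.1.log
```

With the arguments "a 1 20200401" this gives the same file name that shows up in the result listings later in this post.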

2, Prepare the DAG

[root@do-airflow ~]# vi /opt/airflow/dags/b_hello.py
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta

default_args = {
    'owner': 'dosrain',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='b_hello',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=None)

# Phase 1 aggregation
a1_operator = BashOperator(
    task_id='a1_task',
    bash_command='/root/test.b.script.sh a 1 {{ dag_run.conf["sday"] }}',
    dag=dag)

[root@do-airflow ~]# python3 /opt/airflow/dags/b_hello.py
[root@do-airflow ~]# airflow list_tasks b_hello
[2020-07-24 17:05:49,937] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-07-24 17:05:49,939] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags
a1_task
[root@do-airflow ~]#

3, Trigger the DAG

3.1, Web UI




3.2, Command line

[root@do-airflow ~]# rm -f *.log

# Unpause (enable) b_hello
[root@do-airflow ~]# airflow unpause b_hello
[2020-07-24 17:11:51,126] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-07-24 17:11:51,127] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/b_hello.py
Dag: b_hello, paused: False

# Trigger b_hello; note that a parameter is passed with -c
[root@do-airflow ~]# airflow trigger_dag -c '{"sday":"20200401"}' b_hello
[2020-07-24 17:14:47,195] {__init__.py:50} INFO - Using executor LocalExecutor
[2020-07-24 17:14:47,197] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/b_hello.py
Created <DagRun b_hello @ 2020-07-24 17:14:47+08:00: manual__2020-07-24T17:14:47+08:00, externally triggered: True>

# Check the result file
[root@do-airflow ~]# ll *.log
-rw-r--r--. 1 root root 200 Jul 24 17:14 20200401.a.1.log
[root@do-airflow ~]#
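
If the trigger needs to be issued from a program rather than typed by hand, the same command line can be assembled in Python. This is only a sketch: build_trigger_argv is a hypothetical helper, and it assumes the airflow executable is on the PATH of the host where it runs.

```python
import json
import shlex
from typing import Dict, List


def build_trigger_argv(dag_id: str, conf: Dict[str, str]) -> List[str]:
    """Build the argument list for `airflow trigger_dag -c <json> <dag_id>`."""
    # json.dumps produces the JSON string that -c expects
    return ["airflow", "trigger_dag", "-c", json.dumps(conf), dag_id]


argv = build_trigger_argv("b_hello", {"sday": "20200401"})
# Show the shell-quoted equivalent of the argument list
print(" ".join(shlex.quote(arg) for arg in argv))
# To actually run it on the Airflow host: subprocess.run(argv, check=True)
```

Passing the arguments as a list avoids shell quoting problems with the JSON value.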

3.3, REST API

References:
https://airflow.apache.org/docs/stable/security.html
https://airflow.apache.org/docs/stable/rest-api-ref.html

[root@do-airflow ~]# rm -f *.log

# Change the API authentication backend; "default" accepts every request without authentication
[root@do-airflow ~]# vi /opt/airflow/airflow.cfg
auth_backend = airflow.api.auth.backend.default

# Restart the Airflow web server
[root@do-airflow ~]# systemctl restart airflow-webserver

# Send a GET request to check that the REST API is working
[root@do-airflow ~]# curl http://192.168.109.131:8080/api/experimental/test
{"status":"OK"}

# Get the run history of b_hello
[root@do-airflow ~]# curl http://192.168.109.131:8080/api/experimental/dags/b_hello/dag_runs
[{"dag_id":"b_hello","dag_run_url":"/admin/airflow/graph?dag_id=b_hello&execution_date=2020-07-24+17%3A14%3A47%2B08%3A00","execution_date":"2020-07-24T17:14:47+08:00","id":3,"run_id":"manual__2020-07-24T17:14:47+08:00","start_date":"2020-07-24T17:14:47.212520+08:00","state":"success"}]
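
The response is a JSON array with one object per DAG run, so it is easy to reduce to the fields of interest. A minimal sketch follows; summarize_runs is a hypothetical helper, and the sample body is a shortened copy of the response above.

```python
import json
from typing import Dict, List

# Sample response body, shortened from the session above
body = (
    '[{"dag_id":"b_hello","execution_date":"2020-07-24T17:14:47+08:00",'
    '"id":3,"run_id":"manual__2020-07-24T17:14:47+08:00","state":"success"}]'
)


def summarize_runs(raw: str) -> List[Dict[str, str]]:
    """Keep only run_id and state for each DAG run in the JSON array."""
    return [{"run_id": r["run_id"], "state": r["state"]} for r in json.loads(raw)]


for run in summarize_runs(body):
    print(run["run_id"], run["state"])
```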

# Trigger b_hello
[root@do-airflow ~]# curl -X POST \
http://192.168.109.131:8080/api/experimental/dags/b_hello/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"conf":"{\"sday\":\"20400101\"}"}'
{"execution_date":"2020-07-24T17:26:33+08:00","message":"Created <DagRun b_hello @ 2020-07-24 17:26:33+08:00: manual__2020-07-24T17:26:33+08:00, externally triggered: True>","run_id":"manual__2020-07-24T17:26:33+08:00"}

# Check the result file
[root@do-airflow ~]# ll *.log
-rw-r--r--. 1 root root 200 Jul 24 17:26 20400101.a.1.log
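
The same POST request can be built with Python's standard library. This is a sketch under assumptions: build_trigger_request is a hypothetical helper, the host and port are the ones from the session above, and the request is only constructed here, not sent. As in the curl call, "conf" is sent as a JSON-encoded string nested inside the JSON body.

```python
import json
import urllib.request

# Host and port are taken from the session above; adjust for your deployment
BASE_URL = "http://192.168.109.131:8080/api/experimental"


def build_trigger_request(dag_id: str, conf: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates a DAG run."""
    # The curl call sends "conf" as a JSON-encoded string, so encode twice
    payload = json.dumps({"conf": json.dumps(conf)}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/dags/{dag_id}/dag_runs",
        data=payload,
        headers={"Content-Type": "application/json", "Cache-Control": "no-cache"},
        method="POST",
    )


req = build_trigger_request("b_hello", {"sday": "20400101"})
print(req.get_method(), req.full_url)
# To send it: urllib.request.urlopen(req).read()
```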