Airflow1.10.11 之运行 dag 的三种方式
2020-07-27 20:00
253 查看
1, 准备脚本
[root@do-airflow ~]# vi test.b.script.sh #!/bin/bash S_FILE="" S_DAY=$3 if [ -z $S_DAY ]; then S_DAY=`date '+%Y%m%d'` fi S_FILE="/root/$S_DAY.$1.$2.log" rm -f $S_FILE I=0 while true; do S_MSG=`date "+%Y-%m-%d %H:%M:%S"` echo $S_MSG echo $S_MSG >> $S_FILE ((I=I+1)) if [[ $I == 10 ]]; then break fi sleep 1 done [root@do-airflow ~]#
2, 准备 dag
[root@do-airflow ~]# vi /opt/airflow/dags/b_hello.py import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import timedelta default_args = { 'owner': 'dosrain', 'depends_on_past': False, 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG( dag_id='b_hello', default_args=default_args, description='my first DAG', schedule_interval=None) # 一期汇聚 a1_operator = BashOperator( task_id='a1_task', bash_command='/root/test.c.script.sh a 1 {{ dag_run.conf["sday"] }}', dag=dag) [root@do-airflow ~]# python3 /opt/airflow/dags/b_hello.py [root@do-airflow ~]# airflow list_tasks b_hello [2020-07-24 17:05:49,937] {__init__.py:50} INFO - Using executor LocalExecutor [2020-07-24 17:05:49,939] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags a1_task [root@do-airflow ~]#
3, 触发 dag
3.1, Web UI 方式
3.2, 命令行方式
[root@do-airflow ~]# rm -f *.log # 启用 b_hello [root@do-airflow ~]# airflow unpause b_hello [2020-07-24 17:11:51,126] {__init__.py:50} INFO - Using executor LocalExecutor [2020-07-24 17:11:51,127] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/b_hello.py Dag: b_hello, paused: False # 触发 b_hello,注意,是带参数的 [root@do-airflow ~]# airflow trigger_dag -c '{"sday":"20200401"}' b_hello [2020-07-24 17:14:47,195] {__init__.py:50} INFO - Using executor LocalExecutor [2020-07-24 17:14:47,197] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/b_hello.py Created <DagRun b_hello @ 2020-07-24 17:14:47+08:00: manual__2020-07-24T17:14:47+08:00, externally triggered: True> # 查看结果文件 [root@do-airflow ~]# ll *.log -rw-r--r--. 1 root root 200 Jul 24 17:14 20200401.a.1.log [root@do-airflow ~]#
3.3, REST Api 方式
参考链接:
https://airflow.apache.org/docs/stable/security.html
https://airflow.apache.org/docs/stable/rest-api-ref.html
[root@do-airflow ~]# rm -f *.log # 修改后端认证方式 [root@do-airflow ~]# vi /opt/airflow/airflow.cfg auth_backend = airflow.api.auth.backend.default # 重启 airflow 的 web 服务 [root@do-airflow ~]# systemctl restart airflow-webserver # 发起测试 rest api 是否正常的 get 请求 [root@do-airflow ~]# curl http://192.168.109.131:8080/api/experimental/test {"status":"OK"} # 获取 b_hello 的执行历史 [root@do-airflow ~]# curl http://192.168.109.131:8080/api/experimental/dags/b_hello/dag_runs [{"dag_id":"b_hello","dag_run_url":"/admin/airflow/graph?dag_id=b_hello&execution_date=2020-07-24+17%3A14%3A47%2B08%3A00","execution_date":"2020-07-24T17:14:47+08:00","id":3,"run_id":"manual__2020-07-24T17:14:47+08:00","start_date":"2020-07-24T17:14:47.212520+08:00","state":"success"}] # 触发 b_hello [root@do-airflow ~]# curl -X POST \ http://192.168.109.131:8080/api/experimental/dags/b_hello/dag_runs \ -H 'Cache-Control: no-cache' \ -H 'Content-Type: application/json' \ -d '{"conf":"{\"sday\":\"20400101\"}"}' {"execution_date":"2020-07-24T17:26:33+08:00","message":"Created <DagRun b_hello @ 2020-07-24 17:26:33+08:00: manual__2020-07-24T17:26:33+08:00, externally triggered: True>","run_id":"manual__2020-07-24T17:26:33+08:00"} # 查看结果文件 [root@do-airflow ~]# ll *.log -rw-r--r--. 1 root root 200 Jul 24 17:26 20400101.a.1.log
相关文章推荐
- 全屏幕方式运行UE4打包程序的三种方法
- WordCount案例及MapReduce运行的三种方式
- mybatis generatorConfig.xml生成配置文件及三种运行方式
- 【Android实现程序前后台切换效果】(一)android后台运行时弹出正在运行通知的三种方式对比
- opencv中三种像素访问方式的运行速度比较
- linux上PHP三种运行方式
- 运行springBoot的三种方式
- 运行shell脚本的三种方式
- 命令行运行Python脚本时传入参数的三种方式详解
- Window下PHP三种运行方式图文详解
- linux运行命令的三种方式
- Mapreduce之WordCount的三种运行方式
- [转]Window下图文详解PHP三种运行方式
- python中程序运行计时的三种方式
- 让WPF窗体程序支持命令行方式运行的三种方式
- C/C++中算法运行时间的三种计算方式
- 运行applet的三种方式
- C/C++中算法运行时间的三种计算方式
- C/C++中算法运行时间的三种计算方式(By 虚怀若谷)
- hive 运行三种方式 之 remote metastore service