python调用ansible接口API执行命令
2018-03-01 13:21
633 查看
python版本:Python 2.6.6
ansible版本:ansible 2.3.1.0
调用脚本: task_exec_v1.py
附playbook脚本:test_ping.yml
python3.5调用ansible参考文档:
https://www.cnblogs.com/stones/p/8252731.html
ansible版本:ansible 2.3.1.0
调用脚本: task_exec_v1.py
#!/usr/bin/env python #coding:utf-8 import os import sys import json import logging from collections import namedtuple from ansible.inventory import Inventory from ansible.vars import VariableManager from ansible.parsing.dataloader import DataLoader from ansible.executor.playbook_executor import PlaybookExecutor from ansible.plugins.callback import CallbackBase from ansible.errors import AnsibleParserError from optparse import OptionParser #定义打印日志 logging.basicConfig(filename='task_exec_v3.log', #通过logging.basicConfig函数对日志的输出格式及方式做相关配置 format='%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s',datefmt='%Y-%m-%d %H:%M:%S %p', level=10) class MyCallback(CallbackBase): #这里是状态回调,各种成功失败的状态,里面的各种方法其实都是从写于CallbackBase父类里面的,其实还有很多,可以根据需要拿出来用 def __init__(self,*args): super(MyCallback,self).__init__(display=None) self.status_ok=json.dumps({}) self.status_fail=json.dumps({}) self.status_unreachable=json.dumps({}) self.status_playbook='' self.status_no_hosts=False self.host_ok = {} self.host_failed={} self.host_unreachable={} def v2_runner_on_ok(self,result): host=result._host.get_name() self.runner_on_ok(host, result._result) #self.status_ok=json.dumps({host:result._result},indent=4) self.host_ok[host] = result def v2_runner_on_failed(self, result, ignore_errors=False): host = result._host.get_name() self.runner_on_failed(host, result._result, ignore_errors) #self.status_fail=json.dumps({host:result._result},indent=4) self.host_failed[host] = result def v2_runner_on_unreachable(self, result): host = result._host.get_name() self.runner_on_unreachable(host, result._result) #self.status_unreachable=json.dumps({host:result._result},indent=4) self.host_unreachable[host] = result def v2_playbook_on_no_hosts_matched(self): self.playbook_on_no_hosts_matched() self.status_no_hosts=True def v2_playbook_on_play_start(self, play): self.playbook_on_play_start(play.name) self.playbook_path=play.name class My_ansible_play(): #这里是ansible运行 #初始化各项参数,大部分都定义好,只有几个参数是必须要传入的 def __init__(self, playbook, extra_vars={}, host_list='/etc/ansible/hosts', connection='ssh', become=False, become_user=None, module_path=None, fork=50, ansible_cfg=None, #os.environ["ANSIBLE_CONFIG"] = None passwords={}, check=False): self.playbook_path=playbook self.passwords=passwords self.extra_vars=extra_vars Options = namedtuple('Options', ['listtags', 'listtasks', 'listhosts', 'syntax', 'connection','module_path', 'forks', 'private_key_file', 'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args', 'scp_extra_args', 'become', 'become_method', 'become_user', 'verbosity', 'check']) logging.info(Options) self.options = Options(listtags=False, listtasks=False, listhosts=False, syntax=False, connection=connection, module_path=module_path, forks=fork, private_key_file=None, ssh_common_args=None, ssh_extra_args=None, sftp_extra_args=None, scp_extra_args=None, become=become, become_method=None, become_user=become_user, verbosity=None, check=check) logging.info(self.options) if ansible_cfg != None: os.environ["ANSIBLE_CONFIG"] = ansible_cfg self.variable_manager=VariableManager() self.variable_manager.extra_vars=self.extra_vars self.loader=DataLoader() self.inventory=Inventory(loader=self.loader,variable_manager=self.variable_manager,host_list=host_list) #定义运行的方法和返回值 def run(self): complex_msg={} if not os.path.exists(self.playbook_path): code=1000 results={'playbook':self.playbook_path,'msg':self.playbook_path+' playbook is not exist','flag':False} logging.info(results) #results=self.playbook_path+'playbook is not existed' #return code,complex_msg,results pbex= PlaybookExecutor(playbooks=[self.playbook_path], inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, passwords=self.passwords) self.results_callback=MyCallback() pbex._tqm._stdout_callback=self.results_callback try: code=pbex.run() except AnsibleParserError: code=1001 results={'playbook':self.playbook_path,'msg':self.playbook_path+' playbook have syntax error','flag':False} #results='syntax error in '+self.playbook_path #语法错误 return code,results if self.results_callback.status_no_hosts: code=1002 results={'playbook':self.playbook_path,'msg':self.results_callback.status_no_hosts,'flag':False,'executed':False} return code,results def get_result(self): self.result_all={'success':{},'fail':{},'unreachable':{}} for host, result in self.results_callback.host_ok.items(): self.result_all['success'][host] = result._result for host, result in self.results_callback.host_failed.items(): if result._result.has_key("msg"): self.result_all['fail'][host] = result._result['msg'] for host, result in self.results_callback.host_unreachable.items(): self.result_all['unreachable'][host]= result._result['msg'] for i in self.result_all['success'].keys(): print i,self.result_all['success'][i] print self.result_all['fail'] print self.result_all['unreachable'] if __name__ =='__main__': play_book=My_ansible_play('/etc/ansible/playbooks/test_ping.yml') logging.info(play_book) play_book.run() play_book.get_result()
附playbook脚本:test_ping.yml
--- - hosts: test gather_facts: False tasks: - name: test ping ping: - name: shell commond shell: echo "hello world" register: result - name: show debug info debug: var=result.stdout verbosity=0
python3.5调用ansible参考文档:
https://www.cnblogs.com/stones/p/8252731.html
相关文章推荐
- Python3.5 调用Ansible 执行命令
- Python调用Ansible 2.0 API执行playbook
- python 调用系统命令,执行命令行
- python调用zabbix api接口实时展示数据 推荐
- eoLinker-API_Shop_短信服务接口-调用示例代码,支持PHP、Python、Java等语言
- python的subprocess:子程序调用(调用执行其他命令);获取子程序脚本当前路径问题
- Python调用shell命令的几种方法(在新进程中执行shell命令)
- python的subprocess:子程序调用(调用执行其他命令);获取子程序脚本当前路径问题
- ansible (2.4.2.0) API python调用重写 | 适用于 web
- python调用zabbix api接口实时展示数据
- Python如何调用新浪api接口的问题
- Python3调用face++免费API接口识别身份证信息
- python中执行linux命令(调用linux命令)
- 关于python调用zabbix api接口的自动化实例 [结合saltstack] 推荐
- Python调用zabbix api 接口
- Python 调用os执行类似linux/dos常见命令
- Python——cmd调用(os.system阻塞处理)(多条命令执行)
- python调用os.system执行系统命令,中文输出显示乱码
- SWIG之为C/C++的API生成Python调用接口基础
- Python调用REST API接口的几种方式 推荐