您的位置:首页 > 编程语言 > Python开发

基础入门_Python-模块和包.运维开发中watchdog事件监视的最佳实践?

2016-10-31 12:34 931 查看
简单介绍:
说明: 此模块是一个跨平台的PY库和SHELL工具,可以监视文件系统事件(增加/删除/修改)

快速安装:
pip install --upgrade watchdog
日志记录:
event_handler = LoggingEventHandler() -> event_handler
说明: 创建一个日志处理句柄,其实LoggingEventHandler是继承自FileSystemEventHandler类,只是重写了增删查改的回调函数,直接调用logging模块写到对应logging配置的目标
说明: 此模块为我们实现了一个watchdog.events.LoggingEventHandler类,可直接配合logging模块,可以简单记录增删查改
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/ # Purpose:
#
"""
# 说明: 导入公共模块
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
# 说明: 导入其它模块
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
logging.info('start watching.')
event_handler = LoggingEventHandler()
watcher = Observer()
watcher.schedule(event_handler=event_handler, path='.', recursive=True)
watcher.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt, e:
watcher.stop()
watcher.join()
说明: LoggingEventHandler直接调用logging.info写日志,而logging又是如此强大的支持线程安全的日志模块,所以可以有机的结合实现更加强大的功能,watchdog非常简单,首先from watchdog.observers import Observer导入Observer类,然后实例化后调用schedule只用传递三个参数,第一个参数就是实例处理句柄,第二个参数是要监控的地址,默认并不递归监控,只有指定recursive=True时才会递归检测,至于线程对象的start/stop/join什么意思我就不多说了~对了,上面的那个for循环主要是为了捕捉Ctrl+C异常,调用watch.stop()让线程正常退出~

回调处理:
event_handler = FileSystemEventHandler() -> event_handler
说明: 由于FileSystemEventHandler是基类,并没有具体实现on_any_event/on_created/on_deleted/on_modified/on_moved方法,所以通常并不会直接实例化作为事件处理对象,而是自定义一个类继承它然后去实现那些回调函数
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/ # Purpose:
#
"""
# 说明: 导入公共模块
import time
from watchdog.events import FileSystemEventHandler
# 说明: 导入其它模块
def check_modification(func):
def wrapper(self, event):
print u'''Event Statics:
事件类型: {}
是否目录: {}
文件路径: {}
'''.format(event.event_type, event.is_directory, event.src_path)
return wrapper
class CustomerHandler(FileSystemEventHandler):
@check_modification
def on_created(self, event):
pass
@check_modification
def on_deleted(self, event):
pass
@check_modification
def on_modified(self, event):
pass
@check_modification
def on_moved(self, event):
pass
def start_watching(event_handler, path='.', recursive=True):
from watchdog.observers import Observer
watcher = Observer()
watcher.schedule(event_handler=event_handler, path=path, recursive=recursive)
watcher.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt, e:
watcher.stop()
if __name__ == '__main__':
event_handler = CustomerHandler()
start_watching(event_handler=event_handler)
说明: 如上自定义一个继承自FileSystemEventHandler的类,并且实现了增删查该的方法,所有方法默认都有一个event参数,为事件对象,为了方便直接定义了个修饰器,输出event对象的三个常用的属性,event.event_type, event.is_directory, event.src_path

最佳实践:



1. 玩过FLASK的人都知道在DEBUG模式下,对PY文件的修改会自从重启整个程序,避免调试手动重启的麻烦,昨天刚好接到一个需求,希望写一个简单的插件系统,支持动态加载,我们都直到一旦程序启动再向插件目录增/删/查/改插件,程序是无感知的,为了实现类似FLASK重载效果,让插件式监控更加智能,于是学习了下FLASK源码实现,并自己手写了一个简化版的自动重载装饰器,直接看源码吧~
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/ # Purpose:
#
"""
# 说明: 导入公共模块
import os
import sys
import time
import threading
import subprocess
# 说明: 导入其它模块

# 说明: 基类监视
class BaseReloaderLoop(object):
def __init__(self, watch_files, interval):
self.watch_files = set(os.path.abspath(f) for f in watch_files)
self.interval = interval

def monitor(self):
pass

def rerun_with_autoreload(self):
while True:
envs = os.environ.copy()
args = [sys.executable] + sys.argv
envs.update({'APP_AUTORELOAD': 'True'})
# 说明: 阻塞版的POPEN
subprocess.call(args, env=envs)

# 说明: 默认监视
class StatReloaderLoop(BaseReloaderLoop):
def __init__(self, *args, **kwargs):
super(StatReloaderLoop, self).__init__(*args, **kwargs)

# 说明: WATCHDOG
class WatchdogReloaderLoop(BaseReloaderLoop):
def __init__(self, *args, **kwargs):
super(WatchdogReloaderLoop, self).__init__(*args, **kwargs)
self.scheduling_flag = True
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

def stop_scheduling():
self.scheduling_flag = False

class _EventHandler(FileSystemEventHandler):
def __init__(self):
super(_EventHandler, self).__init__()

def on_any_event(self, event):
stop_scheduling()

self.event_handler = _EventHandler()
self.monitor_class = Observer

def monitor(self):
watcher = self.monitor_class()
watcher.start()

while self.scheduling_flag:
for path in self.watch_files:
try:
watcher.schedule(event_handler=self.event_handler, path=path, recursive=True)
except (OSError, WindowsError), e:
# 说明: 异常处理
pass
time.sleep(self.interval)

reloader_loop = {
'stat': StatReloaderLoop,
'watchdog': WatchdogReloaderLoop,
}
try:
__import__('watchdog.observers')
except ImportError, e:
reloader_loop['auto'] = reloader_loop['status']
else:
reloader_loop['auto'] = reloader_loop['watchdog']

# 说明: 装饰函数
def run_with_autoreload(watch_files=None, interval=1, rtype='auto'):
"""Decorator for run with autoreloader.
:param watch_files: file path
:type watch_files: list
:param interval: check interval
:type interval: int
:param rtype: reload type
:type rtype: str
:return: None
:rtype: None
"""
def decorator(func):
def wrapper(*args, **kwargs):
reloader = reloader_loop[rtype](watch_files, interval)
isreload = os.environ.get('APP_AUTORELOAD', 'False')
if isreload == 'True':
cur_thread = threading.Thread(target=func, args=args, kwargs=kwargs)
cur_thread.setDaemon(True)
cur_thread.start()
reloader.monitor()
else:
reloader.rerun_with_autoreload()
return wrapper
return decorator
说明: 使用方法很简单直接在你的主程序入口函数上@run_with_autoreload(..., ..., ...)支持设置监视多个目录,设置监视间隔,说下整个思路吧,首先在当前环境获取APP_AUTORELOAD的值是否为True(默认其实都是没有配置也无需配置的),如果是则是由表示由子进程启动,否则调用rerun_with_autoreload(),这个方法主要是设置环境变量APP_AUTORELOAD为True且利用subprocess.Call在新的环境envs中调用我们命令行中输入的命令以子进程形式重新执行,子进程执行时由于APP_AUTORELOAD已经为True,所以会以开启一个线程执行我们修饰的主函数,但是注意,它设置了cur_thread.setDaemon(True)也就是说一旦子进程结束,它不管有没有执行完毕都会退出,而reloader.monitor()则担任了cur_thread.join()的阻塞作用,而内部watchdog监控到任何事件都会修改self.scheduling_flag的值,正好reloader.monitor()就依赖于此值进行循环的,所以一旦事件发送就停止循环,而此时reloader.rerun_with_autoreload()使得生成新的子进程接管整个应用,这样就达到了自动重载的功能

简单调用:
import time
from wrappers.autoreload import run_with_autoreload

@run_with_autoreload(watch_files=['./img', './css'], interval=1, rtype='auto')
def main():
while True:
print '=> {}'.format(time.time())
time.sleep(1)

if __name__ == '__main__':
print 'found notice: app start at {}.'.format(time.time())
main()
说明: 程序的入口函数大家按照自己的应用来,如上只是简单演示,我监视的是当前目录下的img/css目录,你可以尝试在当前目录建立img/css目录然后启动程序然后在img/css中添加/删除/修改文件,测试整个应用程序有没有重新加载~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Python 基础入门