您的位置:首页 > 运维架构

运维开发中__import__动态导入最佳实践?

2017-02-28 00:00 603 查看
常规导入:

1
2
3
import module_name[,module1,...]

from module_name import [*|child[,child1,...]

from module_name import [*|child[,child1,...]as alias_name

注意:导入语句可出现在程序任意位置,自定义包要实现from module_name import *的效果则此模块必须在__init__.py实现__all__ = ['module_1', 'module_2']

加载一次:

说明:多次重复使用import语句时,不会重新加载模块,而是把该模块的内存地址给引用到本地环境变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
==> x.py <==

#!/usr/bin/env python

# -*- coding:utf-8 -*-

"""

#

# Authors:limanman

# 51CTOBG:http://xmdevops.blog.51cto.com/[/code] 
# Purpose:

#

"""

# 说明:导入公共模块

import os

# 说明:导入其它模块

print 'os in x.py', id(os)

==> y.py <==

#!/usr/bin/env python

# -*- coding:utf-8 -*-

"""

#

# Authors:limanman

# 51CTOBG:http://xmdevops.blog.51cto.com/[/code] 
# Purpose:

#

"""

# 说明:导入公共模块

import x

import os

# 说明:导入其它模块

if __name__== '__main__':

print 'os in y.py', id(os)

import x

重新加载:

说明:对已经加载的模块进行重新加载,一般用于原模块有变化等特殊情况,reload前该模块必须已经import过,但是需要注意的是已经使用的实例还会使用旧模块,而新产生的实例才会使用新模块,reload之后还是原来的内存地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python

# -*- coding:utf-8 -*-

"""

#

# Authors:limanman

# 51CTOBG:http://xmdevops.blog.51cto.com/[/code] 
# Purpose:

#

"""

# 说明:导入公共模块

import
sys

try
:

sys.setdefaultencoding(
'utf-8'
)

except
Exception, e:

print
e

reload
(sys)

sys.setdefaultencoding(
'utf-8'
)

print
sys.getdefaultencoding()

# 说明:导入其它模块

if
__name__
=
=
'__main__'
:

pass

说明:很多人不名为为何要reload()一下sys才能使用setdefaultencoding设置编码,其实是因为解释器初始化时预先执行了/usr/lib64/python2.7/site.py,而在其554行代码中del sys.setdefaultencoding删除了此方法,其实你import sys只是指向了那个被删除了setdefaultencoding属性的sys模块地址,所以需要重新reload一下还原此方法

相对导入:

说明: PY通过模块名中的点来判断是否属于包,属于哪个包,当你使用from ..xx import oo,其中的点表示包结构中的层次,如果模块名为__main__表示它不属于任何包,所以此时模块名应该不包含点,否则会导致relative-import in non-package错误,也就是说包含相对导入的文件无法作为入口文件,但是可通过Python -m来当作模块载入

1
2
3
from ..libs.databaseimport Redis

from ..libs.alarmimport alarm_template

from ..libs.alarm.apiimport weixin_notify

绝对导入:

说明:绝对导入也叫完全导入,2.x版本必须使用from __future__import absolute_import打开此机制,而3.x则将其作为默认机制

1
from x.y.z import o

动态导入:

说明:__import__其实就是import的内部实现,通常用于动态加载,如插件式监控系统中只知道插件名如何执行插件内的代码?此时就可以通过动态加载来实现获取插件内的函数然后去调用

__import__(module_name[, globals[, locals[, fromlist]]]) -> object

说明:module_name为模块名,但是需要注意的是如果module_name包含子模块如x.y,则默认会返回x对象,如果要返回y对象需要设置fromlist列表,来实现from x import y的效果.当然要实现动态导入含有专门的imp和importlib模块.可以学习一下~

应用场景:

1. zabbix/nagios等监控系统主动监控都必须手工配置Agent,即使自定义插件亦是如此,如果要实现一个自动检测插件自动调用插件自动上报数据的监控系统要如何实现哪?





1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python

# -*- coding:utf-8 -*-

"""

#

# Authors:limanman

# 51CTOBG:http://my.51CTOBG.net/pydevops/[/code] 
# Purpose:

#

"""

# 说明:兼容绝对导入

from
__future__
import
absolute_import

# 说明:导入公共模块

import
json

import
time

import
threading

# 说明:导入其他模块

from
..libs.database
import
Redis

from
..libs.alarm
import
alarm_template

from
..libs.alarm.api
import
weixin_notify


# 说明:客户端监控类

class
MonitorClient(
object
):

def
__init__(
self
, redis, agent, info, error):

self
.info
=
info

self
.error
=
error

self
.redis
=
Redis(db
=
redis[
'db'
],

   
host
=
redis[
'host'
],

   
port
=
redis[
'port'
],

   
password
=
redis[
'password'
])

self
.agent_host
=
agent[
'host'
]

self
.redis_host
=
redis[
'host'
]

self
.clientconf
=
self
._get_climconf()

self
.pubchannel
=
redis[
'publish'
]
or
'xmdevops_channel'


self
.info.info(
'update key#climconf::%s val#%s'
%
(
self
.agent_host,
self
.clientconf))


def
start(
self
):

self
._plugins_handler()


def
_get_climconf(
self
):

redis_key
=
'climconf::%s'
%
(
self
.agent_host,)

while
True
:

redis_val
=
self
.redis.get(redis_key)

if
not
redis_val:

message
=
'getval key#%s with nothing,5 seconds try again'
%
(redis_key,)

self
.info.info(message)

self
._report_exception(redis_key, message)

time.sleep(
5
)

continue

try
:

conf_dict
=
json.loads(redis_val)

except
TypeError, e:

message
=
'unpack key#%s val#%s with error %s'
%
(redis_key, redis_val, e)

self
.error.error(message)

self
._report_exception(redis_key, message)

time.sleep(
5
)

continue

break

return
conf_dict


def
_plugins_handler(
self
):

while
True
:

for
service_name, plugin_info
in
self
.clientconf.iteritems():

if
len
(plugin_info) <
4
:

self
.clientconf[service_name].append(
0
)

plugin_name, check_interval, add_data, last_runtime
=
plugin_info

if
time.time()
-
last_runtime > check_interval:

self
.clientconf[service_name][
-
1
]
=
time.time()

self
.info.info(
'plugin key#%s val#%s is called'
%
(service_name, plugin_info))

cur_thread
=
threading.Thread(

target
=
self
._plugins_called, args
=
(service_name, plugin_name, add_data))

cur_thread.start()

time.sleep(
1
)


old_clientconf
=
self
.clientconf

self
.clientconf
=
self
._get_climconf()

for
trigger_key, trigger_val
in
self
.clientconf.iteritems():

if
trigger_key
in
old_clientconf:

self
.clientconf[trigger_key].append(old_clientconf[trigger_key][
-
1
])


def
_plugins_called(
self
, service_name, plugin_name, add_data):

plugin_path
=
'app.%s.%s'
%
(
'plugins'
, service_name)

try
:

plugin_mods
=
__import__
(plugin_path, fromlist
=
[service_name])

except
ValueError, e:

message
=
'import key#%s val#%s with error %s'
%
(plugin_path, e)

self
.error.error(message)

self
._report_exception(plugin_path, message)

return

try
:

plugin_func
=
getattr
(plugin_mods, plugin_name)

except
AttributeError, e:

message
=
'plugin key#%s val#%s not exists'
%
(plugin_mods, plugin_name)

self
.error.error(message)

self
._report_exception(plugin_func, message)

return

plugin_data
=
plugin_func(add_data)

report_data
=
{

'host'
:
self
.agent_host,

'data'
:plugin_data,

'service'
:service_name

}

data
=
json.dumps(report_data)

self
.info.info(
'publish key#%s val#%s'
%
(service_name, data))

self
.redis.publish(
self
.pubchannel, data)


def
_report_exception(
self
, errors, details):

message
=
alarm_template
%
(

self
.agent_host,
'critical'
, errors,

time.strftime(
'%H:%M:%S'
, time.localtime()), details)

results
=
weixin_notify(message)

if
results:

self
.error.error(results)

说明:如上就是一个自己写的基于Redis的全自动化微型监控框架部分核心代码,首先读取网页端下发下来的监控配置,然后利用线程通过__import__动态调用插件中的入口监控函数,然后将执行结果上报到对应区域的redis,server端再处理阀值数据等等,可以作为一个非常好的学习案例

登录乐搏学院官网http://www.learnbo.com/

或关注我们的官方微博微信,还有更多惊喜哦~



本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1857506
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: