您的位置:首页 > 运维架构

windows监控——再见zmq

2015-11-02 16:39 323 查看
如果不动手实践,总会把复杂的问题想简单。其实也不复杂,只是用得少,不熟悉。

还是这个题目,已经写到了version5.1。起初很随便地用了自己不太了解的socket通信,稀里糊涂地完成了功能。但是到后期改代码就很困难了。索性一不做二不休,改成zmq通信,毕竟用过。

但是不会用zmq进行一对多的通信,客户端开多了,总是会弄乱send和recv。只能在每个循环中,只设置一对send,recv。

本想偷懒不用正则匹配,结果偷鸡不成蚀把米,zmq混乱了。还是老老实实用正则匹配从前台获取周期。

使用zmq通信最大的感受就是,一定要弄清消息的“来龙去脉”,client和server的send和recv一定要完美匹配,才不会出错。

废话说完,上干货!

server.py

#! /usr/bin/env python
#coding=utf-8
'''
fileName: server.py
数据发送方式:zmq
'''

import zmq
import time
import json
from Queue import Queue
import threading
import pymongo
import re

# 默认周期
cycle = 10

# 生产者进程
class Producer(threading.Thread):
def __init__(self, t_name, processdata):
threading.Thread.__init__(self, name=t_name)
self.data=processdata
def run(self):
print "%s: %s is producing in the queue!/n" %(time.ctime(), self.getName())
recvMassage()

# 接收数据
def recvMassage():
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while (True):
message = socket.recv()
print "message from client:", message
if '@' in message:  # 接收到前端更改周期
mode = re.compile(r'\d+')
c = mode.findall(message)
global cycle
cycle = int (c[0])
print 'Now, the cycle is ',cycle
socket.send('ok')
else:               # 接收客户端监控信息
load = json.loads(message)
info = dict(load)
print type(info)
handleData(info)        # 处理监控信息
socket.send(str(cycle)) # 返回当前周期

# 处理监控到的信息
def handleData(info):
# 链接数据库
conn = pymongo.Connection("localhost", 27017)
db = conn['networkSecurity']
systemInfo = db['systemInfo']
# 构造dict数据
message = {'IP':info[u'IP'],'CPUstate':info[u'CPUstate'],'Memorystate':info[u'Memorystate'],
'PortState':info[u'PortState'],'ProcessName':info[u'ProcessName']}
print 'Client said :\nIP:%s\nCPUstate:%s\nMemorystate:%s\nPortState:%s\nProcessName:%s'%(message['IP'],message['CPUstate'],message['Memorystate'],message['PortState'],message['ProcessName'])
# 将数据放入队列
processdata.put(message)
# 将数据存入数据库
systemInfo.insert(message)
print 'success to store the data!'

#消费者
class Consumer(threading.Thread):
def __init__(self, t_name,processdata):
threading.Thread.__init__(self, name=t_name)
def run(self):
print "%s: %s is consuming in the queue!/n" %(time.ctime(), self.getName())
message = processdata.get()
# print 'the message in the queue is :   ',message
# print type(message)
monitorSystem(message)

# 黑白名单匹配,info为字典
def monitorSystem(info):
warning = 0
whiteList = ['cmd.exe']
blackList = ['sublime_text.exe']
# for info in systemInfo.find():
#     print info
IP = info['IP']
processName = info['ProcessName']
for process in processName:
if process in blackList:
warning = 1
print 'Process  %s in black list is running in IP %s ! '%(process,IP)
for process in whiteList:
if process not in processName:
warning = 1
print 'Process %s in white list is not running in IP %s ! '%(process,IP)
if warning == 0:
print 'Host %s is running legally ! '%IP

if __name__ == '__main__':
# 处理队列
processdata=Queue()

# 生产进程:接受数据
producer = Producer('Pro.', processdata)

# 消费进程:处理数据,黑白名单匹配
consumer = Consumer('Con.', processdata)

producer.start()
consumer.start()

producer.join()
consumer.join()


client.py

#! /usr/bin/env python
#coding=utf-8
'''
fileName:client.py
监控windows信息:CPU占有率,内存占有率,端口开放情况,当前进程名称
数据格式:    {'IP':getIp(),'CPUstate':getCPUState(),'Memorystate':getMemoryState(),
'PortState':getPortState(),'ProcessName':getProcessName()}
'''

import zmq
import psutil
import json
import socket
import thread
import time

# 向服务器发送监控信息
def sendMessage(portState):
context = zmq.Context()
print "Connecting to server..."
socket = context.socket(zmq.REQ)
socket.connect ("tcp://192.168.111.135:5555")    # 这里的IP为服务器IP

mymessage = json.dumps(packMessage(portState))
socket.send(mymessage)    # 发送监控信息
info = socket.recv()     # 收到周期
print "Received reply: ",info
c = json.loads(info)
global cycle
cycle = int(c)

# 构造数据包
def packMessage(portState):
message =     {'IP':getIp(),'CPUstate':getCPUState(),'Memorystate':getMemoryState(),
'PortState':portState,'ProcessName':getProcessName()}
print 'My message is :\nIP:%s\nCPUstate:%s\nMemorystate:%s\nPortState:%s\nProcessName:%s'%(message['IP'],message['CPUstate'],message['Memorystate'],message['PortState'],message['ProcessName'])
return message

# 获取本机IP
def getIp():
myname = socket.getfqdn(socket.gethostname())
myaddr = socket.gethostbyname(myname)
return myaddr

# 获取CPU使用率
def getCPUState(interval=1):
return (str(psutil.cpu_percent(interval)) + "%")

# 获取内存使用率
def getMemoryState():
mem_rate = 0
for pnum in psutil.pids():
p = psutil.Process(pnum)
mem_rate = mem_rate + p.memory_percent()
return "%.2f%%"%mem_rate

# 输入IP和端口号,扫描判断端口是否开放
def socket_port(ip,port,portList):
try:
if port >= 65535:
print u'端口扫描结束'
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = s.connect_ex((ip,port))
if result == 0:
lock.acquire()
portList.append(port)
lock.release()
s.close()
except:
print u'端口扫描异常'

# 输入IP,扫描IP的0-65534端口情况
def ip_scan(ip):
portList = []
socket.setdefaulttimeout(3)
try:
for i in range(0,65534):
thread.start_new_thread(socket_port,(ip,int(i),portList))
# 返回所有开放的端口号
return portList
except:
print u'扫描ip出错'

# 获取正在运行的进程名称
def getProcessName():
ProcessNameList = []
for pnum in psutil.pids():
p = psutil.Process(pnum)
ProcessNameList.append(p.name())
return ProcessNameList

if __name__ == '__main__':
global cycle
cycle = 60
while(True):
# 当前端口开放状态
myIP = getIp()
lock = thread.allocate_lock()
portState = ip_scan(myIP)

# 主要函数——发送监控信息
sendMessage(portState)

# 发送周期
print 'The cycle is',cycle
time.sleep(cycle)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: