您的位置:首页 > 其它

多线程应用-类(thread)

2016-04-15 17:22 363 查看
在对class thread加锁时,锁无法正常应用,函数方式没问题。

在使用class thread方法时,并发后的查询结果不对,函数方式没问题。

# -*- coding: UTF-8 -*-
from time import ctime,sleep
import threading,datetime
from Queue import Queue

class pdc(threading.Thread):
def __init__(self,t_name):
threading.Thread.__init__(self,name=t_name)
#self.name='aaa' #此时self还不是Thread,为string格式
def run(self): #run()方法继承于threading,需要重写定义自己的内容
self.setName('b'+str(i)) #self.setName('bbb') #此时self是Thread,可以通过 print dir(self) 查看所具有的属性/方法
print '%s: %s is producing %d to the queue.' %(ctime(),self.getName(),i)
sleep(1)

if __name__ == '__main__':
threads=[]
for i in range(5):
t = pdc('p'+str(i))
t.start()
threads.append(t)
for t in threads:
t.join()


返回结果:

Fri Apr 15 17:19:22 2016: b1 is producing 1 to the queue.
Fri Apr 15 17:19:22 2016: b1 is producing 1 to the queue.
Fri Apr 15 17:19:22 2016: b2 is producing 2 to the queue.
Fri Apr 15 17:19:22 2016: b4 is producing 4 to the queue.
Fri Apr 15 17:19:22 2016: b4 is producing 4 to the queue.

生产者-消费者模型(Thread-Queue):

# -*- coding: UTF-8 -*-
from time import ctime,sleep
import threading,datetime
from Queue import Queue

class pdc(threading.Thread):
def __init__(self,t_name,queue):
threading.Thread.__init__(self)
self.data = queue
self.name = t_name
def run(self): #run()方法继承于threading,需要重写定义自己的内容
tname = self.name
for i in range(5):
#print self.name
nn =  tname + str(i)
self.setName(nn) #self.setName('bbb') #此时self是Thread,可以通过 print dir(self) 查看所具有的属性/方法
print '%s: %s is producing %d to the queue.' %(ctime(),self.getName(),i)
self.data.put(nn)
sleep(1)
print '%s: %s pdc finished!' %(ctime(),self.getName())

class cum(threading.Thread):
def __init__(self,t_name,queue):
threading.Thread.__init__(self)
self.data = queue
self.name = t_name
def run(self):
tname = self.name
for i in range(5):
nn =  tname + str(i)
self.setName(nn)
val = self.data.get()
print '%s: %s in consuming %d in the queue is consumed.' %(ctime(),self.getName(),i)
sleep(2)
print '%s: %s cum finished!' %(ctime(),self.getName())

if __name__ == '__main__':
queue = Queue()
producer = pdc('p',queue)
consumer = cum('c',queue)
producer.start()
consumer.start()
producer.join()
consumer.join()

# print queue.qsize()
# while not queue.empty():
#     print queue.get_nowait()


返回结果:

Mon Apr 18 10:05:26 2016: p0 is producing 0 to the queue.
Mon Apr 18 10:05:26 2016: c0 in consuming 0 in the queue is consumed.
Mon Apr 18 10:05:27 2016: p1 is producing 1 to the queue.
Mon Apr 18 10:05:28 2016: p2 is producing 2 to the queue.
Mon Apr 18 10:05:28 2016: c1 in consuming 1 in the queue is consumed.
Mon Apr 18 10:05:29 2016: p3 is producing 3 to the queue.
Mon Apr 18 10:05:30 2016: p4 is producing 4 to the queue.Mon Apr 18 10:05:30 2016: c2 in consuming 2 in the queue is consumed.

Mon Apr 18 10:05:31 2016: p4 pdc finished!
Mon Apr 18 10:05:32 2016: c3 in consuming 3 in the queue is consumed.
Mon Apr 18 10:05:34 2016: c4 in consuming 4 in the queue is consumed.
Mon Apr 18 10:05:36 2016: c4 cum finished!

生产者消费者模型,直接将queue定义为global,实际同上:

from time import ctime,sleep
import threading,datetime
from Queue import Queue
global queue

class pdc(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)

def run(self): #run()方法继承于threading,需要重写定义自己的内容
for i in range(5):

nn =  'qq_' + str(i)
print '%s: producing %s to the queue.' %(ctime(),nn)
queue.put(nn) #将生成出来的数据放入到queue中
sleep(0.1)
print '%s pdc finished!' %(ctime())

class cum(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)

def run(self):
for i in range(5):
val = queue.get() #从queue中取数据进行消费
print '%s: consuming %s. the last number of queue is %d' %(ctime(),val,queue.qsize())
sleep(1)
print '%s consume finished!' %(ctime())

if __name__ == '__main__':
queue = Queue()
producer = pdc(queue)
consumer = cum(queue)
producer.start()
consumer.start()
producer.join()
consumer.join()


返回:

Sun May 22 14:34:38 2016: producing qq_0 to the queue.
Sun May 22 14:34:38 2016: consuming qq_0. the last number of queue is 0
Sun May 22 14:34:39 2016: producing qq_1 to the queue.
Sun May 22 14:34:39 2016: producing qq_2 to the queue.
Sun May 22 14:34:39 2016: producing qq_3 to the queue.
Sun May 22 14:34:39 2016: producing qq_4 to the queue.
Sun May 22 14:34:39 2016 pdc finished!
Sun May 22 14:34:39 2016: consuming qq_1. the last number of queue is 3
Sun May 22 14:34:40 2016: consuming qq_2. the last number of queue is 2
Sun May 22 14:34:41 2016: consuming qq_3. the last number of queue is 1
Sun May 22 14:34:42 2016: consuming qq_4. the last number of queue is 0
Sun May 22 14:34:43 2016 consume finished!

多线程获取服务器信息(启动多个线程获取服务器信息放到quque中,启动一个线程从queue中获取数据进行处理,如写入数据库):

#-*- coding: UTF-8 -*-
import subprocess,cjson,threading
from Queue import Queue

class P_infor(threading.Thread): #定义生产者,执行serverinfoj.ps1脚本(该脚本返回值为json字符串),获取服务器信息(dict),并放到queue中。
def __init__(self,lock,queue,IP):
threading.Thread.__init__(self)
self.psf = 'E:\\serverinfoj.ps1'
self.IP = IP
self.lock = lock
self.queue = queue
def run(self):
getinforchild = subprocess.Popen(['powershell.exe',self.psf,self.IP],shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
while getinforchild.poll() == None:
rsts = getinforchild.stdout.readlines()
if len(rsts) <> 0:
infor = cjson.decode(rsts[0])
if infor["Status"] == 'Success':
#print infor["Infors"]
with self.lock:
self.queue.put(infor["Infors"])
else:
print infor["Status"]

class c_infor(threading.Thread): #定义消费者,从queue中取出服务器信息
def __init__(self,queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
try:
qq = self.queue.get(timeout=20)
print qq
except:
break

if __name__ == '__main__':
IPS = ['10.160.30.50','10.160.30.51','10.160.25.48','10.160.26.50']
lst = IPS
tnum = 4 #定义线程数量
tcn = 1 #定义消费者进程数量
lock = threading.Lock()
queue = Queue()

for i in range(0,len(lst),tnum):
threadsp=[]
for IP in lst[i:i+tnum]:
tp=P_infor(lock,queue,IP)
tp.start()
threadsp.append(tp)

if tcn == 1: #消费者进程只启动一次
tc=c_infor(queue)
tc.start()
tcn = 0

for tp in threadsp:
tp.join()
tc.join()


附,serverinfoj.ps1脚本内容:

param($server)
#定义获取计算机信息的函数
Function GetServerInfo ($server,$account,$serverpass)
{

If (Test-Connection $server -count 2 -quiet)
{
$UserName = $account
$Password = ConvertTo-SecureString $serverpass -AsPlainText –Force
$cred = New-Object System.Management.Automation.PSCredential($UserName,$Password)
$session = New-PSSession -ComputerName $server -Credential $cred

$system = Invoke-Command -Session $session -ScriptBlock {Get-WmiObject -Class Win32_ComputerSystem}
If ($?)
{
#获取计算机域名、型号
$domainname = $system.Domain
$model = $system.Model

#获取计算机IP地址,取IP和gw不为空的网卡IP地址
#$ip = gwmi Win32_NetworkAdapterConfiguration -computer $server  -Credential $cred |?{$_.ipaddress -ne $null -and $_.defaultipgateway -ne $null}
#$ipaddr = $system.Name

#获取操作系统版本
$os = Invoke-Command -Session $session -ScriptBlock {Get-WmiObject -Class Win32_OperatingSystem}

#获取操作系统版本
$os_caption =  $os.Caption
If ($os_caption.Contains("Server 2008 R2 Enterprise"))
{$os_caption_s = "Win2008"}
ElseIf ($os_caption.Contains("Server 2003 Enterprise"))
{$os_caption_s = "Win2003"}
Else {$os_caption_s = $os.Caption}
$osversion = $os_caption_s + " " + $os.OSArchitecture.Substring(0,2) + "bit"

#获取CPU名称、单颗CPU核心数量*CPU个数
$cpus = Invoke-Command -Session $session -ScriptBlock {Get-WmiObject -Class win32_processor}
$cpucount = 0
Foreach ($cpu in $cpus)
{
If ($cpu.DeviceID -ne $null)
{$cpucount += 1}
}
$cpunamecore = $cpu.name+"    "+[string]$cpu.NumberOfLogicalProcessors + '*' + [string]$cpucount + "C"

#获取内存大小
$memorys = Invoke-Command -Session $session -ScriptBlock {Get-WmiObject -Class Win32_PhysicalMemory}
#$memorylist = $null
$memorysize_sum = $null
Foreach ($memory in $memorys)
{
#$memorylist += ($memory.capacity/1024/1024/1024).tostring("F1")+"GB + "
[int]$memorysize_sum_n +=  $memory.capacity/1024/1024/1024
}
$memorysize_sum = [string]$memorysize_sum_n + "GB"

#获取磁盘信息
$disks = Invoke-Command -Session $session -ScriptBlock {Get-WmiObject -Class Win32_Diskdrive}
$disklist = $null
#$disksize_sum = $null
Foreach ($disk in $disks)
{
$disklist += ($disk.deviceid.replace("\\.\PHYSICALDRIVE","Disk") +":" + [int]($disk.size/1024/1024/1024)+"GB ")
#$disksize_sum+=$disk.size
}

#获取计算机序列号、制造商
$bios = Invoke-Command -Session $session -ScriptBlock {Get-WmiObject -Class Win32_BIOS}
$sn = $bios.SerialNumber
If ($sn.Substring(0,6) -eq "VMware")
{$sn = "VMware"}
If ($bios.Manufacturer.contains("Dell"))
{$manufacturer = "Dell"}
Elseif ($bios.Manufacturer.contains("HP"))
{$manufacturer = "HP"}
Elseif ($bios.Manufacturer.contains("Microsoft"))
{
$manufacturer = "Microsoft"
$sn = "Hyper-V"
}
Else {$manufacturer = $bios.Manufacturer}
$type = $manufacturer + " " + $model

$serverinfoj = @"
{"Status": "Success","Infors": {"ServerName": "$env:ComputerName","IP": "$Server","OSVersion": "$osversion","MemorySize": "$memorysize_sum", "CPU": "$cpunamecore","DomainName": "$domainname","DISK": "$disklist","SN": "$sn","Type":"$type"}}
"@

}
Else
{
$serverinfoj = @"
{"Status": "RPC Failed"}
"@
}
}
Else
{
$serverinfoj = @"
{"Status": "Unreachable"}
"@
}
#$serverinfo = ConvertFrom-Json -InputObject $serverinfoj
Return $serverinfoj
}
$account = 'u\admin'
$serverpass = 'password'
GetServerInfo $server $account $serverpass


View Code
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: