
Concurrent Use of Process Pools and Thread Pools in Data Warehouse Migration

2017-07-12 22:49
Key topics of this post:

1. Concurrent use of a process pool
2. Concurrent use of a thread pool


(1) Concurrent use of a process pool

Code example:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

"""
function: this program replaces the shell script and implements the data copy
"""

from multiprocessing import Pool
import subprocess


def copy_data(line):
    # Each line of distcp.list has the form "source#target".
    source_destination, target_destination = line.strip().split("#")
    print("source_destination  is :%s" % source_destination)
    print("target_destination  is :%s" % target_destination)

    # Size of the source path in bytes (first column of `hadoop fs -du -s`).
    command = "hadoop fs -du -s %s | awk '{print $1}'" % source_destination
    res_size = subprocess.check_output(command, shell=True).decode("utf-8")

    # One map per 256 MB of source data, with a minimum of 6 maps.
    # The 3000 / 10000 values are kept exactly as in the original script.
    num_map = int(res_size) // 1024 // 1024 // 256 + 1
    if num_map <= 6:
        num_map = 6
    elif num_map >= 3000:
        num_map = 10000

    cmd = "hadoop distcp -update -skipcrccheck -delete -m %s %s %s" % (
        num_map, source_destination, target_destination)
    print("\033[41;1m%s\033[0m" % cmd, end="\n\n")
    # Wait for distcp to finish; otherwise the worker returns immediately,
    # the pool size no longer limits concurrency, and the final message
    # is printed before the copies are done.
    subprocess.call(cmd, shell=True)


if __name__ == '__main__':
    pool = Pool(processes=200)

    with open("distcp.list", mode="r", encoding="utf-8") as fr:
        for line in fr:
            if line.strip():  # skip blank lines
                pool.apply_async(func=copy_data, args=(line,))

    pool.close()
    pool.join()

    print("Data copy finished!")
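The line parsing and the mapper-count rule inside copy_data can be pulled out into small pure functions, which makes them easy to check without a Hadoop cluster. The following is only a sketch under assumptions: the names parse_line and map_count, the keyword parameters, and the example paths are hypothetical and do not appear in the original script. The thresholds (6, 3000, 10000) and the 256 MB unit are taken from the code above.

```python
MIB = 1024 * 1024  # bytes per mebibyte


def parse_line(line):
    """Split one 'source#target' entry of distcp.list into two paths.

    Hypothetical helper; mirrors the first line of copy_data.
    """
    source, target = line.strip().split("#")
    return source, target


def map_count(size_bytes, block_mib=256, floor=6, threshold=3000, large=10000):
    """Return the value passed to `distcp -m` for a source of size_bytes.

    Same rule as the script: one map per 256 MiB plus one,
    at least `floor` maps, and `large` maps once `threshold` is reached.
    """
    maps = size_bytes // MIB // block_mib + 1
    if maps <= floor:
        return floor
    if maps >= threshold:
        return large
    return maps


if __name__ == "__main__":
    # Example paths are made up for illustration.
    print(parse_line("/warehouse/a#/backup/a\n"))
    print(map_count(1024 * MIB))          # 1 GiB source
    print(map_count(10 * 1024 * MIB))     # 10 GiB source
    print(map_count(1024 * 1024 * MIB))   # 1 TiB source
```

For a 10 GiB source the rule gives 10240 // 256 + 1 = 41 maps; a 1 GiB source falls back to the minimum of 6, and a 1 TiB source (4097 by the formula) jumps to 10000.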


Run results:
