您的位置:首页 > 其它

空气质量国控站点数据插值出全国3181个城市值,利用了多线程

2017-11-02 13:04 246 查看
#coding=utf-8
import arcpy
import math
import sys
import datetime
import pymssql
import json
import os
import time
import uuid
import logging
import multiprocessing
import random
from arcpy import env
from arcpy.sa import *

def drawpng(date1,hour1,pullute,where):
logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"进入drawpng:"+pullute);
#消除多进程报错
time.sleep(1.1)
newTempDir = r"E:\temp\IDWmpenvr_" + str(time.strftime('%Y%m%d%H%M%S')) + str(random.random()*10000);
os.mkdir(newTempDir)
os.environ["TEMP"] = newTempDir
os.environ["TMP"] = newTempDir

env.workspace = r"E:\idw\tif_hour";
mapPath =r"E:\idw\tif_hour";

cnxn =pymssql.connect(host='127.0.0.1', user='sa', password='xzs@123', database='nationAir')
cursor = cnxn.cursor();

#查询污染物数据
sql="SELECT b.stationcode,"+pullute+" from monitor_site_hour a,hf_site b where a.site=b.stationname and a.city=b.cityname and datetime='"+date1+" "+hour1+":00:00'"+" and "+where;
cursor.execute(sql);
cursorData = cursor.fetchall();

#把查询数据保存到SHP
for item in cursorData:
fc = "sites_"+pullute+".shp";
where ="SITEID='"+str(item[0])+"'";
rows = arcpy.UpdateCursor(fc,where);
for row in rows:
row.setValue(str(pullute),float(item[1]));
rows.updateRow(row)
del rows,fc;
logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"数据保存到shp完成:"+pullute);

#进行差值操作
inPointFeatures = "sites_"+pullute+".shp";
arcpy.CheckOutExtension("Spatial")
outSplineBarriers = Idw(inPointFeatures,pullute);
tif ="pointraster_"+pullute+".tif";
if os.path.exists(mapPath+r"\pointraster_"+pullute+".tif"):
#老tif文件删除
os.remove(mapPath+r"\pointraster_"+pullute+".tif")
outSplineBarriers.save(tif);
logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"插值操作完成:"+pullute);

#清理内存
del tif,inPointFeatures,outSplineBarriers;
cursor.close;
cnxn.close;
logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"drawpng完成:"+pullute);

def insertData(date1,hour1):

env.workspace = r"E:\idw\tif_hour";
mapPath =r"E:\idw\tif_hour";

cnxn =pymssql.connect(host='127.0.0.1', user='sa', password='xzs@123', database='nationAir')
cursor = cnxn.cursor();
cursor1 = cnxn.cursor();

sql1="SELECT  citycode,longitude,latitude FROM hf_city";
cursor.execute(sql1);
cursorData = cursor.fetchall();

tif_so2 ="pointraster_so2.tif";
tif_no2 ="pointraster_no2.tif";
tif_co ="pointraster_co.tif";
tif_o3 ="pointraster_o3.tif";
tif_pm10 ="pointraster_pm10.tif";
tif_pm25 ="pointraster_pm25.tif";

#insert到数据库
for item in cursorData:
point=item[1]+" "+item[2];
try:
result_so2 = arcpy.GetCellValue_management(tif_so2,point);
result_no2 = arcpy.GetCellValue_management(tif_no2,point);
result_co = arcpy.GetCellValue_management(tif_co,point);
result_o3 = arcpy.GetCellValue_management(tif_o3,point);
result_pm10 = arcpy.GetCellValue_management(tif_pm10,point);
result_pm25 = arcpy.GetCellValue_management(tif_pm25,point);
updateSql="insert into hf_idw(citycode,datetimes,so2,no2,co,o3,pm10,pm25) values('%s','%s',%s,%s,%s,%s,%s,%s)" % (item[0],date1+" "+hour1+":00:00",result_so2.getOutput(0),result_no2.getOutput(0),result_co.getOutput(0),result_o3.getOutput(0),result_pm10.getOutput(0),result_pm25.getOutput(0));
cursor1.execute(updateSql);
except Exception, e:
logging.debug(e.message);
cnxn.commit();

#清理内存
del result_so2,result_no2,result_co,result_o3,result_pm10,result_pm25;
cursor.close;
cursor1.close;
cnxn.close;
logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"insertData完成:");

LOG_FILENAME="E:\idw\log_hour.txt";
logging.basicConfig(filename=LOG_FILENAME,level=logging.NOTSET);

if __name__ == '__main__':#windows下必须加这句

cnxn =pymssql.connect(host='127.0.0.1', user='sa', password='xzs@123', database='nationAir')
cursor = cnxn.cursor();

d1 = datetime.datetime.now();
date1=d1.strftime('%Y-%m-%d');
d3= d1 + datetime.timedelta(hours=-1);
hour1=d3.strftime('%H');
logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"小时值7项变量绘图开始"+date1+",小时:"+hour1);

checksql="SELECT count(*) FROM monitor_site_hour where datetime='"+date1+" "+hour1+":00:00'";
cursor.execute(checksql);
checkdata= cursor;

num=0;
for item0 in checkdata:
num=item0[0];
logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"检查数据库数量:"+str(num));
if num>1000:
logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"检查通过,开始计算");

#开启6个线程并行计算
pool = multiprocessing.Pool(processes = 6)
pool.apply_async(drawpng, (date1,hour1,"so2","so2!='_'",));
pool.apply_async(drawpng, (date1,hour1,"no2","no2!='_'",));
pool.apply_async(drawpng, (date1,hour1,"co","co!='_'",));
pool.apply_async(drawpng, (date1,hour1,"o3","o3!='_'",));
pool.apply_async(drawpng, (date1,hour1,"pm10","pm10!='_'",));
pool.apply_async(drawpng, (date1,hour1,"pm25","pm25!='_'",));
pool.close()
pool.join()

insertData(date1,hour1);

logging.debug(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+"计算结束");

cursor.close;
cnxn.close;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: