您的位置:首页 > 其它

创新实训5.22 多线程评测

2017-05-25 19:59 127 查看
因为我们希望能保证VJ的性能,因此不能对每次提交就立即评测,我们希望通过生产者消费者模型来解决这个问题,刚开始听取同学的建议使用celery,并参考了相关教程,如下:

http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html

http://www.tuicool.com/articles/UbABNr

http://www.cnblogs.com/ajianbeyourself/p/3889017.html

http://blog.csdn.net/happyanger6/article/details/51404837

使用celery完成了基本的异步任务队列,文章中重复使用的例子也成功运行。但是celery是不能用来部署scrapy爬虫的,CrawlRunner会发生ReactorNotRestartable的错误,就是说Reactor不能重新开启,另一个问题就是连续多次爬取页面后新的爬虫无法监听某个端口,虽然查阅了很多资料看了StackoverFlow的很多解决方案,但是在我这里都没能解决问题(如果有好的解决办法希望跟我说)

https://segmentfault.com/q/1010000007080674

后来查到了scrapyd,相关资料可以看http://www.jianshu.com/p/a8aad3bf4dc4,简单说就是把爬虫做成了一个web服务,通过HTTP请求让爬虫开始运行。

关于Scrapyd安装和部署以及相关API:

http://scrapyd.readthedocs.io/en/latest/api.html#api

http://www.cnblogs.com/zhongtang/p/5634545.html

至今初步完成了评测结果的获取,多线程实时监控运行状态,同时使用了互斥锁来保证共享资源使用的原子性。

评测的工作过程:提交的代码与使用的sduvj帐号绑定,最多有4个代码在同时评测,这4份代码分别对应不同的帐号,获取代码运行结果是只需要查询对应帐号的最近一次提交即可。帐号是共享资源,通过互斥锁来保证其原子性。

最后将程序做成了守护进程,放在后台运行。

代码:

# coding:utf-8
import os,sys,time,threading,pymysql,traceback,requests,json
from submiter import *

mutex = threading.Lock() #acc
submitMutex = threading.Lock()
account = {"sduvj1":True,"sduvj5":True,"sduvj3":True,"sduvj4":True}
threadID = 0
inProcess = set()
threadLog = open('log/thread.log','a+')
baseURL = 'http://127.0.0.1:6800/'
schURL = baseURL + 'schedule.json'
jobsURL = baseURL + 'listjobs.json?project=vjspider'

class JudgerThread(threading.Thread):
def __init__(self,threadID,vjRunID,OJ,Prob,Code,Lang):
threading.Thread.__init__(self)
self.threadID = threadID
self.vjRunID = vjRunID
self.OJ = OJ
self.Prob = Prob
self.Code = Code
self.Lang = Lang

def run(self):
global account,threadLog,schURL,jobsURL
acc = ""
while acc == "":
if mutex.acquire():
for k in account.keys():
if account[k]:
acc = k
account[k] = False
threadLog.write("accquire account %s (vjRunID : %d threadID : %d\n"%(acc,self.vjRunID,self.threadID))
threadLog.flush()
break
mutex.release()
time.sleep(1)

spider = ""
submiter = None
if self.OJ == "hdu":
spider = "hdu_status"
submiter = HduSubmiter(self.Prob,acc,self.Code,self.Lang)

sub = False
if submitMutex.acquire():
sub = submiter.submit2OJ()
submitMutex.release()

if sub:
time.sleep(1)
dictdata = {"project":"vjspider","spider":spider,"vj_run_id":self.vjRunID,"user":acc}
r = requests.post(schURL,data = dictdata)
jobID = json.loads(r.text)['jobid']
while True:
fjobs = json.loads((requests.get(jobsURL)).text)['finished']
flag = False
for fjob in fjobs:
if fjob['id'] == jobID :
flag = True
if flag:
break
time.sleep(1)

print("Thread %d finished"%self.threadID)

if mutex.acquire():
account[acc] = True
threadLog.write("release account %s (vjRunID : %d threadID : %d\n"%(acc,self.vjRunID,self.threadID))
threadLog.flush()
mutex.release()

def daemon_init(stdin='/log/null',stdout='/log/null',stderr='/log/null'):
sys.stdin = open(stdin,'r')
sys.stdout = open(stdout,'a+')
sys.stderr = open(stderr,'a+')
try:
pid = os.fork()
if pid > 0: #parrent
os._exit(0)
except OSError as e:
sys.stderr.write("first fork failed!!"+e.strerror)
os._exit(1)

# 子进程, 由于父进程已经退出,所以子进程变为孤儿进程,由init收养
'''setsid使子进程成为新的会话首进程,和进程组的组长,与原来的进程组、控制终端和登录会话脱离。'''
os.setsid()
'''防止在类似于临时挂载的文件系统下运行,例如/mnt文件夹下,这样守护进程一旦运行,临时挂载的文件系统就无法卸载了,这里我们推荐把当前工作目录切换到根目录下'''
os.chdir("/")
'''设置用户创建文件的默认权限,设置的是权限“补码”,这里将文件权限掩码设为0,使得用户创建的文件具有最大的权限。否则,默认权限是从父进程继承得来的'''
os.umask(0)

try:
pid = os.fork() #第二次进行fork,为了防止会话首进程意外获得控制终端
if pid > 0:
os._exit(0) #父进程退出
except OSError as e:
sys.stderr.write("second fork failed!!"+e.strerror)
os._exit(1)

# 孙进程
# for i in range(3,64): # 关闭所有可能打开的不需要的文件,UNP中这样处理,但是发现在python中实现不需要。
# os.close(i)
sys.stdout.write("Daemon has been created! with pid: %d\n" % os.getpid())
sys.stdout.flush() #由于这里我们使用的是标准IO,回顾APUE第五章,这里应该是行缓冲或全缓冲,因此要调用flush,从内存中刷入日志文件。

def main():
global threadID,inProcess
db = pymysql.connect("xxxx","xxxx","xxxx","vj",use_unicode=True, charset="utf8")
cursor = db.cursor()
sql = "select * from status where result = 'Waiting'"

sys.stdout.write("main begin!\n")
sys.stdout.flush()
while True:
#inProcess need clear
try:
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
vjRunID = row[0]
print("vjRunID : %s",vjRunID)
if vjRunID in inProcess:
break
inProcess.add(vjRunID)
probID = row[2]
Code = row[3]
Lang = row[4]

cursor.execute("select * from problem where proid = %s"%str(probID))
tmp_result = cursor.fetchall()
OJ = ""
Prob = ""
for tmp in tmp_result:
OJ = tmp[1]
Prob = tmp[2]
break

#sys.stdout.write("Item : %s,%s,%s\n"%(vjRunID,OJ,Prob))
threadID = threadID + 1
thd = JudgerThread(threadID,vjRunID,OJ,Prob,Code,Lang)
thd.setDaemon(False)
thd.start()
except Exception as e:
sys.stderr.write("Error : sql execute failed")
sys.stderr.write(str(e))
sys.stderr.write('traceback.print_exc():%s'% traceback.print_exc())
sys.stderr.write('traceback.format_exc():\n%s' % traceback.format_exc())
db.commit()
sys.stdout.flush()
time.sleep(5)

if __name__ == '__main__':
print ('========main function start!============') #在调用daemon_init函数前是可以使用print到标准输出的,调用之后就要用把提示信息通过stdout发送到日志系统中了
daemon_init('log/null','log/daemon.log','log/daemon.err') # 调用之后,你的程序已经成为了一个守护进程,可以执行自己的程序入口了
time.sleep(10)
main()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: