您的位置:首页 > 编程语言 > Python开发

flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台

2017-11-24 12:18 676 查看
**背景: 1.平时测试接口,总是现写代码,对测试用例的管理,以及测试报告的管理持久化做的不够,
2.工作中移动端开发和后端开发总是不能并行进行,需要一个mock的依赖来让他们并行开发。
3.同时让自己锻炼去开发测试平台,掌握flask开发程序,提高自己的业务水平。

整体思路: 1.利用flask+bootstrap来进行web界面开发,对接口,接口测试用例,定时任务,测试报告的持续集成。
2.IAPTest支持接口用例管理,接口多用例测试,支持定时测试任务,测试报告持久化
3.目前mock服务支持单一path,定时任务可以开启暂停多用例执行,定时任务执行后自动发送测试报告,多用例的单次执行,单接口的调试功能。对测试环境的管理
下面来看下最后的效果图,以及附上github开源地址。

测试环境管理界面:体验区:http://60.205.187.178:89/ 账号:liwanlei密码liwanlei



定时任务界面:



mock界面



测试报告界面



用例管理界面



接口管理界面



**核心代码分享区:**

定时任务对应视图开发

class AddtimingtaskView(MethodView):
@login_required
def get(self):
return  render_template('addtimingtasks.html')
@login_required
def post(self):
taskname=request.form['taskname']
tinmingtime=request.form['time']
to_email_data=request.form['to_email']
cao_email=request.form['cao_email']
weihu=request.form['weihu']
if taskname =='':
flash('任务名不能为空!')
return render_template('addtimingtasks.html')
if tinmingtime =='':
flash('任务执行时间不能为空!')
return render_template('addtimingtasks.html')
if to_email_data=='':
flash('发送给谁邮件不能为空!')
return render_template('addtimingtasks.html')
if weihu=='':
flash('维护人邮件不能为空!')
return render_template('addtimingtasks.html')
taskname_is = Task.query.filter_by(taskname=taskname).first()
if taskname_is:
flash('任务已经存在请重新填写!')
return render_template('addtimingtasks.html')
new_task=Task(taskname=taskname,taskstart=tinmingtime,taskrepor_to=to_email_data,taskrepor_cao=cao_email,task_make_email=weihu,
makeuser=current_user.id)
db.session.add(new_task)
try:
db.session.commit()
flash('添加定时任务成功')
return  redirect(url_for('timingtask'))
except Exception as e:
db.session.rollback()
flash('添加过程貌似异常艰难!')
return redirect(url_for('addtimingtasks'))
return render_template('addtimingtasks.html')
class Editmingtaskview(MethodView):
@login_required
def get(self,id):
task_one=Task.query.filter_by(id=id).first()
procjet=Project.query.all()
if not task_one:
flash('你编辑的不存在')
return  redirect(url_for('timingtask'))
return  render_template('Edittimingtasks.html',task_one=task_one,porjects=procjet)
def post(self,id):
task_one = Task.query.filter_by(id=id).first()
procjet = Project.query.all()
taskname = request.form['taskname']
tinmingtime = request.form['time']
to_email_data = request.form['to_email']
cao_email = request.form['cao_email']
weihu = request.form['weihu']
if taskname =='':
flash('任务名不能为空!')
return render_template('addtimingtasks.html')
if tinmingtime =='':
flash('任务执行时间不能为空!')
return render_template('addtimingtasks.html')
if to_email_data=='':
flash('发送给谁邮件不能为空!')
return render_template('addtimingtasks.html')
if weihu=='':
flash('维护人邮件不能为空!')
return render_template('addtimingtasks.html')
task_one.taskname=taskname
task_one.taskrepor_to=to_email_data
task_one.taskrepor_cao=cao_email
task_one.task_make_email=weihu
task_one.makeuser=current_user.id
try:
db.session.commit()
flash('编辑成功')
return  redirect(url_for('timingtask'))
except:
db.session.rollback()
flash('编辑出现问题!')
return redirect(url_for('timingtask'))
return render_template('Edittimingtasks.html', task_one=task_one,porjects=procjet)
class DeteleTaskViee(MethodView):
def get(self,id):
task_one = Task.query.filter_by(id=id).first()
if not task_one:
flash('你编辑的不存在')
return redirect(url_for('timingtask'))
if task_one.status==True:
flash('已经删除')
return redirect(url_for('timingtask'))
task_one.status=True
try:
db.session.commit()
flash('删除任务成功')
return redirect(url_for('timingtask'))
except:
db.session.rollback()
flash('删除任务休息了')
return redirect(url_for('timingtask'))
@app.route('/gettest',methods=['POST'])
@login_required
def gettest():#ajax获取项目的测试用例
projec=(request.get_data('project')).decode('utf-8')
if not projec:
return []
proje=Project.query.filter_by(project_name=str(projec)).first()
if not proje:
return  []
testyong=InterfaceTest.query.filter_by(projects_id=proje.id).all()
testyong_list=[]
for i in testyong:
testyong_list.append({'name':i.Interface_name,'id':i.id})
return   jsonify({'data':testyong_list})
class TestforTaskView(MethodView):#为测试任务添加测试用例
def get(self,id):
procjet = Project.query.all()
task_one=Task.query.filter_by(id=id).first()
return  render_template('addtestyongfortask.html',task_one=task_one,procjets=procjet)
def post(self,id):
procjet = Project.query.all()
task_one = Task.query.filter_by(id=id).first()
proc_test=request.form.get('project')
if proc_test =='':
flash(u'不能不添加测试项目!')
return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet)
test_yongli=request.form.getlist('testyongli')
if test_yongli=='':
flash(u'亲你见过只有测试项目没有测试用例的测试任务吗!')
return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet)
for oldtask in task_one.interface.all():
task_one.interface.remove(oldtask)
task_one.prject=Project.query.filter_by(project_name=proc_test).first().id
for yongli in test_yongli:
task_one.interface.append(InterfaceTest.query.filter_by(id=yongli).first())
db.session.add(task_one)
try:
db.session.commit()
flash('任务更新用例成功')
return  redirect(url_for('timingtask'))
except:
flash('任务更新用例失败')
return redirect(url_for('timingtask'))
return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet)
class StartTaskView(MethodView):#开始定时任务
@login_required
def get(self,id):
task=Task.query.filter_by(id=id).first()
if len(task.interface.all())<=1:
flash('定时任务执行过程的测试用例为多用例,请你谅解')
return  redirect(url_for('timingtask'))
try:
scheduler.add_job(func=addtask, id=str(id), args=str(id),trigger=eval(task.taskstart),replace_existing=True)
task.yunxing_status='启动'
db.session.commit()
flash(u'定时任务启动成功!')
return  redirect(url_for('timingtask'))
except Exception as e:
flash(u'定时任务启动失败!请检查任务的各项内容各项内容是否正常')
return redirect(url_for('timingtask'))
class ZantingtaskView(MethodView):#暂停定时任务
@login_required
def get(self,id):
task = Task.query.filter_by(id=id).first()
try:
scheduler.pause_job(str(id))
task.yunxing_status = '暂停'
db.session.commit()
flash(u'定时任务暂停成功!')
return redirect(url_for('timingtask'))
except:
task.yunxing_status = '创建'
db.session.commit()
flash(u'定时任务暂停失败!已经为您初始化')
return redirect(url_for('timingtask'))
class HuifutaskView(MethodView):#回复定时任务
@login_required
def get(self,id):
task = Task.query.filter_by(id=id).first()
try:
scheduler.resume_job(str(id))
task.yunxing_status='启动'
db.session.commit()
flash(u'定时任务恢复成功!')
return redirect(url_for('timingtask'))
except:
task.yunxing_status = '创建'
db.session.commit()
flash(u'定时任务恢复失败!已经为您初始化')
return redirect(url_for('timingtask'))
class YichuTaskView(MethodView):#移除定时任务
@login_required
def get(self,id):
task = Task.query.filter_by(id=id).first()
try:
scheduler.delete_job(str(id))
task.yunxing_status='关闭'
db.session.commit()
flash(u'定时任务移除成功!')
return redirect(url_for('timingtask'))
except:
task.yunxing_status = '创建'
db.session.commit()
flash(u'定时任务移除失败!已经为您初始化')
return redirect(url_for('timingtask'))


定时任务所执行的func代码

def addtask(id):#定时任务执行的时候所用的函数
in_id=int(id)
task=Task.query.filter_by(id=in_id).first()
starttime = datetime.datetime.now()
star = time.time()
day = time.strftime("%Y%m%d%H%M", time.localtime(time.time()))
basedir = os.path.abspath(os.path.dirname(__file__))
file_dir = os.path.join(basedir, 'upload')
file = os.path.join(file_dir, (day + '.log'))
if os.path.exists(file) is False:
os.system('touch %s' % file)
filepath = os.path.join(file_dir, (day + '.html'))
if os.path.exists(filepath) is False:
os.system(r'touch %s' % filepath)
projecct_list = []
model_list = []
Interface_name_list = []
Interface_url_list = []
Interface_meth_list = []
Interface_pase_list = []
Interface_assert_list = []
Interface_headers_list = []
id_list = []
for task_yongli in task.interface.all():
id_list.append(task_yongli.id)
projecct_list.append(task_yongli.projects)
model_list.append(task_yongli.models)
Interface_url_list.append(task_yongli.Interface_url)
Interface_name_list.append(task_yongli.Interface_name)
Interface_meth_list.append(task_yongli.Interface_meth)
Interface_pase_list.append(task_yongli.Interface_pase)
Interface_assert_list.append(task_yongli.Interface_assert)
Interface_headers_list.append(task_yongli.Interface_headers)
apitest = ApiTestCase(Interface_url_list, Interface_meth_list, Interface_pase_list, Interface_assert_list, file,
Interface_headers_list)
result_toal, result_pass, result_fail, relusts, bask_list = apitest.testapi()
endtime = datetime.datetime.now()
end = time.time()
createHtml(titles=u'接口测试报告', filepath=filepath, starttime=starttime, endtime=endtime, passge=result_pass,
fail=result_fail, id=id_list, name=projecct_list, headers=Interface_headers_list,
coneent=Interface_url_list, url=Interface_meth_list, meth=Interface_pase_list,
yuqi=Interface_assert_list, json=bask_list, relusts=relusts)
hour = end - star
user_id = User.query.filter_by(role_id=2).first().id
new_reust = TestResult(Test_user_id=user_id, test_num=result_toal, pass_num=result_pass, fail_num=result_fail,
test_time=starttime, hour_time=hour, test_rep=(day + '.html'), test_log=(day + '.log'))
email = EmailReport.query.filter_by(role_id=2, default_set=True).first()
send_emails(sender=email.send_email, receivers=task.taskrepor_to, password=email.send_email_password,
smtp=email.stmp_email, port=email.port, fujian1=file, fujian2=filepath, subject=u'%s自动用例执行测试报告' % day,
url='%stest_rep'%(request.url_root))
db.session.add(new_reust)
db.session.commit()


mock服务的一个请求方式的代码

1 class MakemockserverView(MethodView):#做一个mock服务
2     def get(self,path):#get请求方法
3         huoqupath=Mockserver.query.filter_by(path=path,status=True).first()
4         heders=request.headers
5         method=request.method
6         if not huoqupath:
7             abort(404)
8         if method.lower() !=huoqupath.methods:
9             return  jsonify({'code':'-1','message':'请求方式错误!','data':''})
10         if huoqupath.is_headers==True:
11             if comp_dict(heders,huoqupath.headers) ==True:
12                 if huoqupath.ischeck==True:
13                     paerm = request.values.to_dict()
14                     if dict_par(paerm,huoqupath.params)==True:
15                         if huoqupath.rebacktype == 'json':
16                             try:
17                                 json_fan = json.dumps(huoqupath.fanhui)
18                                 return jsonify({'code': '1', 'message': 'successs', 'data': json_fan})
19                             except:
20                                 return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''})
21                         elif huoqupath.rebacktype == 'xml':
22                             response = make_response(huoqupath.fanhui)
23                             response.content_type = 'application/xml'
24                             return response
25                         else:
26                             return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''})
27                     else:
28                         return jsonify({'code': '-4', 'message': '你输入的参数不正确', 'data': ''})
29                 else:
30                     if huoqupath.rebacktype=='json':
31                         try:
32                             json_fan=json.dumps(huoqupath.fanhui)
33                             return  jsonify({'code': '1', 'message': 'successs', 'data':json_fan})
34                         except:
35                             return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''})
36                     elif huoqupath.rebacktype =='xml':
37                         response=make_response(huoqupath.fanhui)
38                         response.content_type='application/xml'
39                         return response
40                     else:
41                         return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''})
42             else:
43                 return jsonify({'code': '-3', 'message': '安全校验失败!', 'data': ''})
44         if huoqupath.ischeck == True:
45             paerm = request.values.to_dict()
46             if dict_par(paerm, huoqupath.params) == True:
47                 if huoqupath.rebacktype == 'json':
48                     try:
49                         json_fan = json.dumps(huoqupath.fanhui)
50                         return jsonify({'code': '1', 'message': 'successs', 'data': json_fan})
51                     except:
52                         return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''})
53                 elif huoqupath.rebacktype == 'xml':
54                     response = make_response(huoqupath.fanhui)
55                     response.content_type = 'application/xml'
56                     return response
57                 else:
58                     return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''})
59             else:
60                 return jsonify({'code': '-4', 'message': '你输入的参数不正确', 'data': ''})
61         else:
62             if huoqupath.rebacktype == 'json':
63                 try:
64                     json_fan = json.dumps(huoqupath.fanhui)
65                     return jsonify({'code': '1', 'message': 'successs', 'data': json_fan})
66                 except:
67                     return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''})
68             elif huoqupath.rebacktype == 'xml':
69                 response = make_response(huoqupath.fanhui)
70                 response.content_type = 'application/xml'
71                 return response
72             else:
73                 return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''}) #


开源地址:https://github.com/liwanlei/FXTest

使用说明:

1.依赖包为requirements.txt文件下的,可能部分不全,需要什么可以自己使用中增加。
2.目前由于考虑后续迁移内部使用的话,所以移除了注册功能,改为管理员后台添加方式,默认登录:liwanlei 密码:liwanlei
3.部分功能调试还存在一定的问题,欢迎各位多提宝贵意见,

部分功能欠缺:

1.定时任务的持久化,现在处理容易受到运行过程中的宕机等情况重新启动服务器的定时任务全部需要开启
2、mock接口只能支持单一的path
3. 测试环境没有改为动态配置,动态支持。目前测试环境管理以及上线
4。部分地方可能还会有不严谨性,但是工具可以使用。
5、目前仅支持 http请求中的json格式的,
6.大家可以多提意见,后续会优化,最近一直熬夜加班可能有些消息不能及时回复,还望谅解。

作者寄语:

前进的道路我们充满着迷茫,

前进的每一步我们都会有收获。

路在脚下,我们决定不了我们的出身,但是我们可以努力改变我们未来。

告别昨天失败的自己,努力拼搏今天,成就美好明天

有问题可以联系我:QQ:952943386 email:leileili126@163.com qq群:194704520 新群:683894834

微信打赏

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: