diff --git a/.gitignore b/.gitignore index ded6067..e69de29 100644 --- a/.gitignore +++ b/.gitignore @@ -1,36 +0,0 @@ -*.py[cod] - -# C extensions -*.so - -# Packages -*.egg -*.egg-info -dist -build -eggs -parts -bin -var -sdist -develop-eggs -.installed.cfg -lib -lib64 -__pycache__ - -# Installer logs -pip-log.txt - -# Unit test / coverage reports -.coverage -.tox -nosetests.xml - -# Translations -*.mo - -# Mr Developer -.mr.developer.cfg -.project -.pydevproject diff --git a/config/__init__.pyc b/config/__init__.pyc new file mode 100644 index 0000000..a3b9f55 Binary files /dev/null and b/config/__init__.pyc differ diff --git a/config/sysconfig.pyc b/config/sysconfig.pyc new file mode 100644 index 0000000..3d24d77 Binary files /dev/null and b/config/sysconfig.pyc differ diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/dealString.py b/lib/dealString.py new file mode 100644 index 0000000..37f9311 --- /dev/null +++ b/lib/dealString.py @@ -0,0 +1,89 @@ +# -*- coding:utf-8 -*- +# string lib +# split the value check + +import string +from my_log import * + +def format_check(str): + value = str.split(',') + return value + +# compare the expect value with the real value +# oldv : the expect value - string format +# newv : the real value - string format +# value : check point - list +# return TRUE or FALSE +def compare_value(oldv, newv ,check=None): + result = {} + try: + expect_value = oldv + real_value = eval(newv) + except: + print 'Get expect value errro.' + logger.error("Get expect value errro.") + exit() + # value == "" + if value == "": + for key in expect_value.keys(): + if isinstance(expect_value[key], dict): # 检查该key对应的value是否为dict类型 + compare_value(expect_value[key], real_value[key]) + else: + if expect_value[key] == real_value[key]: + return True + else: + return False + else: + pass + +# deal false & true & NONE in json +# tran "false true " into "False True" +# return: string +def re_str(str): + ''' resolve string ''' + try: + str = list(str) + length = len(str) + for n in range(length): + if str[n] == 'f': + if str[n-1] == ':' and str[n+1] == 'a': + str[n] = 'F' + else: + continue + elif str[n] == 't': + if str[n-1] == ':' and str[n+1] == 'r': + str[n] = 'T' + else: + continue + elif str[n] == 'n': + if str[n-1] == ':' and str[n+1] == 'u' and str[n+2] == 'l' and str[n+3] == 'l': + str[n] = 'N' + str[n+1] = 'o' + str[n+2] = 'n' + str[n+3] = 'e' + elif str[n] == '"': + if str[n-1] not in [',',':','{','}','"','\\'] and str[n+1] not in [',',':','{','}','"']: + str.append('') + for x in range(len(str),n,-1): + str[x-1] = str[x-2] + str[n] = "\\" + else: + continue + else: + continue + str = "".join(str) + return str + except: + print "deal the string error." + logger.error('tran json into dict occur error.') + exit() + +# compare all keys +def list_all_dict(dict_a): + if isinstance(dict_a,dict) : #使用isinstance检测数据类型 + for x in range(len(dict_a)): + temp_key = dict_a.keys()[x] + temp_value = dict_a[temp_key] + print"%s : %s" %(temp_key,temp_value) + list_all_dict(temp_value) #自我调用实现无限遍历 + diff --git a/lib/mutilThread.py b/lib/mutilThread.py new file mode 100644 index 0000000..4e00143 --- /dev/null +++ b/lib/mutilThread.py @@ -0,0 +1,54 @@ +# -*- coding:utf-8 -*- + +import time +import threading + +class TestQ(threading.Thread): + def __init__(self, api, cases): + threading.Thread.__init__(self) + self.api = api + print self.api + self.cases = cases + + def run(self): + if self.api['protocol'] == 'http': + for n in range(len(self.cases)): + print "执行测试用例:" + str(self.cases[n]['cid']) + conn = httplib.HTTPSConnection(self.api['host']) + apiUrl = self.api['url'] + "?" + self.cases[n]['all_param'] + conn.request(api['method'], apiUrl) + backinfo = conn.getresponse() + ret_res = backinfo.read() + now_res = eval(re_str(ret_res)) + # deal wish result. + expect = self.cases[n]['wish'] + if str(expect)[:1] == 'a': + dependa = expect[3:] + dependapi = eval(dependa) + depend_key = dependapi.keys() + depend_value = dependapi.values() + depend_expect = one_case_run(depend_key[0], depend_value[0]) + else: + depend_expect = cases[n]['wish'] + + print " 预期结果: " + str(depend_expect) + print " 实际结果: " + str(now_res) + if self.cases[n]['check'] == 'no': + print " 校验全部字段值." + if now_res == depend_expect: + print " " + self.cases[n]['des'] + " test success." + else: + print " " + self.cases[n]['des'] + " test failed" + else: + check_param = format_check(cases[n]['check']) + print " 校验参数:" + str(check_param) + compare_res = compare_value(check_param, depend_expect, re_str(now_res)) + if 'False' in compare_res: + print ' ' + self.cases[n]['des'] + 'test failed' + else: + print " " + self.cases[n]['des'] + 'test success' + else: + pass + + def stop(self): + self.thread_stop = True \ No newline at end of file diff --git a/lib/my_log.py b/lib/my_log.py new file mode 100644 index 0000000..281d6e6 --- /dev/null +++ b/lib/my_log.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- + +import os +import logging +import logging.config + +LOG_FILENAME = 'logging.conf' +os.chdir("/usr/home/zeyang/autoapi/config/") +logging.config.fileConfig(LOG_FILENAME) +logger = logging.getLogger("running error info") diff --git a/lib/resolveXML.py b/lib/resolveXML.py new file mode 100644 index 0000000..7e523ad --- /dev/null +++ b/lib/resolveXML.py @@ -0,0 +1,140 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- + +import sys,os +import httplib +from xml.dom import minidom + +sys.path.append("/usr/home/zeyang/autoapi/config") +from sysconfig import * +from my_log import * +import deal_string + +# xml object +class xmlObject(): + os.chdir("/usr/home/zeyang/autoapi/data/case/status/") + + def __init__(self): + return + + def get_attrvalue(self,node,attrname): + if node: + return node.getAttribute(attrname) + else: '' + + def get_nodevalue(self, node, index = 0): + if node: + return node.childNodes[index].nodeValue + else: '' + + def get_xmlnode(self, node, name): + if node: + return node.getElementsByTagName(name) + else: '' + + def get_xml_data(self, filename): + try: + doc = minidom.parse(filename) + root = doc.documentElement + # resolve the XXX.api.xml file + if 'api.xml' in filename: + api_list = {} + + api_host = self.get_xmlnode(root, 'host') + api_url = self.get_xmlnode(root, 'url') + api_apiname = self.get_xmlnode(root, 'apiname') + api_protocol = self.get_xmlnode(root, 'protocol') + api_des = self.get_xmlnode(root, 'des') + api_islogin = self.get_xmlnode(root, 'islogin') + api_method = self.get_xmlnode(root, 'method') + api_ip = self.get_xmlnode(root, 'ip') + api_group = self.get_xmlnode(root, 'group') + + api_host_v = self.get_nodevalue(api_host[0]) + api_url_v = self.get_nodevalue(api_url[0]) + api_apiname_v = self.get_nodevalue(api_apiname[0]) + api_protocol_v = self.get_nodevalue(api_protocol[0]) + api_des_v = self.get_nodevalue(api_des[0]).encode('utf-8') + api_islogin_v = self.get_nodevalue(api_islogin[0]) + api_method_v = self.get_nodevalue(api_method[0]) + api_ip_v = self.get_nodevalue(api_ip[0]) + api_group_v = self.get_nodevalue(api_group[0]) + + api_list['host'], api_list['url'], api_list['apiname'], api_list['protocol'] \ + , api_list['des'], api_list['islogin'], api_list['ip'], api_list['method'], \ + api_list['group'] = api_host_v, api_url_v, api_apiname_v, api_protocol_v, \ + api_des_v,api_islogin_v,api_ip_v,api_method_v,api_group_v + return api_list + + elif 'case.xml' in filename: + os.chdir("/usr/home/zeyang/autoapi/data/case/status") + case_nodes = self.get_xmlnode(root,'case') + case_list = [] + for c in case_nodes: + list = {} + case_des = self.get_xmlnode(c, 'des') + case_wish = self.get_xmlnode(c, 'wish') + case_run = self.get_xmlnode(c, 'run') + case_cid = self.get_xmlnode(c, 'cid') + + case_des_v = self.get_nodevalue(case_des[0]).encode('utf-8') + case_wish_v = self.get_nodevalue(case_wish[0]) + case_run_v = self.get_nodevalue(case_run[0]) + case_cid_v = self.get_nodevalue(case_cid[0]) + # expect value must be format before U use. + if case_wish_v[:1] == '{': + b = deal_string.re_str(case_wish_v) + a = eval(b) + else: + a = case_wish_v + case_check_v = self.get_attrvalue(case_wish[0], 'check') + # get the param list + p_nodes = self.get_xmlnode(c, 'p') + # get need run case + p_list = {} + for p in p_nodes: + p_key = self.get_nodevalue(p) + p_value = self.get_attrvalue(p, 'value') + p_list[p_key] = p_value + if case_check_v == '': + list['des'],list['wish'],list['params'],list['all_param'],list['check'], list['run'], list['cid']= \ + case_des_v, a, p_list, self.format_param(p_list), 'no', case_run_v, case_cid_v + else: + list['des'],list['wish'],list['params'],list['all_param'],list['check'], list['run'], list['cid'] = \ + case_des_v, a, p_list, self.format_param(p_list), case_check_v, case_run_v, case_cid_v + case_list.append(list) + return case_list + + else: + plan_person = self.get_xmlnode(root, 'person') + plan_times = self.get_xmlnode(root, 'times') + + plan_person_v = self.get_nodevalue(plan_person[0]) + plan_times_v = self.get_nodevalue(plan_times[0]) + + plan_list = {} + plan_api = self.get_xmlnode(root,'api') + plan_api_list = {} + for plan in plan_api: + api = self.get_nodevalue(plan) + group = self.get_attrvalue(api, 'group') + plan_api_list[gourp] = api + plan_list['person'], plan_list['times'], plan_list['api'] = plan_person_v, plan_times_v, plan_api_list + return plan_list + except Exception, e: + logger.error(e) + + ''' + # format the params + # old: a=1 b=2 c=3 + # new: a=1&b=2&c=3 + ''' + def format_param(self, params): + try: + pstr = '' + for key in params.keys(): + pstr = pstr + str(key) + "=" + params[key] + "&" + return pstr[:-1] + except: + print 'format the case params error.' + logger.error('when deal the case.xml, format params occur error.') diff --git a/lib/testLogic.py b/lib/testLogic.py new file mode 100644 index 0000000..583416b --- /dev/null +++ b/lib/testLogic.py @@ -0,0 +1,125 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- + +import resolveXML + +# before the test running +# resolve the api.xml and case.xml +# data prepare +def before_test(): + """ 获取测试计划 """ + print "-" * 30 + "Test Start" + "-" * 30 + try: + plan_filename = sys.argv[1] + except IndexError,e: + print "there is no test plan. stop running ..." + logger.error(e) + exit() + xml = resolveXML.xmlObject() + plan = xml.get_xml_data(plan_filename) + print "待测接口列表:" + str(plan['api']) + + for n in range(len(plan['api'])): + os.chdir("/usr/home/zeyang/autoapi/data/api/status/") + + # 获取api的配置信息 + api_xml = xmlObject() + api_filename = plan['api'][n] + ".api.xml" + api = api_xml.get_xml_data(api_filename) + + # 获取case的配置信息 + os.chdir("/usr/home/zeyang/autoapi/data/case/status/") + case_xml = xmlObject() + case_filename = plan['api'][n] + ".case.xml" + cases = case_xml.get_xml_data(case_filename) + to_run_case = pick_run_case(cases) + print str(n+1) + ". " + plan['api'][n] + " ----- case num:" + str(len(to_run_case)) + + testQ = TestQ(plan['api'][n], to_run_case) + testQ.start() + time.sleep(10) + testQ.stop + print str(plan['api'][n]) + "执行完成。" + +# simple run one case +# param : string(apiname), string(caseid) +# return : re_str(res) +def one_case_run(apiname, caseid): + os.chdir("/usr/home/zeyang/autoapi/data/api/status/") + oooxml = xmlObject() + apifile = apiname + ".api.xml" + hhhxml = xmlObject() + casefile = apiname + ".case.xml" + api = oooxml.get_xml_data(apifile) + os.chdir("/usr/home/zeyang/autoapi/data/case/status/") + allcase = hhhxml.get_xml_data(casefile) + case = {} + for i in range(len(allcase)): + for key in allcase[i]: + if caseid == allcase[i]['cid']: + case = allcase[i] + else: + continue + if api['protocol'] == 'http': + conn = httplib.HTTPConnection(api['host']) + else: + conn = httplib.HTTPSConnection(api['host']) + apiUrl = api['url'] + "?" + case['all_param'] + conn.request(api['method'], apiUrl) + backinfo = conn.getresponse() + res = backinfo.read() + return re_str(res) + +# http or https request +def run_test(api, cases): + if api['protocol'] == 'http': + for n in range(len(cases)): + print "执行测试用例:" + str(cases[n]['cid']) + conn = httplib.HTTPSConnection(api['host']) + apiUrl = api['url'] + "?" + cases[n]['all_param'] + conn.request(api['method'], apiUrl) + backinfo = conn.getresponse() + ret_res = backinfo.read() + now_res = eval(re_str(ret_res)) + # deal wish result. + expect = cases[n]['wish'] + if str(expect)[:1] == 'a': + dependa = expect[3:] + dependapi =eval(dependa) + depend_key = dependapi.keys() + depend_value = dependapi.values() + depend_expect = one_case_run(depend_key[0], depend_value[0]) + else: + depend_expect = cases[n]['wish'] + + print " 预期结果: " + str(depend_expect) + print " 实际结果: " + str(now_res) + if cases[n]['check'] == 'no': + print " 校验全部字段值." + if now_res == depend_expect: + print " " + cases[n]['des'] + " test success." + else: + print " " + cases[n]['des'] + " test failed" + else: + check_param = format_check(cases[n]['check']) + print " 校验参数:" + str(check_param) + compare_res = compare_value(check_param, depend_expect, re_str(now_res)) + if 'False' in compare_res: + print ' ' + cases[n]['des'] + 'test failed' + else: + print " " + cases[n]['des'] + 'test success' + else: + pass + + + +# pick up all the running case. +def pick_run_case(case): + ''' pick up the running case while run = 1 ''' + run_case = [] + for n in range(len(case)): + if case[n]['run'] == '1': + run_case.append(case[n]) + else: + continue + return run_case \ No newline at end of file