-
Notifications
You must be signed in to change notification settings - Fork 49
/
ctaTask.py
83 lines (73 loc) · 2.83 KB
/
ctaTask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# encoding: UTF-8
'''
本文件中包含的是回测任务模块,用于回测任务的管理。
'''
import os
from datetime import datetime
import multiprocessing
########################################################################
def openLog(name):
    """Open the log file of a task in Notepad.

    The file opened is ``<name>.log`` inside the ``log`` directory, using
    the same backslash-separated relative path as before.  The call blocks
    until the launched program exits.

    The command is passed as an argument list instead of a shell string so
    that a task name containing spaces or shell metacharacters is handed to
    Notepad as a single file name rather than being interpreted by the shell.

    Args:
        name: Task name used to build the log file name.
    """
    # Imported locally so this function does not depend on the file's
    # top-level import block.
    import subprocess
    import sys

    # Backslash is escaped explicitly; the resulting path text is unchanged.
    path = 'log\\{}.log'.format(name)
    try:
        subprocess.call(['notepad', path])
    except OSError as err:
        # os.system never raised when the program could not be launched;
        # keep that best-effort behaviour but report the failure.
        sys.stderr.write('openLog: cannot launch notepad for {}: {}\n'.format(path, err))
########################################################################
class ctaTask(multiprocessing.Process):
    """
    Backtesting task process object.

    A ``multiprocessing.Process`` subclass that runs ``target`` and puts a
    ``(name, setting, results)`` tuple on ``outq`` when it returns.  The
    instance also carries bookkeeping attributes:

    * ``state``   - one of the status strings assigned in this class
                    (initially u'等待中', later u'已停止' or whatever is
                    passed to ``update``).
    * ``startTM`` - ``datetime`` recorded by ``startTask``, or ``None``.
    * ``runTM``   - elapsed ``timedelta`` once stopped/completed, or ``None``.
    * ``setting`` / ``results`` - dictionaries describing the task and its
                    outcome.
    """
    #----------------------------------------------------------------------
    def __init__(self, name='', target=None, args=(), kwargs=None, mode='front', showfunc=None, outq=None, runmode='bar'):
        """Constructor.

        Args:
            name: Task name; also used as the process name.
            target: Callable executed by ``run``.  It must return a
                ``(setting, results)`` pair.
            args: Positional arguments for ``target``.  If the first one is
                a dict containing a ``'name'`` key, that entry is prefixed
                in place with the task name (``'<task>.<original>'``) and
                the dict becomes ``self.setting``.
            kwargs: Keyword arguments for ``target``; ``None`` means none.
            mode: Stored as ``self.mode``; not used inside this class.
            showfunc: Callable invoked by ``show`` with ``self.results``.
            outq: Object with a ``put`` method that receives the outcome.
            runmode: Stored as ``self.runmode``; not used inside this class.
        """
        if kwargs is None:
            # Avoid sharing one mutable default dict between instances.
            kwargs = {}
        super(ctaTask, self).__init__(target=target, args=args, kwargs=kwargs)
        self.outq = outq
        self.name = name
        self.mode = mode
        self.runmode = runmode
        self.startTM = None
        self.runTM = None
        self.showfunc = showfunc
        # ``args`` may be empty (it defaults to ``()``), so check before
        # indexing; previously this raised IndexError.
        if args and isinstance(args[0], dict) and 'name' in args[0]:
            # Mutates the caller's dict on purpose: the same object is what
            # the target receives as its first argument.
            args[0]['name'] = '.'.join([self.name, args[0]['name']])
            self.setting = args[0]
        else:
            self.setting = {}
        self.results = {}
        self.state = u'等待中'
    #----------------------------------------------------------------------
    def stopTask(self):
        """Stop the task.

        Marks the task as stopped, records the elapsed time in ``runTM``
        and terminates the process.  ``runTM`` is reset to ``None`` when
        the task was never started or when terminating fails.
        """
        self.state = u'已停止'
        if self.startTM is None:
            # Never started: no elapsed time and nothing to terminate.
            self.runTM = None
            return
        self.runTM = datetime.now() - self.startTM
        try:
            self.terminate()
        except Exception:
            # Best effort, as before, but no longer swallows
            # KeyboardInterrupt / SystemExit.
            self.runTM = None
    #----------------------------------------------------------------------
    def startTask(self):
        """Start the task: record the start time and launch the process as a daemon."""
        self.startTM = datetime.now()
        self.daemon = True
        self.start()
    #----------------------------------------------------------------------
    def update(self, setting, results, state):
        """Update the task with a reported outcome.

        Args:
            setting: Setting reported for the task.
            results: Results reported for the task.  ``setting`` and
                ``results`` are stored only when ``results`` is truthy.
            state: New state string.  When it equals u'已完成' and the task
                has a start time, the elapsed time is stored in ``runTM``.
        """
        if results:
            self.setting, self.results = setting, results
        self.state = state
        # Guard against a completion report for a task that has no start
        # time, which used to raise TypeError.
        if self.state == u'已完成' and self.startTM is not None:
            self.runTM = datetime.now() - self.startTM
    #----------------------------------------------------------------------
    def run(self):
        """Task function: call the target and publish its outcome.

        The target's ``(setting, results)`` return value is put on ``outq``
        as ``(name, setting, results)``.  Nothing is published when no
        output queue was supplied.
        """
        if self._target:
            setting, results = self._target(*self._args, **self._kwargs)
            if self.outq is not None:
                self.outq.put((self.name, setting, results))
    #----------------------------------------------------------------------
    def show(self):
        """Show the results of a completed task through ``showfunc``.

        Does nothing unless the state is u'已完成' and a ``showfunc`` was
        supplied.
        """
        if self.state == u'已完成' and self.showfunc is not None:
            self.showfunc(self.results)
    #----------------------------------------------------------------------
    def log(self):
        """Open the log of a completed task.

        Starts a separate process running ``openLog`` with the ``'name'``
        entry of ``self.setting``.  Does nothing unless the state is
        u'已完成'.
        """
        if self.state == u'已完成':
            p = multiprocessing.Process(target=openLog, args=(self.setting.get('name'),))
            p.start()