-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
165 lines (134 loc) · 4.3 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import os
import threading
import itchat
import importlib
from itchat.content import *
from tools import *
class Platform:
def __init__(self):
self.plugins = []
self.threads = []
self.loadPlugins()
self.loadThreads()
@staticmethod
def pluginStart(from_):
print("读取插件 %s." % from_)
@staticmethod
def pluginStop(from_):
print("结束插件 %s." % from_)
@staticmethod
def pluginNotRun(from_):
print("插件已禁用 %s." % from_)
@staticmethod
def pluginRun(from_):
print("插件已启用 %s." % from_)
@staticmethod
def threadStart(from_):
print("读取线程 %s." % from_)
@staticmethod
def threadStop(from_):
print("线程停止 %s." % from_)
@staticmethod
def threadNotRun(from_):
print("线程已禁用 %s." % from_)
@staticmethod
def threadRun(from_):
print("线程已启用 %s." % from_)
# 重新加载插件
def reload(self):
self.plugins = []
self.loadPlugins()
# 重新加载线程
def reloadThreads(self):
self.shutdownThreads()
self.threads = []
self.loadThreads()
# 加载线程文件夹
def loadThreads(self):
for filename in os.listdir("threads"):
if not filename.endswith(".py") or filename.startswith("_"):
continue
self.runThead(filename)
self.threads.sort(key=lambda i: i.priority)
# 加载插件文件夹
def loadPlugins(self):
for filename in os.listdir("plugins"):
if not filename.endswith(".py") or filename.startswith("_"):
continue
self.runPlugin(filename)
self.plugins.sort(key=lambda i: i.priority)
# 读取线程数据
def runThead(self, filename):
threadName = os.path.splitext(filename)[0]
_threads = __import__("threads." + threadName, fromlist=[threadName])
importlib.reload(_threads)
clazz = _threads.getPluginClass()
o = clazz()
o.setPlatform(self)
o.isStart()
# 进程启动状态
if o.status:
self.threads.append(o)
o.start()
o.inRun()
else:
o.notRun()
o.setPlatform(None)
# 读取插件数据
def runPlugin(self, filename):
pluginName = os.path.splitext(filename)[0]
plugin = __import__("plugins." + pluginName, fromlist=[pluginName])
importlib.reload(plugin)
clazz = plugin.getPluginClass()
o = clazz()
o.setPlatform(self)
o.start()
# 插件启动状态
if o.status:
self.plugins.append(o)
o.inRun()
else:
o.notRun()
o.setPlatform(None)
# 结束所有线程
def shutdownThreads(self):
for i in self.threads:
i.stop()
i.setPlatform(None)
# 结束所有插件
def shutdown(self):
data = []
for o in self.plugins:
if o.pluginName == "TianDao":
data.append(o)
continue
o.stop()
o.setPlatform(None)
self.plugins = data
# 插件运行
def botRunPlugin(self, msg, tag):
for o in self.plugins:
try:
if o.asynch:
threading.Thread(target=o.run(msg, tag), name=o.pluginName).start()
else:
o.run(msg, tag)
except:
print(o.pluginName + "线程错误")
if __name__ == "__main__":
platform = Platform()
@itchat.msg_register(itchat.content.PICTURE)
def image_reply(msg):
platform.botRunPlugin(msg, "PICTURE")
@itchat.msg_register(itchat.content.PICTURE, isGroupChat=True)
def image_reply(msg):
platform.botRunPlugin(msg, "PICTUREGroup")
@itchat.msg_register(itchat.content.TEXT)
def text_reply(msg):
platform.botRunPlugin(msg, "TEXT")
@itchat.msg_register(TEXT, isGroupChat=True)
def text_reply(msg):
platform.botRunPlugin(msg, "TEXTGroup")
# 开启微信机器人
# startBot(False)
startBot(2)