-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_prototype.py
81 lines (49 loc) · 1.77 KB
/
_prototype.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from aiohttp import web
from datetime import datetime
from aiows import aiows, ActionsRegistry, send, broadcast, send_pubsub
routes = web.RouteTableDef()
aiows = aiows()
actions = ActionsRegistry(prefix='test')
aiows.register_actions(actions)
@routes.get('/ws/')
async def ws(request: web.Request):
await aiows.client_connect(request, request.query.get('id', 1))
@aiows.on('hello', payload_schema=RequestSchema(), prefix='CWARS')
@aiows.swagger(response_schema=ResponseSchema(), tags=['aaa'])
async def hello_action(payload, client_id):
return {'mirror': payload}
@aiows.on('update1', swagger=False)
async def update_action(payload, client_id):
await aiows.send('update_answer', client_id, {'some': 'data'})
@actions.on('update2')
async def update_action(payload, client_id):
await aiows.broadcast('updated', {'some': 'data'})
@actions.on
async def update_action(payload, client_id):
await aiows.broadcast('updated', {'some': 'data'})
@aiows.on_pubsub('hello', payload_schema=RequestSchema())
async def hello_action(payload, client_id):
pass
@aiows.on_pubsub('hello', payload_schema=RequestSchema())
async def hello_action(payload, client_id, request, request_id):
pass
@aiows.on_error()
async def on_error(payload, client_id):
pass
@aiows.on_pubsub_error()
async def on_error(payload):
pass
async def some_worker():
# From another process
client_id = 1
await send(client_id, data={}, request_id=None)
await broadcast(data={}, request_id=None)
await send_pubsub(data={}, request_id=None)
app = web.Application()
app.add_routes(routes)
app.add_routes(aiows.swagger(prefix=''))
app.cleanup_ctx.append(aiows.start_ctx)
if __name__ == '__main__':
import os
os.environ['YZCONFIG_MODULE'] = 'settings'
web.run_app(app)