forked from hunternet93/ddp_asyncio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_watch_todos.py
87 lines (61 loc) · 3.08 KB
/
example_watch_todos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
'''An example application that uses ddp_asyncio to watch all public to-do lists from Meteor's reference Todos application (https://github.com/meteor/todos)'''
import asyncio
import sys
from ddp_asyncio import DDPClient
class TodoWatcher:
    '''Print every change to the public to-do lists (and their tasks) of a Meteor Todos server.

    The watcher subscribes to 'lists.public' and, for each list that shows up,
    opens a 'todos.inList' subscription so that the list's tasks are reported too.
    '''

    def __init__(self, address):
        '''Create a watcher whose DDPClient is built from *address*.'''
        self.client = DDPClient(address)
        # Maps a list's _id to the 'todos.inList' subscription opened for that list.
        self.lists_subs = {}

    async def watch_lists(self, lists, lists_q):
        '''Consume list events from *lists_q* forever, printing each one.

        lists   -- the 'lists' collection (currently unused, kept for symmetry
                   with watch_todos and for callers that pass it).
        lists_q -- queue yielding events with .type, ._id and .fields.

        An 'added' list gets a 'todos.inList' subscription; a 'removed' list
        has that subscription cancelled again.
        '''
        while True:
            event = await lists_q.get()

            if event.type == 'added':
                print('List created: "{}"'.format(event.fields.name))
                sub = await self.client.subscribe('todos.inList', {'listId': event._id})
                self.lists_subs[event._id] = sub

            elif event.type == 'changed':
                # A change event only carries the modified fields; report renames only.
                if event.fields.get('name'):
                    print('List renamed to "{}"'.format(event.fields.name))

            elif event.type == 'removed':
                print('List deleted: "{}"'.format(event._id))
                # A removal may arrive for a list we never subscribed to; that
                # must not raise KeyError and kill the watcher task.
                sub = self.lists_subs.pop(event._id, None)
                if sub is not None:
                    await self.client.unsubscribe(sub)

    async def watch_todos(self, todos, todos_q):
        '''Consume task events from *todos_q* forever, printing each one.

        todos   -- the 'todos' collection, indexed by task _id; used to look up
                   a task's text when only its 'checked' flag changed.
        todos_q -- queue yielding events with .type, ._id and .fields.
        '''
        while True:
            event = await todos_q.get()

            if event.type == 'added':
                print('Task created: "{}"'.format(event.fields.text))

            elif event.type == 'changed':
                # Tasks store their label in 'text' (see the 'added' branch),
                # so that is the field to test for a text change.
                if event.fields.get('text'):
                    print('Task changed to "{}"'.format(event.fields.text))

                # 'checked' may legitimately be False, so compare against None
                # rather than relying on truthiness.
                if event.fields.get('checked') is not None:
                    if event.fields.checked:
                        print('Task marked complete: "{}"'.format(todos[event._id].text))
                    else:
                        print('Task marked incomplete: "{}"'.format(todos[event._id].text))

            elif event.type == 'removed':
                print('Task deleted: "{}"'.format(event._id))

    async def go(self, loop):
        '''Connect, watch until the connection drops, then reconnect; never returns.

        loop -- the event loop on which the two watcher tasks are created.

        A refused or reset connection attempt is retried after one second.
        '''
        print('Connecting to server...')

        while True:
            try:
                await self.client.connect()
            # A tuple is required here: `A or B` evaluates to just `A`, which
            # would let ConnectionResetError escape the retry loop.
            except (ConnectionRefusedError, ConnectionResetError):
                await asyncio.sleep(1)
                continue

            print('Connected to server.')

            lists = self.client.get_collection('lists')
            lists_q = lists.get_queue()
            lists_task = loop.create_task(self.watch_lists(lists, lists_q))

            todos = self.client.get_collection('todos')
            todos_q = todos.get_queue()
            todos_task = loop.create_task(self.watch_todos(todos, todos_q))

            try:
                sub = await self.client.subscribe('lists.public')
                # Presumably resolves once the subscription is ready -- confirm
                # against the ddp_asyncio documentation.
                await sub.wait()

                await self.client.disconnection()
                print('Lost connection to server, attempting to reestablish...')
            finally:
                # Always stop this session's watchers, even when subscribing
                # fails or go() itself is cancelled, so they cannot leak.
                lists_task.cancel()
                todos_task.cancel()
def main(argv):
    '''Run a TodoWatcher against the server address given in argv[1].

    argv -- command-line arguments in sys.argv form (program name first).

    Returns 2 after printing a usage message when no address was supplied.
    Otherwise runs TodoWatcher.go() on a freshly created event loop until it
    stops (in practice: until an exception or KeyboardInterrupt propagates).
    '''
    if len(argv) < 2:
        prog = argv[0] if argv else 'example_watch_todos.py'
        print('Usage: {} <server address>'.format(prog), file=sys.stderr)
        return 2

    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running. Install it as the current loop so that any code that
    # still looks the loop up implicitly finds this one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        td = TodoWatcher(argv[1])
        loop.run_until_complete(td.go(loop))
    finally:
        loop.close()
    return 0


# Only start watching when executed as a script, not when imported.
if __name__ == '__main__':
    sys.exit(main(sys.argv))