-
Notifications
You must be signed in to change notification settings - Fork 2
/
sync.py
143 lines (105 loc) · 4.57 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""The user sync module for handling user changes in the directory service."""
import admin
from appengine_config import JINJA_ENVIRONMENT
from config import PATHS
from datastore import Notification
from datastore import NotificationChannel
from error_handlers import Handle500
from googleapiclient import errors
from google_directory_service import GoogleDirectoryService
import json
import webapp2
def _RenderNotificationsTemplate():
    """Build the HTML page that lists the stored notifications.

    Returns:
      The result of rendering 'templates/notifications.html' with the value
      of Notification.GetAll() exposed to the template as 'notifications'.
    """
    # Fetch the data before loading the template, as the original did.
    context = {'notifications': Notification.GetAll()}
    page = JINJA_ENVIRONMENT.get_template('templates/notifications.html')
    return page.render(context)
def _RenderChannelsListTemplate():
    """Build the HTML page that lists the stored notification channels.

    Returns:
      The result of rendering 'templates/notification_channels.html' with
      the value of NotificationChannel.GetAll() exposed to the template as
      'channels'.
    """
    # Fetch the data before loading the template, as the original did.
    context = {'channels': NotificationChannel.GetAll()}
    page = JINJA_ENVIRONMENT.get_template(
        'templates/notification_channels.html')
    return page.render(context)
class PushNotificationHandler(webapp2.RequestHandler):
    """Receive push notifications about users and record them."""

    # pylint: disable=too-few-public-methods

    def post(self):
        """Record an incoming push notification and acknowledge it.

        The resource state and message number are read from the
        'X-Goog-Resource-State' and 'X-Goog-Message-Number' request headers.
        The request body is parsed as JSON and must be an object with the
        keys 'id' and 'primaryEmail'; those four values are stored through
        Notification.Insert.

        A body that is empty, is not valid JSON, or lacks the required keys
        is not stored. For a 'sync' state message it is still acknowledged
        with the normal response; otherwise the response status is set to
        400 instead of letting the exception surface as a server error.
        """
        state = self.request.headers.get('X-Goog-Resource-State')
        number = self.request.headers.get('X-Goog-Message-Number')

        try:
            body_object = json.loads(self.request.body)
            uuid = body_object['id']
            email = body_object['primaryEmail']
        except (ValueError, KeyError, TypeError):
            # ValueError: empty or non-JSON body. KeyError: missing field.
            # TypeError: the JSON value is not an object.
            if state == 'sync':
                # NOTE(review): the initial 'sync' message sent when a
                # channel is created presumably carries no user resource;
                # acknowledge it without storing anything - confirm against
                # the push notification documentation.
                self.response.write('Got a notification!')
                return
            self.error(400)
            self.response.write('Malformed notification payload.')
            return

        Notification.Insert(state=state, number=number, uuid=uuid, email=email)
        self.response.write('Got a notification!')
class DefaultPathHandler(webapp2.RequestHandler):
    """Landing handler for the top-level sync path."""

    # pylint: disable=too-few-public-methods

    @admin.OAUTH_DECORATOR.oauth_required
    @admin.RequireAppOrDomainAdmin
    def get(self):
        """Send the caller on to the page listing previous notifications."""
        destination = PATHS['notifications_list']
        self.redirect(destination)
class ListChannelsHandler(webapp2.RequestHandler):
    """Show every notification channel held in the datastore."""

    # pylint: disable=too-few-public-methods

    @admin.OAUTH_DECORATOR.oauth_required
    @admin.RequireAppOrDomainAdmin
    def get(self):
        """Write the rendered channel list page to the response."""
        page = _RenderChannelsListTemplate()
        self.response.write(page)
class ListNotificationsHandler(webapp2.RequestHandler):
    """Show every notification received so far."""

    # pylint: disable=too-few-public-methods

    @admin.OAUTH_DECORATOR.oauth_required
    @admin.RequireAppOrDomainAdmin
    def get(self):
        """Write the rendered notification list page to the response."""
        page = _RenderNotificationsTemplate()
        self.response.write(page)
class WatchUserDeleteEventHandler(webapp2.RequestHandler):
    """Subscribe to deletion events for users in the directory API."""

    # pylint: disable=too-few-public-methods

    @admin.OAUTH_DECORATOR.oauth_required
    @admin.RequireAppOrDomainAdmin
    def get(self):
        """Ask the directory service to watch users for 'delete' events.

        On success the caller is redirected to the notification channels
        list. If the directory call raises errors.HttpError, the error text
        is written to the response instead.
        """
        try:
            GoogleDirectoryService(admin.OAUTH_DECORATOR).WatchUsers('delete')
            destination = PATHS['notification_channels_list']
            self.redirect(destination)
        except errors.HttpError as http_error:
            message = 'An error occurred: ' + str(http_error)
            self.response.write(message)
class UnsubscribeHandler(webapp2.RequestHandler):
    """Unsubscribe from notifications for a specific resource."""

    # pylint: disable=too-few-public-methods

    @admin.OAUTH_DECORATOR.oauth_required
    @admin.RequireAppOrDomainAdmin
    def get(self):
        """Find the channel specified and unsubscribe from it.

        The channel is identified by the integer 'id' request parameter,
        which is looked up through NotificationChannel.Get and handed to
        GoogleDirectoryService.StopNotifications. On success the caller is
        redirected to the notification channels list.

        A missing or non-integer 'id' yields a 400 response, and an id for
        which no entity is found yields a 404, rather than an unhandled
        exception. An errors.HttpError from the directory call is reported
        in the response body.
        """
        try:
            datastore_id = int(self.request.get('id'))
        except ValueError:
            # request.get() gives '' when the parameter is absent, which
            # int() rejects just like any other non-numeric value.
            self.error(400)
            self.response.write('Invalid or missing channel id.')
            return

        entity = NotificationChannel.Get(datastore_id)
        if entity is None:
            # NOTE(review): assumes Get() returns None for an unknown id -
            # confirm against the datastore module.
            self.error(404)
            self.response.write('No notification channel with that id.')
            return

        try:
            directory_service = GoogleDirectoryService(admin.OAUTH_DECORATOR)
            directory_service.StopNotifications(entity)
            self.redirect(PATHS['notification_channels_list'])
        except errors.HttpError as error:
            self.response.write('An error occurred: ' + str(error))
# WSGI application for this module: one route per handler defined above,
# plus the callback endpoint supplied by the OAuth decorator.
# NOTE(review): debug=True is preserved as found - confirm it is intended
# outside of development.
APP = webapp2.WSGIApplication(
    routes=[
        (PATHS['receive_push_notifications'], PushNotificationHandler),
        (PATHS['sync_top_level_path'], DefaultPathHandler),
        (PATHS['notification_channels_list'], ListChannelsHandler),
        (PATHS['notifications_list'], ListNotificationsHandler),
        (PATHS['watch_for_user_deletion'], WatchUserDeleteEventHandler),
        (PATHS['unsubscribe_from_notifications'], UnsubscribeHandler),
        (
            admin.OAUTH_DECORATOR.callback_path,
            admin.OAUTH_DECORATOR.callback_handler(),
        ),
    ],
    debug=True)

# Registering a 500 handler on the application is the only way to catch
# exceptions raised from the oauth decorators.
APP.error_handlers[500] = Handle500