forked from M-Mueller/mattermost-poll
-
Notifications
You must be signed in to change notification settings - Fork 0
/
poll.py
265 lines (233 loc) · 9.64 KB
/
poll.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import sqlite3
from flask_babel import force_locale, gettext as tr
import settings
class NoMoreVotesError(Exception):
"""Raised when user tries to vote but has no votes left."""
pass
class InvalidPollError(Exception):
"""Raised when Poll creation or loading failed."""
pass
def init_database(con):
"""Initializes the database. Is automatically called the first time a Poll
is created."""
cur = con.cursor()
cur.execute("""PRAGMA user_version = 1""")
cur.execute("""CREATE TABLE IF NOT EXISTS Polls (
poll_id integer PRIMARY KEY,
creator text NOT NULL,
message text NOT NULL,
locale text NOT NULL,
finished integer NOT NULL,
secret integer NOT NULL,
public integer NOT NULL,
max_votes integer NOT NULL,
bars integer NOT NULL)""")
cur.execute("""CREATE TABLE IF NOT EXISTS VoteOptions (
poll_id integer REFERENCES Polls (poll_id)
ON DELETE CASCADE ON UPDATE NO ACTION,
number integer NOT NULL,
name text NOT NULL)""")
cur.execute("""CREATE TABLE IF NOT EXISTS Votes (
poll_id integer REFERENCES Polls (poll_id)
ON DELETE CASCADE ON UPDATE NO ACTION,
voter text NOT NULL,
vote integer NOT NULL,
CONSTRAINT single_vote UNIQUE
(poll_id, voter, vote) ON CONFLICT REPLACE)""")
con.commit()
cur.execute("""PRAGMA table_info(Polls)""")
if 'bars' not in (c[1] for c in cur.fetchall()):
print("Outdated database version detected, adding 'bars' column.")
cur.execute("""ALTER TABLE Polls
ADD COLUMN bars integer NOT NULL DEFAULT 0""")
con.commit()
cur.execute("""PRAGMA table_info(Polls)""")
if 'locale' not in (c[1] for c in cur.fetchall()):
print("Outdated database version detected, adding 'locale' column.")
cur.execute("""ALTER TABLE Polls
ADD COLUMN locale text NOT NULL DEFAULT "en" """)
con.commit()
class Poll:
"""The Poll class represents a single poll with a message and multiple
options.
Each user has one vote and can change it until the poll is ended.
Internally each Poll is saved in a database.
Attributes
----------
id:
A unique identifier for this poll
creator_id:
User ID of the user that created the poll.
message:
The message of the poll (may contain markup).
locale:
The locale with which the poll was created.
vote_options:
A list of all available poll choices.
secret: boolean
A secret poll does not show the number of votes until the poll ended.
public: boolean
In a public vote, who voted for what is displayed at the end of the
poll
bars: bollean
Show result as bar chart
max_votes: int
Number of votes each user has.
"""
def __init__(self, connection, id):
"""Loads the poll with `id` from the database connection.
Use `create` or `load` instead.
"""
init_database(connection)
self.connection = connection
self.connection.row_factory = sqlite3.Row
self.id = id
try:
self.vote_options = []
cur = self.connection.cursor()
for (name,) in cur.execute("""SELECT name FROM VoteOptions
WHERE poll_id=?
ORDER BY number ASC""", (self.id,)):
self.vote_options.append(name)
if not self.vote_options:
raise InvalidPollError()
cur.execute("""SELECT creator, message, locale,
secret, public, max_votes, bars FROM Polls
WHERE poll_id=?""", (self.id,))
row = cur.fetchone()
if not row:
raise InvalidPollError()
self.creator_id = row['creator']
self.message = row['message']
self.locale = row['locale']
self.secret = row['secret']
self.public = row['public']
self.max_votes = row['max_votes']
self.bars = row['bars']
except sqlite3.Error as e:
raise InvalidPollError() from e
@classmethod
def create(cls, creator_id, message, locale='en', vote_options=[],
secret=False, public=False, max_votes=1, bars=False):
"""Creates a new poll without any votes.
Empty vote_options will be replaced by ['Yes', 'No'].
"""
con = sqlite3.connect(settings.DATABASE)
init_database(con)
cur = con.cursor()
if not vote_options:
with force_locale(locale):
vote_options = [tr('Yes'), tr('No')]
# clamp to 1 to len(vote_options)
max_votes = max(1, min(max_votes, len(vote_options)))
cur.execute("""INSERT INTO Polls
(creator, message, locale, finished,
secret, public, max_votes, bars) VALUES
(?, ?, ?, ?, ?, ?, ?, ?)""",
(creator_id, message, locale, False,
secret, public, max_votes, bars))
id = cur.lastrowid
for number, name in enumerate(vote_options):
cur.execute("""INSERT INTO VoteOptions
(poll_id, name, number) VALUES
(?, ?, ?)""",
(id, name, number))
con.commit()
return cls(con, id)
@classmethod
def load(cls, id):
"""Loads a poll from the database.
Raise a InvalidPollError if no poll with that id
exists.
"""
con = sqlite3.connect(settings.DATABASE)
return cls(con, id)
def num_votes(self):
"""Returns the total number of votes."""
cur = self.connection.cursor()
cur.execute("""SELECT * FROM Votes WHERE poll_id=?""",
(self.id,))
return len(cur.fetchall())
def num_voters(self):
"""Returns the total number of users which voted."""
cur = self.connection.cursor()
cur.execute("""SELECT COUNT(vote) FROM Votes
WHERE poll_id=?
GROUP BY voter""",
(self.id,))
return len(cur.fetchall())
def count_votes(self, vote_id):
"""Returns the number of votes for the given option.
The `vote_id` is the index of an option in the vote_options.
If the vote_id does not exists, 0 is returned.
"""
cur = self.connection.cursor()
cur.execute("""SELECT * FROM Votes
WHERE poll_id=? AND vote=?""",
(self.id, vote_id))
return len(cur.fetchall())
def voters(self, vote_id):
"""Returns all voters for a given vote_id."""
cur = self.connection.cursor()
cur.execute("""SELECT voter FROM Votes
WHERE poll_id=? AND vote=?""",
(self.id, vote_id))
return [voter[0] for voter in cur.fetchall()]
def votes(self, user_id):
"""Returns a list of `vote_id`s the user voted for.
The `vote_id` is the index of an option in the vote_options.
"""
cur = self.connection.cursor()
cur.execute("""SELECT vote FROM Votes
WHERE poll_id=? AND voter=?""",
(self.id, user_id))
return [v[0] for v in cur.fetchall()]
def vote(self, user_id, vote_id):
"""Places a vote of the given user.
The `vote_id` is the index of an option in the vote_options.
Voting is only possible when the poll is not finished yet.
Invalid `vote_id`s raise an IndexError.
Each user only has a single vote.
When voting multiple times for different options, only the
last vote will remain.
"""
if self.is_finished():
return
if vote_id < 0 or vote_id >= len(self.vote_options):
raise IndexError('Invalid vote_id: {}'.format(vote_id))
cur = self.connection.cursor()
votes = self.votes(user_id)
if vote_id in votes:
# unvote
cur.execute("""DELETE FROM Votes
WHERE poll_id=? AND voter=? AND vote=?""",
(self.id, user_id, vote_id))
else:
# check if the user hasn't used all his votes yet
if len(votes) >= self.max_votes:
if self.max_votes == 1:
# remove the other vote automatically
cur.execute("""DELETE FROM Votes
WHERE poll_id=? AND voter=?""",
(self.id, user_id))
else:
raise NoMoreVotesError()
cur.execute("""INSERT INTO Votes
(poll_id, voter, vote) VALUES
(?, ?, ?)""",
(self.id, user_id, vote_id))
self.connection.commit()
def end(self):
"""Ends the poll.
After the poll ends, voting is not possible anymore.
"""
cur = self.connection.cursor()
cur.execute("""UPDATE Polls SET finished=1 WHERE poll_id=?""",
(self.id,))
self.connection.commit()
def is_finished(self):
"""Return True if the poll is finished or False otherwise."""
cur = self.connection.cursor()
cur.execute("""SELECT finished FROM Polls WHERE poll_id=?""",
(self.id,))
return cur.fetchone()[0] == 1