forked from viewfinderco/viewfinder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
NetworkQueue.h
350 lines (301 loc) · 12.1 KB
/
NetworkQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
// Copyright 2012 Viewfinder. All rights reserved.
// Author: Peter Mattis.
#ifndef VIEWFINDER_NETWORK_QUEUE_H
#define VIEWFINDER_NETWORK_QUEUE_H
#import <map>
#import "ActivityTable.h"
#import "CommentTable.h"
#import "DB.h"
#import "EpisodeTable.h"
#import "Mutex.h"
#import "PhotoTable.h"
#import "Server.pb.h"
#import "ViewpointTable.h"
class AppState;
class PhotoUpdate;
// Priority bands for queued network operations.
//
// These priorities are persisted to disk, so be careful when changing them:
// renumbering an existing constant changes the meaning of entries that are
// already stored. Lower values come before higher values in the queue (see
// the NetworkQueue class comment).
enum {
// Prioritize operations performed explicitly by the user or required by the
// UI. For example, prioritize the retrieval of thumbnail images that need to
// be displayed on screen.
PRIORITY_UI_THUMBNAIL = 10,
PRIORITY_UI_FULL = 20,
PRIORITY_UI_ACTIVITY = 50,
PRIORITY_UI_UPLOAD_PHOTO = 70,
PRIORITY_UI_ORIGINAL = 100,
// Used by NetworkManager to clear all preceding priority levels. Sits
// numerically just above the last PRIORITY_UI_* band.
PRIORITY_UI_MAX = 101,
// The priority band for viewpoint updates, such as updating the viewed
// sequence number.
PRIORITY_UPDATE_VIEWPOINT = 300,
// The priority bands for uploading photos that are used by the client for
// display.
PRIORITY_UPLOAD_PHOTO = 400,
PRIORITY_UPLOAD_PHOTO_MEDIUM = 500,
// The priority band for downloading photos that are not visible on the
// screen.
PRIORITY_DOWNLOAD_PHOTO = 550,
// The priority band for uploading original images.
PRIORITY_UPLOAD_PHOTO_ORIGINAL = 600,
// Largest value in this enum. NOTE(review): presumably an upper bound that no
// real operation is queued at -- confirm against the implementation.
PRIORITY_MAX = 1000,
};
// The NetworkQueue maintains a map from:
// <priority>,<sequence> -> <ServerOperation>
// <device-photo-id> -> <priority>,<sequence>
//
// Both priorities and sequence numbers are sorted such that lower values come
// before higher values. The sequence number is internally allocated and
// ensures FIFO for operations with the same priority.
//
// Besides the persistent queue, the class holds at most one "queued" request
// of each kind (download photo, remove photos, update photo, ...) in the
// queued_*_ members; these are exposed read-only through the queued_*()
// accessors and finished via the CommitQueued*() methods.
//
// NOTE(review): three mutexes are declared (mu_, queue_mu_,
// download_callback_mu_). Which members each one guards is not visible in
// this header -- confirm against the implementation before relying on any
// thread-safety assumption.
class NetworkQueue {
// Set of callbacks that take a single int argument.
typedef CallbackSet1<int> DownloadCallbackSet;
// Map from an int64_t key to a callback set. Presumably keyed by photo id
// (compare WaitForDownload/NotifyDownload) -- TODO confirm.
typedef std::unordered_map<
int64_t, DownloadCallbackSet*> DownloadCallbackMap;
public:
// Cursor over the queued operations. Exposes the priority, sequence number
// and ServerOperation of the current entry; done() reports the done_ flag.
class Iterator {
public:
// Wraps the given leveldb iterator, which is stored in a ScopedPtr.
// NOTE(review): this implies the Iterator takes ownership of "iter" --
// confirm. The constructor is not marked explicit, so a leveldb::Iterator*
// converts implicitly.
Iterator(leveldb::Iterator* iter);
~Iterator();
// Advance to the next queued operation.
void Next();
// Skip to the next priority band containing a queued operation.
void SkipPriority();
// The operation at the current position.
const ServerOperation& op() const { return op_; }
// Value of the done_ flag. Presumably true once iteration has moved past the
// last queued operation -- confirm.
bool done() const { return done_; }
// Priority of the current entry.
int priority() const { return priority_; }
// Sequence number of the current entry.
int64_t sequence() const { return sequence_; }
private:
// NOTE(review): presumably refreshes done_/priority_/sequence_/op_ from the
// underlying iterator and reports whether that succeeded -- confirm.
bool UpdateState();
private:
ScopedPtr<leveldb::Iterator> iter_;
bool done_;
int priority_;
int64_t sequence_;
ServerOperation op_;
};
// Photo sizes, with values taken directly from the PhotoMetadata constants
// of the same names.
enum PhotoType {
THUMBNAIL = PhotoMetadata::THUMBNAIL,
MEDIUM = PhotoMetadata::MEDIUM,
FULL = PhotoMetadata::FULL,
ORIGINAL = PhotoMetadata::ORIGINAL,
};
// An episode together with a parent episode handle and a list of photos.
struct Episode {
EpisodeHandle parent;
EpisodeHandle episode;
vector<PhotoHandle> photos;
};
// State for a queued photo download: which photo and size, plus a path and
// a url. NOTE(review): presumably the local destination path and the remote
// source url -- confirm.
struct DownloadPhoto {
EpisodeHandle episode;
PhotoHandle photo;
PhotoType type;
string path;
string url;
};
// State for a queued remove-photos request.
struct RemovePhotos {
OpHeaders headers;
QueueMetadata queue;
vector<Episode> episodes;
};
// State for a queued photo update request.
struct UpdatePhoto {
OpHeaders headers;
PhotoHandle photo;
};
// State for a queued viewpoint update request.
struct UpdateViewpoint {
OpHeaders headers;
ViewpointHandle viewpoint;
};
// UploadActivity handles a locally-created activity which still must be sent
// to the server via share_new, share_existing, add_followers, post_comment
// or unshare operations.
struct UploadActivity {
OpHeaders headers;
ActivityHandle activity;
ViewpointHandle viewpoint;
// For shares.
vector<Episode> episodes;
// For shares & add_followers.
vector<ContactMetadata> contacts;
// For post comment.
CommentHandle comment;
};
// State for a queued episode upload: the episode and its photos.
struct UploadEpisode {
OpHeaders headers;
EpisodeHandle episode;
vector<PhotoHandle> photos;
};
// State for a queued photo upload: which photo and size, plus a url, a path
// and an md5 string. NOTE(review): presumably the remote destination url, the
// local source path and the md5 of the uploaded data -- confirm.
struct UploadPhoto {
EpisodeHandle episode;
PhotoHandle photo;
PhotoType type;
string url;
string path;
string md5;
};
public:
// NOTE(review): the constructor is not marked explicit, so an AppState*
// converts implicitly to a NetworkQueue.
NetworkQueue(AppState* state);
~NetworkQueue();
// Add an operation with the specified priority to the queue. Note that
// nothing in the NetworkQueue prohibits the same operation from being
// enqueued multiple times at the same or different priorities. Returns the
// sequence number of the newly added entry.
int64_t Add(int priority, const ServerOperation& op, const DBHandle& updates);
// Remove the operation with the specified priority and sequence number from
// the queue. This should only be called once the operation has completed and
// before retrieving the next operation from the queue.
//
// The second overload additionally takes the operation itself.
// NOTE(review): presumably so the caller can supply "op" instead of having it
// looked up again -- confirm.
void Remove(int priority, int64_t sequence, const DBHandle& updates);
void Remove(int priority, int64_t sequence,
const ServerOperation& op, const DBHandle& updates);
// Queue the specified photo, possibly moving it from its current position in
// the queue to a new position. Returns true iff the photo was modified.
bool QueuePhoto(const PhotoHandle& ph, const DBHandle& updates);
// Dequeue the specified photo. Returns true iff the photo was modified.
bool DequeuePhoto(const PhotoHandle& ph, const DBHandle& updates);
// Queue the specified activity. Returns true iff the activity was modified.
bool QueueActivity(const ActivityHandle& ah, const DBHandle& updates);
// Dequeue the specified activity. Returns true iff the activity was
// modified.
bool DequeueActivity(const ActivityHandle& ah, const DBHandle& updates);
// Queue the specified viewpoint. Returns true iff the viewpoint was modified.
bool QueueViewpoint(const ViewpointHandle& vh, const DBHandle& updates);
// Dequeue the specified viewpoint. Returns true iff the viewpoint was
// modified.
bool DequeueViewpoint(const ViewpointHandle& vh, const DBHandle& updates);
// Returns a new Iterator object for iterating over the queued
// operations. The caller is responsible for deleting the iterator.
Iterator* NewIterator();
// Returns true iff nothing is queued.
bool Empty();
// Returns the priority of the item on the top of the queue, or -1 if nothing
// is queued.
int TopPriority();
// Returns the (adjusted) number of pending network operations.
// GetDownloadCount/GetUploadCount presumably return the download and upload
// portions of that count -- confirm.
int GetNetworkCount();
int GetDownloadCount();
int GetUploadCount();
// Returns true iff we should process the given priority band given the
// network (wifi vs 3g/lte) and other settings.
bool ShouldProcessPriority(int priority) const;
// Returns true iff the given priority band corresponds to a download.
bool IsDownloadPriority(int priority) const;
// Commit queued requests. Each method corresponds to one of the queued_*()
// requests below. NOTE(review): presumably called when the matching network
// request finishes, with "error"/"retry"/"status" describing the outcome --
// confirm.
void CommitQueuedDownloadPhoto(const string& md5, bool retry);
void CommitQueuedRemovePhotos(bool error);
void CommitQueuedUpdatePhoto(bool error);
// Identifies which kind of viewpoint update is being committed.
enum UpdateViewpointType {
UPDATE_VIEWPOINT_METADATA,
UPDATE_VIEWPOINT_FOLLOWER_METADATA,
UPDATE_VIEWPOINT_REMOVE,
UPDATE_VIEWPOINT_VIEWED_SEQ,
};
void CommitQueuedUpdateViewpoint(UpdateViewpointType type, bool error);
void CommitQueuedUploadEpisode(const UploadEpisodeResponse& r, int status);
void CommitQueuedUploadPhoto(bool error);
void CommitQueuedUploadActivity(bool error);
// Process server responses. Each method takes the response message and the
// DBHandle to apply changes through; the episode and viewpoint variants also
// take a vector of selections. NOTE(review): presumably the selections that
// were requested -- confirm.
void ProcessQueryEpisodes(
const QueryEpisodesResponse& r, const vector<EpisodeSelection>& v,
const DBHandle& updates);
void ProcessQueryFollowed(
const QueryFollowedResponse& r, const DBHandle& updates);
void ProcessQueryNotifications(
const QueryNotificationsResponse& r, const DBHandle& updates);
void ProcessQueryViewpoints(
const QueryViewpointsResponse& r, const vector<ViewpointSelection>& v,
const DBHandle& updates);
// Wait for the specified photo to be downloaded, invoking "done" when the
// photo has been downloaded or an error has occurred. It is the caller's
// responsibility to ensure that the specified photo has been queued for
// download.
void WaitForDownload(
int64_t photo_id, PhotoType desired_type, Callback<void ()> done);
// Returns a map containing counts of enqueued operations by priority.
// Should be converted to an integer (with ceil()) before display.
// Fractional values are used to compensate for the fact that one user action
// may result in multiple queued operations (e.g. uploading metadata,
// thumbnail, medium, full sizes are 0.25 each so the counter goes up by one
// for each photo taken).
typedef std::map<int, double> NetworkStatsMap;
NetworkStatsMap stats();
// Read-only access to the currently queued request of each kind. Each
// accessor returns the raw pointer held by the corresponding ScopedPtr
// member; the NetworkQueue keeps the object in that member.
// NOTE(review): presumably NULL when no request of that kind is queued --
// confirm.
const DownloadPhoto* queued_download_photo() const {
return queued_download_photo_.get();
}
const RemovePhotos* queued_remove_photos() const {
return queued_remove_photos_.get();
}
const UpdatePhoto* queued_update_photo() const {
return queued_update_photo_.get();
}
const UpdateViewpoint* queued_update_viewpoint() const {
return queued_update_viewpoint_.get();
}
const UploadEpisode* queued_upload_episode() const {
return queued_upload_episode_.get();
}
const UploadPhoto* queued_upload_photo() const {
return queued_upload_photo_.get();
}
const UploadActivity* queued_upload_activity() const {
return queued_upload_activity_.get();
}
private:
// Internal bookkeeping helpers. NOTE(review): the "Locked" suffix presumably
// means the caller must already hold a mutex (likely mu_) -- confirm.
void UpdateStatsLocked(int priority, const ServerOperation& op, bool addition);
void EnsureInitLocked();
void EnsureStatsInitLocked();
// Per-object helpers taking server metadata and returning a handle of the
// matching type. NOTE(review): presumably used by the ProcessQuery*()
// methods above -- confirm.
ActivityHandle ProcessActivity(const ActivityMetadata& m, const DBHandle& updates);
CommentHandle ProcessComment(const CommentMetadata& m, const DBHandle& updates);
EpisodeHandle ProcessEpisode(const EpisodeMetadata& m,
bool recurse, const DBHandle& updates);
PhotoHandle ProcessPhoto(const PhotoUpdate& u, EpisodeHandle* old_eh,
const DBHandle& updates);
PhotoHandle ProcessPhoto(PhotoHandle h, const PhotoUpdate& u,
EpisodeHandle* old_eh, const DBHandle& updates);
ViewpointHandle ProcessViewpoint(const ViewpointMetadata& m,
bool recurse, const DBHandle& updates);
// Helpers that decide whether to set up the next queued request.
// NOTE(review): the bool results presumably indicate whether a request was
// queued -- confirm.
void MaybeQueueNetwork(int priority);
bool MaybeQueueUploadActivity(const ServerOperation& op, int priority,
const DBHandle& updates);
bool MaybeQueueUpdatePhoto(const ServerOperation& op, int priority,
const DBHandle& updates);
bool MaybeQueueRemovePhotos(const ServerOperation& op, int priority,
int64_t sequence, const DBHandle& updates);
bool MaybeQueueDownloadPhoto(const PhotoHandle& ph, const DBHandle& updates);
bool MaybeQueueUpdateViewpoint(const ServerOperation& op, const DBHandle& updates);
bool MaybeQueueUploadPhoto(const PhotoHandle& ph, int priority,
const DBHandle& updates);
bool MaybeQueueUpdatePhotoMetadata(const PhotoHandle& ph, const DBHandle& updates);
bool MaybeQueueUploadEpisode(const EpisodeHandle& eh, const DBHandle& updates);
// Operate on one entry of an UploadEpisode. NOTE(review): "index" is
// presumably a position in u->photos -- confirm.
void MaybeReverseGeocode(UploadEpisode* u, int index);
void MaybeLoadImages(UploadEpisode* u, int index);
// Quarantine a photo, recording "reason". The second overload takes the
// DBHandle to apply the change through.
void QuarantinePhoto(PhotoHandle p, const string& reason);
void QuarantinePhoto(PhotoHandle p, const string& reason,
const DBHandle& updates);
// Error handlers for the individual request kinds. NOTE(review): "types" is
// presumably a bitmask of PhotoType values -- confirm.
void UpdateViewpointError(ViewpointHandle vh);
void UpdatePhotoError(PhotoHandle p);
void UploadPhotoError(PhotoHandle p, int types);
void DownloadPhotoError(PhotoHandle p, int types);
void UploadActivityError(ActivityHandle ah);
// NOTE(review): presumably runs the callbacks registered through
// WaitForDownload for "photo_id" -- confirm.
void NotifyDownload(int64_t photo_id, int types);
private:
AppState* state_;
// Source of the internally allocated sequence numbers described in the class
// comment (presumably the next value to hand out -- confirm).
int64_t next_sequence_;
mutable Mutex mu_;
// Per-priority operation counts returned by stats().
ScopedPtr<NetworkStatsMap> stats_;
Mutex queue_mu_;
bool queue_in_progress_;
WallTime queue_start_time_;
// At most one queued request of each kind; exposed via the queued_*()
// accessors above.
ScopedPtr<DownloadPhoto> queued_download_photo_;
ScopedPtr<RemovePhotos> queued_remove_photos_;
ScopedPtr<UpdatePhoto> queued_update_photo_;
ScopedPtr<UpdateViewpoint> queued_update_viewpoint_;
ScopedPtr<UploadEpisode> queued_upload_episode_;
ScopedPtr<UploadPhoto> queued_upload_photo_;
ScopedPtr<UploadActivity> queued_upload_activity_;
const string photo_tmp_dir_;
// NOTE(review): download_callback_mu_ presumably guards
// download_callback_map_ -- confirm.
Mutex download_callback_mu_;
DownloadCallbackMap download_callback_map_;
};
// Returns the key string for a queue entry with the given priority and
// sequence number. NOTE(review): the encoding lives in the implementation
// file; presumably it sorts by priority first and then by sequence, matching
// the ordering described in the NetworkQueue class comment -- confirm.
string EncodeNetworkQueueKey(int priority, int64_t sequence);
// Counterpart to EncodeNetworkQueueKey: extracts the priority and sequence
// number from "key" into the two output pointers. NOTE(review): the return
// value presumably reports whether "key" was a well-formed queue key, and the
// outputs are presumably only meaningful when it is true -- confirm.
bool DecodeNetworkQueueKey(Slice key, int* priority, int64_t* sequence);
#endif // VIEWFINDER_NETWORK_QUEUE_H
// local variables:
// mode: c++
// end: