uploader.py
from __future__ import annotations

import abc
import logging
from queue import Queue
from threading import Thread
from typing import Any

import boto3

logger = logging.getLogger(__name__)


class LiveReadTask(abc.ABC):
    """
    Base class for the tasks processed by the LiveReadUploader run loop.
    """

    @abc.abstractmethod
    def execute(self, uploader: LiveReadUploader) -> bool:
        """
        Execute this task. Return True to terminate the run loop.
        """


class LiveReadCreate(LiveReadTask):
    def __str__(self) -> str:
        return 'Create task'

    def execute(self, uploader: LiveReadUploader) -> bool:
        uploader.create_multipart_upload()
        return False


class LiveReadUpload(LiveReadTask):
    def __init__(self, buffer: bytes):
        self.buffer = buffer

    def __str__(self) -> str:
        return f'Upload task with buffer length {len(self.buffer)}'

    def execute(self, uploader: LiveReadUploader) -> bool:
        uploader.upload_part(self.buffer)
        return False


class LiveReadComplete(LiveReadTask):
    def __str__(self) -> str:
        return 'Complete task'

    def execute(self, uploader: LiveReadUploader) -> bool:
        uploader.complete_multipart_upload()
        return True


class LiveReadUploader(Thread):
    """
    LiveReadUploader encapsulates all the logic associated with a Live Read multipart upload.
    """

    def __init__(self, bucket: str, key: str):
        """
        Initialize a new LiveReadUploader.
        """
        super().__init__()
        self._task_queue: Queue[LiveReadTask] = Queue()
        self.parts: list[dict[str, Any]] = []
        self.part_number = 1
        self.upload_id: str | None = None
        self.bucket = bucket
        self.key = key
        self.bytes_written = 0
        # Create a boto3 client based on configuration in the .env file:
        # AWS_ACCESS_KEY_ID=<Your Backblaze Application Key ID>
        # AWS_SECRET_ACCESS_KEY=<Your Backblaze Application Key>
        # AWS_ENDPOINT_URL=<Your B2 bucket endpoint, with https protocol, e.g. https://s3.us-west-004.backblazeb2.com>
        self.b2_client = boto3.client('s3')
        logger.debug("Created boto3 client")
        # Add the Live Read custom header to every CreateMultipartUpload request
        self.b2_client.meta.events.register('before-call.s3.CreateMultipartUpload', add_custom_header)

    def run(self) -> None:
        """
        Loop, reading the task queue, until we complete the upload.
        """
        logger.info("Starting multipart upload")
        while True:
            done = self.get_task().execute(self)
            if done:
                break
        logger.info("Finished multipart upload. Uploaded %s bytes", self.bytes_written)

    def put(self, task: LiveReadTask) -> None:
        """
        Add a task to the queue.
        """
        self._task_queue.put(task)

    def get_task(self, block: bool = True) -> LiveReadTask:
        """
        Retrieve the next task from the queue.
        """
        return self._task_queue.get(block=block)

    def wait_until_complete(self) -> None:
        """
        Block until the uploader thread has processed a LiveReadComplete task and exited.
        """
        self.join()

    def create_multipart_upload(self) -> None:
        response = self.b2_client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
        self.upload_id = response['UploadId']
        logger.debug("Created multipart upload. UploadId is %s", self.upload_id)

    def upload_part(self, buffer: bytes) -> None:
        logger.debug("Uploading part number %s with size %s", self.part_number, len(buffer))
        response = self.b2_client.upload_part(
            Bucket=self.bucket,
            Key=self.key,
            Body=buffer,
            PartNumber=self.part_number,
            UploadId=self.upload_id
        )
        logger.debug("Uploaded part number %s; ETag is %s", self.part_number, response['ETag'])
        self.parts.append({
            'ETag': response['ETag'],
            'PartNumber': self.part_number
        })
        self.part_number += 1
        self.bytes_written += len(buffer)

    def complete_multipart_upload(self) -> None:
        if len(self.parts) > 0:
            logger.debug("Completing multipart upload with %s parts", len(self.parts))
            self.b2_client.complete_multipart_upload(
                Bucket=self.bucket,
                Key=self.key,
                MultipartUpload={
                    'Parts': self.parts
                },
                UploadId=self.upload_id
            )
        elif self.upload_id:
            logger.warning("Aborting multipart upload since there are no parts!")
            self.b2_client.abort_multipart_upload(
                Bucket=self.bucket,
                Key=self.key,
                UploadId=self.upload_id
            )
        else:
            # This should never happen!
            raise RuntimeError("No upload to complete")


def add_custom_header(params: dict[str, Any], **_kwargs) -> None:
    """
    Add the Live Read custom header to the outgoing request.
    See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/events.html
    """
    params['headers']['x-backblaze-live-read-enabled'] = 'true'
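

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module).
# It shows how a producer drives the uploader by posting tasks to its queue.
# The bucket name, object key and source file below are hypothetical
# placeholders, and the credentials/endpoint are assumed to already be set in
# the environment, as described in LiveReadUploader.__init__.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    uploader = LiveReadUploader('my-bucket', 'my-video.ts')  # hypothetical bucket/key
    uploader.start()

    # Create the multipart upload, feed it chunks as they become available
    # (S3-compatible APIs typically require at least 5 MiB per non-final part),
    # then complete it and wait for the worker thread to finish.
    uploader.put(LiveReadCreate())
    with open('source.ts', 'rb') as f:  # hypothetical local source
        while chunk := f.read(5 * 1024 * 1024):
            uploader.put(LiveReadUpload(chunk))
    uploader.put(LiveReadComplete())

    uploader.wait_until_complete()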