Implement missing APIs for swift storage
rdunklau committed Nov 3, 2021
1 parent 6c2a91c commit ba14476
Showing 2 changed files with 87 additions and 46 deletions.
121 changes: 83 additions & 38 deletions pghoard/rohmu/object_storage/swift.py
@@ -129,14 +129,14 @@ def _metadata_for_key(self, key, *, resolve_manifest=False):
    def iter_key(self, key, *, with_metadata=True, deep=False, include_key=False):
        path = self.format_key_for_backend(key, trailing_slash=not include_key)
        self.log.debug("Listing path %r", path)
-       if deep:
+       if not deep:
            kwargs = {"delimiter": "/"}
        else:
            kwargs = {}
        _, results = self.conn.get_container(self.container_name, prefix=path, full_listing=True, **kwargs)
        for item in results:
            if "subdir" in item:
-               yield IterKeyItem(type=KEY_TYPE_PREFIX, value=self.format_key_from_backend(item["name"]).rstrip("/"))
+               yield IterKeyItem(type=KEY_TYPE_PREFIX, value=self.format_key_from_backend(item["subdir"]).rstrip("/"))
            else:
                if with_metadata:
                    metadata = self._metadata_for_key(item["name"], resolve_manifest=True)
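The two replaced lines above make shallow listings behave as intended: a non-deep listing now passes delimiter="/" so Swift collapses everything below the prefix into "subdir" entries, and those entries are read from item["subdir"] instead of item["name"]. A minimal, self-contained sketch of how such a delimited listing is interpreted (the sample result below is an assumption modelled on python-swiftclient's listing shape, not part of this commit):

# Hypothetical shallow listing as returned with delimiter="/": entries that
# only carry "subdir" are collapsed prefixes, the others are real objects.
results = [
    {"subdir": "site-name/basebackup/"},
    {"name": "site-name/basebackup_manifest", "bytes": 1024},
]

for item in results:
    if "subdir" in item:
        print("prefix:", item["subdir"].rstrip("/"))   # prefix: site-name/basebackup
    else:
        print("key:", item["name"])                    # key: site-name/basebackup_manifest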
@@ -250,44 +250,17 @@ def store_file_from_memory(self, key, memstring, metadata=None, cache_control=None
        )

    def store_file_from_disk(self, key, filepath, metadata=None, multipart=None, cache_control=None, mimetype=None):
-       if cache_control is not None:
-           raise NotImplementedError("SwiftTransfer: cache_control support not implemented")
-
-       if multipart:
-           # Start by trying to delete the file - if it's a potential multipart file we need to manually
-           # delete it, otherwise old segments won't be cleaned up by anything. Note that we only issue
-           # deletes with the store_file_from_disk functions, store_file_from_memory is used to upload smaller
-           # chunks.
-           with suppress(FileNotFoundFromStorageError):
-               self.delete_key(key)
-       key = self.format_key_for_backend(key)
-       headers = self._metadata_to_headers(self.sanitize_metadata(metadata))
        obsz = os.path.getsize(filepath)
        with open(filepath, "rb") as fp:
-           if obsz <= self.segment_size:
-               self.log.debug("Uploading %r to %r (%r bytes)", filepath, key, obsz)
-               self.conn.put_object(self.container_name, key, contents=fp, content_length=obsz, headers=headers)
-               return
-
-           # Segmented transfer
-           # upload segments of a file like `backup-bucket/site-name/basebackup/2016-03-22_0`
-           # as `backup-bucket/site-name/basebackup_segments/2016-03-22_0/{:08x}`
-           segment_no = 0
-           segment_path = "{}_segments/{}/".format(os.path.dirname(key), os.path.basename(key))
-           segment_key_format = "{}{{:08x}}".format(segment_path).format
-           remaining = obsz
-           while remaining > 0:
-               this_segment_size = min(self.segment_size, remaining)
-               remaining -= this_segment_size
-               segment_no += 1
-               self.log.debug("Uploading segment %r of %r to %r (%r bytes)", segment_no, filepath, key, this_segment_size)
-               segment_key = segment_key_format(segment_no) # pylint: disable=too-many-format-args
-               self.conn.put_object(
-                   self.container_name, segment_key, contents=fp, content_length=this_segment_size, content_type=mimetype
-               )
-           self.log.info("Uploaded %r segments of %r to %r", segment_no, key, segment_path)
-           headers["x-object-manifest"] = "{}/{}".format(self.container_name, segment_path.lstrip("/"))
-           self.conn.put_object(self.container_name, key, contents="", headers=headers, content_length=0)
+           self._store_file_contents(
+               key,
+               fp,
+               metadata=metadata,
+               multipart=multipart,
+               cache_control=cache_control,
+               mimetype=mimetype,
+               content_length=obsz
+           )

    def get_or_create_container(self, container_name):
        start_time = time.monotonic()
@@ -304,3 +304,75 @@ def get_or_create_container(self, container_name):
                return container_name
            raise
        return container_name
+
+   def copy_file(self, *, source_key, destination_key, metadata=None, **_kwargs):
+       source_key = self.format_key_for_backend(source_key)
+       destination_key = "/".join((self.container_name, self.format_key_for_backend(destination_key)))
+       headers = self._metadata_to_headers(self.sanitize_metadata(metadata))
+       if metadata:
+           headers["X-Fresh-Metadata"] = True
+       self.conn.copy_object(self.container_name, source_key, destination=destination_key, headers=headers)
+
+   def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None):
+       metadata = metadata or {}
+       self._store_file_contents(
+           key,
+           fd,
+           cache_control=cache_control,
+           metadata=metadata,
+           mimetype=mimetype,
+           upload_progress_fn=upload_progress_fn,
+           multipart=True,
+           content_length=metadata.get("Content-Length")
+       )
+
+   def _store_file_contents(
+       self,
+       key,
+       fp,
+       cache_control=None,
+       metadata=None,
+       mimetype=None,
+       upload_progress_fn=None,
+       multipart=None,
+       content_length=None
+   ):
+       if cache_control is not None:
+           raise NotImplementedError("SwiftTransfer: cache_control support not implemented")
+
+       if multipart:
+           # Start by trying to delete the file - if it's a potential multipart file we need to manually
+           # delete it, otherwise old segments won't be cleaned up by anything. Note that we only issue
+           # deletes with the store_file_from_disk functions, store_file_from_memory is used to upload smaller
+           # chunks.
+           with suppress(FileNotFoundFromStorageError):
+               self.delete_key(key)
+       key = self.format_key_for_backend(key)
+       headers = self._metadata_to_headers(self.sanitize_metadata(metadata))
+       # Fall back to the "one segment" upload if possible
+       if (not multipart) or (not content_length) or content_length <= self.segment_size:
+           self.log.debug("Uploading %r to %r (%r bytes)", fp, key, content_length)
+           self.conn.put_object(self.container_name, key, contents=fp, content_length=content_length, headers=headers)
+           return
+
+       # Segmented transfer
+       # upload segments of a file like `backup-bucket/site-name/basebackup/2016-03-22_0`
+       # as `backup-bucket/site-name/basebackup_segments/2016-03-22_0/{:08x}`
+       segment_no = 0
+       segment_path = "{}_segments/{}/".format(os.path.dirname(key), os.path.basename(key))
+       segment_key_format = "{}{{:08x}}".format(segment_path).format
+       remaining = content_length
+       while remaining > 0:
+           this_segment_size = min(self.segment_size, remaining)
+           remaining -= this_segment_size
+           segment_no += 1
+           self.log.debug("Uploading segment %r of %r to %r (%r bytes)", segment_no, fp, key, this_segment_size)
+           segment_key = segment_key_format(segment_no) # pylint: disable=too-many-format-args
+           self.conn.put_object(
+               self.container_name, segment_key, contents=fp, content_length=this_segment_size, content_type=mimetype
+           )
+           if upload_progress_fn:
+               upload_progress_fn(content_length - remaining)
+       self.log.info("Uploaded %r segments of %r to %r", segment_no, key, segment_path)
+       headers["x-object-manifest"] = "{}/{}".format(self.container_name, segment_path.lstrip("/"))
+       self.conn.put_object(self.container_name, key, contents="", headers=headers, content_length=0)
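The segmented branch of _store_file_contents follows Swift's dynamic large object convention: segments are written under a sibling "..._segments/" prefix and a final zero-byte object carries an x-object-manifest header pointing at that prefix, so Swift serves the concatenation when the key is read. A standalone sketch of the naming scheme (container, key and sizes below are illustrative only, not taken from the commit):

import os

container_name = "backup-bucket"                 # illustrative values
key = "site-name/basebackup/2016-03-22_0"
segment_size = 1024 * 1024 * 1024                # assumed 1 GiB segment size
content_length = 5 * segment_size // 2           # 2.5 GiB payload -> 3 segments

segment_path = "{}_segments/{}/".format(os.path.dirname(key), os.path.basename(key))
segment_key_format = "{}{{:08x}}".format(segment_path).format

remaining, segment_no = content_length, 0
while remaining > 0:
    remaining -= min(segment_size, remaining)
    segment_no += 1
    print(segment_key_format(segment_no))
# site-name/basebackup_segments/2016-03-22_0/00000001
# ...00000002 and ...00000003

print("x-object-manifest:", "{}/{}".format(container_name, segment_path.lstrip("/")))
# x-object-manifest: backup-bucket/site-name/basebackup_segments/2016-03-22_0/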
12 changes: 4 additions & 8 deletions pghoard/transfer.py
@@ -317,20 +317,16 @@ def handle_upload(self, site, key, file_to_transfer):
        try:
            storage = self.get_object_storage(site)
            unlink_local = file_to_transfer.remove_after_upload
-           # Basebackups may be multipart uploads, depending on the driver.
-           # Swift needs to know about this so it can do possible cleanups.
-           # FIXME: make it more obvious what is happening here, since nothing
-           # seems to be specific to multipart here ?
-           multipart = file_to_transfer.filetype in {
-               FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta
-           }
            self.log.info("Uploading file to object store: src=%r dst=%r", file_to_transfer.source_data, key)
            if not isinstance(file_to_transfer.source_data, BytesIO):
                f = open(file_to_transfer.source_data, "rb")
            else:
                f = file_to_transfer.source_data
            with f:
-               storage.store_file_object(key, f, metadata=file_to_transfer.metadata)
+               metadata = file_to_transfer.metadata.copy()
+               if file_to_transfer.file_size:
+                   metadata["Content-Length"] = file_to_transfer.file_size
+               storage.store_file_object(key, f, metadata=metadata)
            if unlink_local:
                try:
                    self.log.info("Deleting file: %r since it has been uploaded", file_to_transfer.source_data)
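The transfer.py change hands the known file size to the storage layer as a "Content-Length" metadata entry; store_file_object() forwards it as content_length so _store_file_contents() can choose between a single put_object and a segmented upload without buffering the stream. A hedged sketch of the resulting call contract (the stub storage class, key and metadata values are illustrative, not pghoard code):

from io import BytesIO

class PrintingStorage:
    # Stand-in for the SwiftTransfer instance returned by get_object_storage();
    # it only records what the transfer agent would hand over.
    def store_file_object(self, key, fd, *, metadata=None):
        print(key, metadata, len(fd.read()))

def upload_with_size(storage, key, fileobj, metadata, file_size=None):
    # Mirror of the handle_upload() change above: copy the caller's metadata
    # and advertise the payload size when it is known.
    upload_metadata = metadata.copy()
    if file_size:
        upload_metadata["Content-Length"] = file_size
    storage.store_file_object(key, fileobj, metadata=upload_metadata)

payload = b"example basebackup chunk"
upload_with_size(PrintingStorage(), "site-name/basebackup/2016-03-22_0", BytesIO(payload),
                 metadata={"compression-algorithm": "snappy"}, file_size=len(payload))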
