
Commit 5570129

Merge pull request #158 from Aiven-Open/giacomo-alzetta-aiven-fix-missing-md5-on-google-omit-keys

Handle the rare case where GCP does not return the size, updated, and md5Hash fields
fingon authored Nov 21, 2023
2 parents 638cebe + f348332 commit 5570129
Showing 3 changed files with 105 additions and 16 deletions.
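Before this change, list_iter in rohmu/object_storage/google.py indexed the size, updated, and md5Hash fields of each listing entry unconditionally, so a response that omitted any of them raised KeyError. A minimal sketch of the failure mode, assuming an item dict shaped like a raw GCS objects.list entry (the object name is illustrative):

    # Pre-fix behavior: unconditional indexing of optional fields.
    item = {"name": "backup/chunk-0001", "metadata": {}}  # no size/updated/md5Hash

    try:
        value = {"name": item["name"], "size": int(item["size"])}
    except KeyError as exc:
        print(f"listing aborted: missing field {exc}")  # listing aborted: missing field 'size'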
.github/workflows/build.yml (3 additions & 0 deletions)

@@ -43,6 +43,9 @@ jobs:
       - id: pylint
         run: make lint
 
+      - id: pip-freeze
+        run: pip freeze
+
       - id: mypy
         run: make mypy

rohmu/object_storage/google.py (20 additions & 15 deletions)

@@ -20,6 +20,7 @@
     MediaUploadProgress,
 )
 from http.client import IncompleteRead
+from io import IOBase
 from oauth2client import GOOGLE_TOKEN_URI
 from oauth2client.client import GoogleCredentials
 from rohmu.common.models import StorageOperation
@@ -43,7 +44,7 @@
 )
 from rohmu.typing import AnyPath, Metadata
 from rohmu.util import get_total_size_from_content_range
-from typing import Any, BinaryIO, Callable, Collection, Iterable, Iterator, Optional, TextIO, Tuple, TypeVar, Union
+from typing import Any, BinaryIO, Callable, cast, Collection, Iterable, Iterator, Optional, TextIO, Tuple, TypeVar, Union
 from typing_extensions import Protocol
 
 import codecs
@@ -278,7 +279,7 @@ def _retry_on_reset(self, request: HttpRequest, action: Callable[[], ResType], r…
             retry_reporter.report(self.stats)
 
             # we want to reset the http connection state in case of error
-            if request and hasattr(request, "http"):
+            if request and hasattr(request, "http") and hasattr(request.http, "connections"):
                 request.http.connections.clear()  # reset connection cache
 
             retries -= 1
@@ -380,16 +381,18 @@ def initial_op(domain: Any) -> HttpRequest:
                     self.log.warning("list_iter: directory entry %r", item)
                     continue  # skip directory level objects
 
-                yield IterKeyItem(
-                    type=KEY_TYPE_OBJECT,
-                    value={
-                        "name": self.format_key_from_backend(item["name"]),
-                        "size": int(item["size"]),
-                        "last_modified": parse_timestamp(item["updated"]),
-                        "metadata": item.get("metadata", {}),
-                        "md5": base64_to_hex(item["md5Hash"]),
-                    },
-                )
+                value = {
+                    "name": self.format_key_from_backend(item["name"]),
+                    "metadata": item.get("metadata", {}),
+                }
+                # in very rare circumstances size, updated and md5Hash can be missing. Omit the keys if that happens
+                if (size := item.get("size")) is not None:
+                    value["size"] = int(size)
+                if (updated := item.get("updated")) is not None:
+                    value["last_modified"] = parse_timestamp(updated)
+                if (md5 := item.get("md5Hash")) is not None:
+                    value["md5"] = base64_to_hex(md5)
+                yield IterKeyItem(type=KEY_TYPE_OBJECT, value=value)
             elif property_name == "prefixes":
                 for prefix in items:
                     yield IterKeyItem(type=KEY_TYPE_PREFIX, value=self.format_key_from_backend(prefix).rstrip("/"))
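Since the optional keys are now omitted rather than set to None, code consuming iter_key results should read them defensively. A hedged usage sketch (the transfer object and key are assumed; the .get() handling is illustrative, not part of this PR):

    # Optional listing fields may be absent entirely, so prefer .get() over indexing.
    for entry in transfer.iter_key(key="testkey", deep=True):
        if entry.type != "object":
            continue  # skip prefix entries
        size = entry.value.get("size")  # None when GCS omitted the field
        md5 = entry.value.get("md5")    # hex digest string, when available
        print(entry.value["name"], size, md5)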
Expand Down Expand Up @@ -434,7 +437,8 @@ def get_contents_to_fileobj(
             req: HttpRequest = clob.get_media(bucket=self.bucket_name, object=path)
             download: MediaDownloadProtocol
             if byte_range is None:
-                download = MediaIoBaseDownload(fileobj_to_store_to, req, chunksize=DOWNLOAD_CHUNK_SIZE)
+                # MediaIoBaseDownload only calls .write(bytes) method, so BinaryIO works fine even if mypy complains
+                download = MediaIoBaseDownload(cast(IOBase, fileobj_to_store_to), req, chunksize=DOWNLOAD_CHUNK_SIZE)
             else:
                 download = MediaIoBaseDownloadWithByteRange(
                     fileobj_to_store_to, req, chunksize=DOWNLOAD_CHUNK_SIZE, byte_range=byte_range
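The cast(IOBase, ...) is a type-checker-only workaround: typing.cast performs no runtime conversion, and the comment above notes that MediaIoBaseDownload only ever calls .write() on the target. A standalone sketch of that property (BytesIO stands in for fileobj_to_store_to):

    from io import BytesIO, IOBase
    from typing import BinaryIO, cast

    buf: BinaryIO = BytesIO()        # stands in for fileobj_to_store_to
    sink = cast(IOBase, buf)         # no conversion at runtime; only silences mypy
    sink.write(b"downloaded chunk")  # the downloader only calls .write(bytes)
    assert sink is buf               # cast returned the very same object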
@@ -452,7 +456,7 @@
                         last_log_output = now
 
                     if progress_callback and progress_pct > next_prog_report:
-                        progress_callback(progress_pct, 100)
+                        progress_callback(int(progress_pct), 100)
                         next_prog_report = progress_pct + 0.1
                 elif done:
                     reporter.report_status(self.stats, MediaDownloadProgress(size_to_download, size_to_download))
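The int() conversion matters because progress_pct is a float percentage while the callback is typed to take integers. A small sketch of the mismatch (the callback body is illustrative):

    progress_pct = 37.5  # e.g. a float percentage derived from the download status

    def progress_callback(done: int, total: int) -> None:
        print(f"{done}/{total}")

    progress_callback(int(progress_pct), 100)  # 37/100; passing 37.5 would break the int contract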
@@ -768,7 +772,8 @@ def __init__(
         self._done = False
 
         self._headers = {}
-        for k, v in request.headers.items():
+        req_headers = request.headers or {}
+        for k, v in req_headers.items():
             # allow users to supply custom headers by setting them on the request
             # but strip out the ones that are set by default on requests generated by
             # API methods like Drive's files().get(fileId=...)
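The req_headers = request.headers or {} guard normalizes a None headers attribute to an empty dict, which the previous request.headers.items() call would have crashed on. A tiny sketch:

    headers = None           # request.headers may be unset on some HttpRequest objects
    req_headers = headers or {}
    for k, v in req_headers.items():  # safe even when no headers were supplied
        print(k, v)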
test/object_storage/test_google.py (82 additions & 1 deletion)

@@ -2,15 +2,17 @@
 from __future__ import annotations
 
 from contextlib import ExitStack
-from datetime import datetime
+from datetime import datetime, timezone
 from googleapiclient.http import MediaUploadProgress
 from io import BytesIO
 from rohmu.common.models import StorageOperation
 from rohmu.errors import InvalidByteRangeError
 from rohmu.object_storage.base import IterKeyItem
 from rohmu.object_storage.google import GoogleTransfer, MediaIoBaseDownloadWithByteRange, Reporter
 from tempfile import NamedTemporaryFile
 from unittest.mock import ANY, call, MagicMock, Mock, patch
 
+import base64
 import pytest

@@ -211,3 +213,82 @@ def test_media_io_download_with_byte_range_and_very_small_object() -> None:
     assert status.progress() == 1.0
     assert result.getvalue() == b"lo, World!"
     mock_request.http.request.assert_called_once_with(ANY, ANY, headers={"range": "bytes=3-100"})


+def test_object_listed_when_missing_md5hash_size_and_updated() -> None:
+    notifier = MagicMock()
+    with ExitStack() as stack:
+        stack.enter_context(patch("rohmu.object_storage.google.get_credentials"))
+        stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer.get_or_create_bucket"))
+        mock_operation = stack.enter_context(patch("rohmu.common.statsd.StatsClient.operation"))
+        transfer = GoogleTransfer(
+            project_id="test-project-id",
+            bucket_name="test-bucket",
+            notifier=notifier,
+        )
+
+        # mock the instance because a decorator and context managers are in the way
+        mock_client = stack.enter_context(patch.object(transfer, "_object_client"))
+        mock_client.return_value.__enter__.return_value.list_next.return_value = None
+        object_name = (
+            "aiventest/111aa1aa-1aaa-1111-11a1-11111aaaaa11/a1111111-aaa1-1aaa-aa1a-1a11aaaa11a1"
+            "/tiered_storage/ccs/aaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+        )
+        escaped_name = object_name.replace("/", "%2F")
+
+        # API response missing size, updated & md5Hash fields
+        sample_item = {
+            "bucket": "test-bucket",
+            "contentType": "binary/octet-stream",
+            "generation": "1111111111111111",
+            "id": f"test-bucket/{object_name}/1111111111111111",
+            "kind": "storage#object",
+            "mediaLink": f"https://storage.googleapis.com/download/storage/v1/b/test-bucket/o/"
+            f"{escaped_name}?generation=1111111111111111&alt=media",
+            "metageneration": "1",
+            "name": object_name,
+            "selfLink": f"https://www.googleapis.com/storage/v1/b/"
+            f"p812de5da-0bab-4990-90e8-57303eebfd30-99012089cf1d961516b8b3ff6/o/"
+            f"{escaped_name}?generation=1111111111111111",
+            "storageClass": "REGIONAL",
+        }
+        mock_client.return_value.__enter__.return_value.list.return_value.execute.return_value = {
+            "items": [
+                sample_item,
+                {"size": 100, **sample_item},
+                {"md5Hash": base64.encodebytes(b"Missing md5Hash!"), **sample_item},
+                {"updated": "2023-11-20T16:18:00+00:00", **sample_item},
+            ]
+        }
+
+        got = list(
+            transfer.iter_key(
+                key="testkey",
+                with_metadata=False,
+                deep=True,
+                include_key=False,
+            )
+        )
+        assert mock_operation.call_count == 1
+        mock_operation.assert_has_calls(
+            [
+                call(operation=StorageOperation.iter_key),
+            ]
+        )
+        expected = [
+            IterKeyItem(type="object", value={"name": object_name, "metadata": {}}),
+            IterKeyItem(type="object", value={"name": object_name, "metadata": {}, "size": 100}),
+            IterKeyItem(
+                type="object", value={"name": object_name, "metadata": {}, "md5": "4d697373696e67206d64354861736821"}
+            ),
+            IterKeyItem(
+                type="object",
+                value={
+                    "name": object_name,
+                    "metadata": {},
+                    "last_modified": datetime(2023, 11, 20, 16, 18, tzinfo=timezone.utc),
+                },
+            ),
+        ]
+        assert len(got) == len(expected)
+        assert got == expected
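The md5 value expected above follows mechanically from the fake digest the test injects: base64.encodebytes(b"Missing md5Hash!") base64-encodes the 16 raw bytes, and rohmu's base64_to_hex decodes that and hex-encodes the result. A quick stdlib-only check of the arithmetic:

    import base64
    import binascii

    raw = b"Missing md5Hash!"            # 16 bytes, the length of a real MD5 digest
    encoded = base64.encodebytes(raw)    # what the test puts into the fake API item
    decoded = base64.b64decode(encoded)  # b64decode ignores the trailing newline
    assert binascii.hexlify(decoded).decode() == "4d697373696e67206d64354861736821"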
