Skip to content

Commit

Permalink
feat(cloud): Add cloud delete support
Browse files Browse the repository at this point in the history
This allows Packager to remove old segments from cloud storage automatically.

See shaka-project/shaka-packager#1442
  • Loading branch information
joeyparrish committed Oct 24, 2024
1 parent 1231502 commit a8e6baf
Showing 1 changed file with 45 additions and 9 deletions.
54 changes: 45 additions & 9 deletions streamer/proxy_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# HTTP status codes
HTTP_STATUS_CREATED = 201
HTTP_STATUS_ACCEPTED = 202
HTTP_STATUS_NO_CONTENT = 204
HTTP_STATUS_FAILED = 500

# S3 has a minimum chunk size for multipart uploads.
Expand All @@ -50,14 +51,15 @@

# Optional: To support GCS, import Google Cloud Storage library.
try:
import google.cloud.storage as gcs # type: ignore
import google.cloud.storage # type: ignore
import google.api_core.exceptions # type: ignore
SUPPORTED_PROTOCOLS.append('gs')
except:
pass

# Optional: To support S3, import AWS's boto3 library.
try:
import boto3 as aws # type: ignore
import boto3 # type: ignore
SUPPORTED_PROTOCOLS.append('s3')
except:
pass
Expand Down Expand Up @@ -92,9 +94,10 @@ def _reset(self, now: float) -> None:


class RequestHandlerBase(BaseHTTPRequestHandler):
"""A request handler that processes the PUT requests coming from
Shaka Packager and pushes them to the destination.
"""A request handler that processes requests coming from Shaka Packager and
relays them to the destination.
"""

def __init__(self, rate_limiter: RateLimiter, *args, **kwargs):
self._rate_limiter: RateLimiter = rate_limiter

Expand All @@ -104,7 +107,7 @@ def __init__(self, rate_limiter: RateLimiter, *args, **kwargs):
super().__init__(*args, **kwargs)

def do_PUT(self) -> None:
"""Handle the PUT requests coming from Shaka Packager."""
"""Handle PUT requests coming from Shaka Packager."""

if self._rate_limiter.suppress(self.path):
# Skip this upload.
Expand Down Expand Up @@ -151,6 +154,20 @@ def do_PUT(self) -> None:
# "returned nothing".
self.end_headers()

def do_DELETE(self) -> None:
  """Handle DELETE requests coming from Shaka Packager.

  Delegates the actual removal to the storage-specific handle_delete(),
  then responds 204 (No Content) on success or 500 on any failure.
  """
  try:
    self.handle_delete(self.path)
    self.send_response(HTTP_STATUS_NO_CONTENT)
  except Exception as ex:
    # Fixed: this previously printed "Upload failure", a copy/paste slip
    # from do_PUT that mislabeled delete errors in the logs.
    print('Delete failure: ' + str(ex))
    traceback.print_exc()
    self.send_response(HTTP_STATUS_FAILED)

  # If we don't call this at the end of the handler, Packager says we
  # "returned nothing".
  self.end_headers()

@abc.abstractmethod
def handle_non_chunked(self, path: str, length: int,
file: BinaryIO) -> None:
Expand All @@ -172,12 +189,16 @@ def end_chunked(self) -> None:
"""End the chunked transfer."""
pass

@abc.abstractmethod
def handle_delete(self, path: str) -> None:
  """Remove the object named by |path| from the destination storage.

  Each storage-specific subclass implements this for its own backend.
  """
  ...

class GCSHandler(RequestHandlerBase):
# Can't annotate the bucket here as a parameter if we don't have the library.
def __init__(self, bucket: Any, base_path: str,
rate_limiter: RateLimiter, *args, **kwargs) -> None:
self._bucket: gcs.Bucket = bucket
self._bucket: google.cloud.storage.Bucket = bucket
self._base_path: str = base_path
self._chunked_output: Optional[BinaryIO] = None

Expand Down Expand Up @@ -213,12 +234,23 @@ def end_chunked(self) -> None:
self._chunked_output.close()
self._chunked_output = None

def handle_delete(self, path: str) -> None:
# No leading slashes, or we get a blank folder name.
full_path = (self._base_path + path).strip('/')
blob = self._bucket.blob(full_path)
try:
blob.delete()
except google.api_core.exceptions.NotFound:
# Some delete calls seem to throw "not found", but the files still get
# deleted. So ignore these and don't fail the request.
pass


class S3Handler(RequestHandlerBase):
# Can't annotate the client here as a parameter if we don't have the library.
def __init__(self, client: Any, bucket_name: str, base_path: str,
rate_limiter: RateLimiter, *args, **kwargs) -> None:
self._client: aws.Client = client
self._client: boto3.Client = client
self._bucket_name: str = bucket_name
self._base_path: str = base_path

Expand Down Expand Up @@ -292,6 +324,10 @@ def end_chunked(self) -> None:
self._next_part_number = 0
self._part_info = []

def handle_delete(self, path: str) -> None:
self._client.delete_object(
Bucket=self._bucket_name, Key=self._upload_path)


class HTTPUploadBase(ThreadedNodeBase):
"""Runs an HTTP server at `self.server_location` to upload to cloud.
Expand Down Expand Up @@ -362,7 +398,7 @@ def __init__(self, upload_location: str) -> None:
super().__init__()

url = urllib.parse.urlparse(upload_location)
self._client = gcs.Client()
self._client = google.cloud.storage.Client()
self._bucket = self._client.bucket(url.netloc)
# Strip both left and right slashes. Otherwise, we get a blank folder name.
self._base_path = url.path.strip('/')
Expand All @@ -380,7 +416,7 @@ def __init__(self, upload_location: str) -> None:
super().__init__()

url = urllib.parse.urlparse(upload_location)
self._client = aws.client('s3')
self._client = boto3.client('s3')
self._bucket_name = url.netloc
# Strip both left and right slashes. Otherwise, we get a blank folder name.
self._base_path = url.path.strip('/')
Expand Down

0 comments on commit a8e6baf

Please sign in to comment.