Skip to content

Commit

Permalink
Add function to delete objects from s3
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalvvr committed Jul 2, 2024
1 parent 44829de commit b5535b7
Showing 1 changed file with 38 additions and 3 deletions.
41 changes: 38 additions & 3 deletions opl/s3_tools.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import logging

import boto3

import botocore.client
from typing import List, Dict


def connect(s3_conf):
def connect(s3_conf: Dict) -> boto3.resource:
"""
Connect to s3
Args:
s3_conf: all aws access creadentials like access_key, secret_key, region, etc.
Returns:
s3 connection object
"""
logging.debug("Going to connect")
s3_resource = boto3.resource(
"s3",
Expand Down Expand Up @@ -42,3 +48,32 @@ def get_presigned_url(s3_resource, bucket, remote_name):
)
logging.info(f"For {remote_name} obtained signed url {download_url}")
return download_url


def delete_files(
s3_resource: boto3.resource, bucket_name: str, file_paths: str | List
) -> Dict | bool:
"""
Delete s3-objets from s3 bucket
Args:
s3_resource: create s3 resource object from using connect() and pass
bucket_name: s3 bucket name
file_paths: filename or list of file names you want to delete
Returns:
Delete response or False
"""
# Create a list of objects to delete
if isinstance(file_paths, list):
objects_to_delete = [{"Key": path} for path in file_paths]
elif isinstance(file_paths, str):
objects_to_delete = [{"Key": file_paths}]
else:
print(f"file_paths attribute must be string or list not {type(file_paths)}")
return False
logging.debug("Going to delete")
if objects_to_delete:
response = s3_resource.meta.client.delete_objects(
Bucket=bucket_name, Delete={"Objects": objects_to_delete}
)
logging.debug("object/objects deleted")
return response

0 comments on commit b5535b7

Please sign in to comment.