2025-02-17 19:44:17 +05:30

86 lines
2.4 KiB
Python

import io
import os
from typing import Dict, List, Optional
from aiohttp import ClientError
from boto3 import client as boto3_client
from loguru import logger
from pydantic import BaseModel
# AWS region name passed to boto3 when the S3 client is constructed.
REGION = "us-east-1"
# noinspection SpellCheckingInspection
class PresignedPost(BaseModel):
    # Mirrors the mapping returned by boto3's ``generate_presigned_post``:
    # the endpoint to POST to, plus the form fields that must accompany
    # the uploaded file.
    url: str
    fields: Dict[str, str]
class SimpleStorageService:
    """Convenience wrapper around a boto3 S3 client bound to a single bucket.

    Every method operates on the bucket given at construction time. All calls
    are synchronous and go through the underlying boto3 client.
    """

    # TODO: would be great if we could make this async

    # The S3 DeleteObjects API accepts at most 1000 keys per request.
    _MAX_DELETE_BATCH = 1000

    def __init__(self, bucket: Optional[str]) -> None:
        """Create the S3 client and remember the target bucket.

        Args:
            bucket: Name of the bucket all operations will target.

        Raises:
            ValueError: If ``bucket`` is ``None`` or empty.
        """
        if not bucket:
            raise ValueError("Bucket name must be provided")
        self._client = boto3_client("s3", region_name=REGION)
        self._bucket = bucket

    def create_presigned_upload_url(
        self,
        object_name: str,
        expires_in: int = 3600,
    ) -> "PresignedPost":
        """Return a presigned POST (URL + form fields) for uploading one object.

        Args:
            object_name: Key the uploaded object will be stored under.
            expires_in: Lifetime of the presigned POST in seconds. The default
                matches boto3's own default.
        """
        return PresignedPost(
            **self._client.generate_presigned_post(
                Bucket=self._bucket,
                Key=object_name,
                ExpiresIn=expires_in,
            )
        )

    def create_presigned_download_url(
        self,
        object_name: str,
        expires_in: int = 3600,
    ) -> str:
        """Return a presigned GET URL for downloading one object.

        Args:
            object_name: Key of the object to expose.
            expires_in: Lifetime of the URL in seconds. The default matches
                boto3's own default.
        """
        return self._client.generate_presigned_url(
            "get_object",
            Params={"Bucket": self._bucket, "Key": object_name},
            ExpiresIn=expires_in,
        )

    def upload_to_bucket(
        self,
        object_name: str,
        file: io.BytesIO,
    ) -> None:
        """Upload the full contents of an in-memory buffer as one object.

        ``getvalue()`` is used, so the entire buffer is sent regardless of the
        buffer's current read position.

        Args:
            object_name: Key to store the object under.
            file: In-memory buffer holding the bytes to upload.

        Raises:
            botocore.exceptions.ClientError: Re-raised after being logged when
                S3 rejects the request.
        """
        try:
            self._client.put_object(
                Bucket=self._bucket, Key=object_name, Body=file.getvalue()
            )
        # boto3 failures are botocore ClientErrors, reachable through the
        # client itself; the aiohttp ``ClientError`` imported at module level
        # is an unrelated type and would never match here.
        except self._client.exceptions.ClientError as e:
            logger.error(e)
            raise  # bare raise keeps the original traceback intact

    def download_file(self, object_name: str, local_filename: str) -> None:
        """Download one object from the bucket to a local file path.

        Args:
            object_name: Key of the object to fetch.
            local_filename: Destination path on the local filesystem.
        """
        self._client.download_file(
            Bucket=self._bucket, Key=object_name, Filename=local_filename
        )

    def list_keys(self, prefix: str) -> List[str]:
        """Return every object key in the bucket that starts with ``prefix``.

        A paginator is used because a single ``list_objects_v2`` call returns
        at most 1000 keys; without it larger prefixes would be truncated.

        Args:
            prefix: Key prefix to filter on.

        Returns:
            All matching keys, or an empty list when nothing matches.
        """
        paginator = self._client.get_paginator("list_objects_v2")
        keys: List[str] = []
        for page in paginator.paginate(Bucket=self._bucket, Prefix=prefix):
            # Pages without matches carry no "Contents" entry at all.
            keys.extend(entry["Key"] for entry in page.get("Contents", []))
        return keys

    def download_folder(self, prefix: str, path: str) -> List[str]:
        """Download every object under ``prefix`` into the directory ``path``.

        Objects are flattened: each is saved under the last ``/``-separated
        segment of its key, so keys sharing that segment overwrite each other.
        Keys ending in ``/`` (zero-byte "folder" placeholders) have no file
        name and are skipped.

        Args:
            prefix: Key prefix selecting the objects to download.
            path: Local directory to download into; created if missing.

        Returns:
            Local paths of the downloaded files, in listing order.
        """
        if path:
            # An empty path means "current directory"; makedirs("") would fail.
            os.makedirs(path, exist_ok=True)
        local_files: List[str] = []
        for key in self.list_keys(prefix):
            basename = key.split("/")[-1]
            if not basename:
                continue
            local_filename = os.path.join(path, basename)
            self.download_file(key, local_filename)
            local_files.append(local_filename)
        return local_files

    def delete_folder(self, prefix: str) -> None:
        """Delete every object in the bucket whose key starts with ``prefix``.

        Keys are removed in batches because DeleteObjects accepts at most 1000
        keys per request. When nothing matches, no request is sent (S3 rejects
        a DeleteObjects call with an empty object list). Per-key failures
        reported by S3 are logged but do not raise.

        Args:
            prefix: Key prefix selecting the objects to delete.
        """
        keys = self.list_keys(prefix)
        for start in range(0, len(keys), self._MAX_DELETE_BATCH):
            batch = keys[start:start + self._MAX_DELETE_BATCH]
            response = self._client.delete_objects(
                Bucket=self._bucket,
                Delete={"Objects": [{"Key": key} for key in batch]},
            )
            for failure in response.get("Errors", []):
                logger.error(
                    "Failed to delete {}: {} {}",
                    failure.get("Key"),
                    failure.get("Code"),
                    failure.get("Message"),
                )