Source code for openwpm.storage.cloud_storage.s3_storage
import logging
from typing import Any, Set
import pyarrow.parquet as pq
from pyarrow.lib import Table
from s3fs import S3FileSystem
from ..arrow_storage import ArrowProvider
from ..storage_providers import TableName, UnstructuredStorageProvider
[docs]
class S3StructuredProvider(ArrowProvider):
"""This class allows you to upload Parquet files to S3.
S3StructuredProvider will by default store into
base_path/visits/table_name in the given bucket.
Pass a different sub_dir to change this.
**kwargs get passed on to S3FileSystem.__init__
Please look at https://s3fs.readthedocs.io/en/latest/api.html#s3fs.core.S3FileSystem
for further information
"""
file_system: S3FileSystem
def __init__(
self, bucket_name: str, base_path: str, sub_dir: str = "visits", **kwargs: Any
) -> None:
super().__init__()
self.kwargs = kwargs
self.base_path = f"{bucket_name}/{base_path}/{sub_dir}/{{table_name}}"
def __str__(self) -> str:
return f"S3FS:{self.base_path.removesuffix('/{table_name}')}"
[docs]
async def init(self) -> None:
await super(S3StructuredProvider, self).init()
self.file_system = S3FileSystem(**self.kwargs)
[docs]
async def write_table(self, table_name: TableName, table: Table) -> None:
self.file_system.start_transaction()
pq.write_to_dataset(
table,
self.base_path.format(table_name=table_name),
filesystem=self.file_system,
)
self.file_system.end_transaction()
[docs]
class S3UnstructuredProvider(UnstructuredStorageProvider):
"""This class allows you to upload arbitrary bytes to S3.
They will be stored under bucket_name/base_path/filename
**kwargs get passed on to S3FileSystem.__init__
Please look at https://s3fs.readthedocs.io/en/latest/api.html#s3fs.core.S3FileSystem
for further information
"""
file_system: S3FileSystem
def __init__(self, bucket_name: str, base_path: str, **kwargs: Any) -> None:
super().__init__()
self.kwargs = kwargs
self.bucket_name = bucket_name
self.base_path = base_path
self.base_path = f"{bucket_name}/{base_path}/{{filename}}"
self.file_name_cache: Set[str] = set()
"""The set of all filenames ever uploaded, checked before uploading"""
self.logger = logging.getLogger("openwpm")
[docs]
async def init(self) -> None:
self.file_system = S3FileSystem(**self.kwargs)
[docs]
async def store_blob(
self, filename: str, blob: bytes, overwrite: bool = False
) -> None:
target_path = self.base_path.format(filename=filename)
if not overwrite and (
filename in self.file_name_cache or self.file_system.exists(target_path)
):
self.logger.info("Not saving out file %s as it already exists", filename)
return
self.file_system.start_transaction()
with self.file_system.open(target_path, mode="wb") as f:
f.write(blob)
self.file_system.end_transaction()
self.file_name_cache.add(filename)
[docs]
async def flush_cache(self) -> None:
pass
[docs]
async def shutdown(self) -> None:
pass