Source code for openwpm.storage.cloud_storage.gcp_storage

import logging
from typing import Optional, Set

import pyarrow.parquet as pq
from gcsfs import GCSFileSystem
from pyarrow.lib import Table

from ..arrow_storage import ArrowProvider
from ..storage_providers import TableName, UnstructuredStorageProvider


[docs] class GcsStructuredProvider(ArrowProvider): """This class allows you to upload Parquet files to GCS. This might not actually be the thing that we want to do long term but seeing as GCS is the S3 equivalent of GCP it is the easiest way forward. Inspired by the old S3Aggregator structure the GcsStructuredProvider will by default store into base_path/visits/table_name in the given bucket. Pass a different sub_dir to change this. """ file_system: GCSFileSystem def __init__( self, project: str, bucket_name: str, base_path: str, token: Optional[str] = None, sub_dir: str = "visits", ) -> None: super().__init__() self.project = project self.token = token self.base_path = f"{bucket_name}/{base_path}/{sub_dir}/{{table_name}}" def __str__(self) -> str: return f"GCS:{self.base_path.removesuffix('/{table_name}')}"
[docs] async def init(self) -> None: await super(GcsStructuredProvider, self).init() self.file_system = GCSFileSystem( project=self.project, token=self.token, access="read_write" )
[docs] async def write_table(self, table_name: TableName, table: Table) -> None: pq.write_to_dataset( table, self.base_path.format(table_name=table_name), filesystem=self.file_system, )
[docs] async def shutdown(self) -> None: pass
[docs] class GcsUnstructuredProvider(UnstructuredStorageProvider): """This class allows you to upload arbitrary bytes to GCS. They will be stored under bucket_name/base_path/filename """ file_system: GCSFileSystem def __init__( self, project: str, bucket_name: str, base_path: str, token: Optional[str] = None, ) -> None: super().__init__() self.project = project self.bucket_name = bucket_name self.base_path = base_path self.token = token self.base_path = f"{bucket_name}/{base_path}/{{filename}}" self.file_name_cache: Set[str] = set() """The set of all filenames ever uploaded, checked before uploading""" self.logger = logging.getLogger("openwpm")
[docs] async def init(self) -> None: self.file_system = GCSFileSystem( project=self.project, token=self.token, access="read_write" )
[docs] async def store_blob( self, filename: str, blob: bytes, overwrite: bool = False ) -> None: target_path = self.base_path.format(filename=filename) if not overwrite and ( filename in self.file_name_cache or self.file_system.exists(target_path) ): self.logger.info("Not saving out file %s as it already exists", filename) return with self.file_system.open(target_path, mode="wb") as f: f.write(blob) self.file_name_cache.add(filename)
[docs] async def flush_cache(self) -> None: pass
[docs] async def shutdown(self) -> None: pass