Source code for openwpm.storage.arrow_storage

import asyncio
import logging
import random
from abc import abstractmethod
from asyncio import Task
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Optional

import pandas as pd
import pyarrow as pa
from pyarrow import Table

from openwpm.types import VisitId

from .parquet_schema import PQ_SCHEMAS
from .storage_providers import INCOMPLETE_VISITS, StructuredStorageProvider, TableName

# Maximum number of record batches buffered per table. ArrowProvider's
# _is_cache_full() returns True once any table holds MORE than this many
# batches (strict `>`), and finalize_visit_id() then calls flush_cache().
# _create_batch() appends at most one batch per table per finalized visit.
CACHE_SIZE = 500


class ArrowProvider(StructuredStorageProvider):
    """StructuredStorage provider that serializes records into the Arrow format.

    Records are buffered per visit by ``store_record``. When a visit is
    finalized they are converted into one ``pa.RecordBatch`` per table. The
    batches are cached until ``flush_cache`` hands them, one ``pa.Table`` per
    table, to the subclass-implemented ``write_table``.
    """

    # Serializes batch creation in finalize_visit_id with flush_cache.
    # Only assigned in init(), so init() has to be awaited before either runs.
    storing_lock: asyncio.Lock

    def __init__(self) -> None:
        super().__init__()
        self.logger = logging.getLogger("openwpm")

        def factory_function() -> DefaultDict[TableName, List[Dict[str, Any]]]:
            return defaultdict(list)

        # Raw records per VisitId and Table, not yet converted to batches
        self._records: DefaultDict[
            VisitId, DefaultDict[TableName, List[Dict[str, Any]]]
        ] = defaultdict(factory_function)
        # Record batches by TableName, waiting for the next flush
        self._batches: DefaultDict[TableName, List[pa.RecordBatch]] = defaultdict(list)
        # Random 32-bit id stamped on every stored record (for partitioning)
        self._instance_id = random.getrandbits(32)
        # One event per finalized visit; set by flush_cache after writing
        self.flush_events: List[asyncio.Event] = []

    async def init(self) -> None:
        """Create ``storing_lock``."""
        # Used to synchronize the finalizing and the flushing
        self.storing_lock = asyncio.Lock()

    async def store_record(
        self, table: TableName, visit_id: VisitId, record: Dict[str, Any]
    ) -> None:
        """Buffer ``record`` for ``table`` under ``visit_id``.

        ``record`` is modified in place. Every column of ``PQ_SCHEMAS[table]``
        missing from it is set to ``None``, and ``"instance_id"`` is set to
        this provider's instance id.
        """
        records = self._records[visit_id]
        # Add nulls so the record carries every schema column
        for column in PQ_SCHEMAS[table].names:
            record.setdefault(column, None)
        # Add instance_id (for partitioning)
        record["instance_id"] = self._instance_id
        records[table].append(record)

    def _create_batch(self, visit_id: VisitId) -> None:
        """Create record batches for all records from ``visit_id``.

        Appends one batch per table to ``self._batches`` and then drops the
        raw records of the visit. A table whose records cannot be converted
        (``ArrowInvalid``) is logged and skipped, so those records are lost.
        """
        if visit_id not in self._records:
            # The batch for this `visit_id` was already created, skip.
            # NOTE(review): in this file, entries in _records are only created
            # by store_record, so this also fires for a visit that never stored
            # any record; the message may then be misleading.
            self.logger.error(
                "Trying to create batch for visit_id %d when one was already created",
                visit_id,
            )
            return

        for table_name, data in self._records[visit_id].items():
            try:
                batch = pa.RecordBatch.from_pandas(
                    pd.DataFrame(data),
                    schema=PQ_SCHEMAS[table_name],
                    preserve_index=False,
                )
            except pa.lib.ArrowInvalid:
                # Best effort: drop this table's records, keep the other tables
                self.logger.error(
                    "Error while creating record batch for table %s\n",
                    table_name,
                    exc_info=True,
                )
                continue
            self._batches[table_name].append(batch)
            self.logger.debug(
                "Successfully created batch for table %s and visit_id %s",
                table_name,
                visit_id,
            )

        del self._records[visit_id]

    def _is_cache_full(self) -> bool:
        """Return True once any table caches more than CACHE_SIZE batches."""
        return any(len(batches) > CACHE_SIZE for batches in self._batches.values())

    async def finalize_visit_id(
        self, visit_id: VisitId, interrupted: bool = False
    ) -> Task[None]:
        """This method is the reason the finalize_visit_id interface returns a task.

        This was necessary as we needed to enable the following pattern.

        .. code-block:: Python

            token = await structured_storage.finalize_visit_id(1)
            structured_storage.flush_cache()
            await token

        If there was no task returned and the method would just block/yield
        after turning the record into a batch, there would be no way to know
        when it's safe to flush_cache, as I couldn't find a way to run a
        coroutine until it yields and then run a different one.

        With the current setup `token` aka a `wait_on_condition` coroutine
        will only return once its respective event has been set.

        If ``interrupted`` is True, a record for ``visit_id`` is first stored
        in the INCOMPLETE_VISITS table.
        """
        if interrupted:
            await self.store_record(INCOMPLETE_VISITS, visit_id, {"visit_id": visit_id})
        # This code is pretty tricky as there are a number of things going on
        # 1. The awaitable returned by finalize_visit_id should only
        #    resolve once the data is saved to persistent storage
        # 2. No new batches should be created while saving out all the batches
        async with self.storing_lock:
            self._create_batch(visit_id)

            event = asyncio.Event()
            self.flush_events.append(event)

            if self._is_cache_full():
                # We already hold storing_lock; pass it so flush_cache does
                # not try to re-acquire the non-reentrant lock.
                await self.flush_cache(self.storing_lock)

        async def wait_on_condition(e: asyncio.Event) -> None:
            await e.wait()

        return asyncio.create_task(wait_on_condition(event))

    @abstractmethod
    async def write_table(self, table_name: TableName, table: Table) -> None:
        """Write out the table to persistent storage

        This should only return once it's actually saved out
        """

    async def flush_cache(self, lock: Optional[asyncio.Lock] = None) -> None:
        """Write all cached batches via ``write_table`` and wake waiting visits.

        We need to hack around the fact that asyncio has no reentrant lock.
        Either the caller passes in the already-held ``storing_lock``, or this
        method acquires it itself.

        After every table has been written, the batch cache is cleared and all
        pending flush events are set. Setting the events resolves the tasks
        returned by ``finalize_visit_id``.

        If ``write_table`` raises, the exception propagates and a lock
        acquired here is still released. Batches and pending events are kept
        in that case, including tables that were already written before the
        failure; those will be written again by the next flush.
        """
        acquired_here = lock is None
        if lock is None:
            lock = self.storing_lock
            await lock.acquire()
        try:
            assert lock == self.storing_lock and lock.locked()

            for table_name, batches in self._batches.items():
                if not batches:
                    # pa.Table.from_batches needs a schema or >= 1 batch
                    continue
                table = pa.Table.from_batches(batches)
                await self.write_table(table_name, table)
            self._batches.clear()

            for event in self.flush_events:
                event.set()
            self.flush_events.clear()
        finally:
            # Without this, a failing write_table would leave storing_lock
            # held forever and deadlock every later finalize/flush.
            if acquired_here:
                lock.release()

    async def shutdown(self) -> None:
        """Log an error for every table that still has unflushed batches.

        This only reports; it does not write the cached batches out.
        """
        for table_name, batches in self._batches.items():
            if len(batches) != 0:
                self.logger.error(
                    "While shutting down there were %d cached entries for table %s",
                    len(batches),
                    table_name,
                )