openwpm.storage.storage_controller module

class openwpm.storage.storage_controller.DataSocket(listener_address: Tuple[str, int], client_name: str)

Bases: object

Wrapper around ClientSocket to make sending records to the StorageController more convenient

close() -> None

finalize_visit_id(visit_id: VisitId, success: bool) -> None

store_record(table_name: TableName, visit_id: VisitId, data: Dict[str, Any]) -> None

class openwpm.storage.storage_controller.StorageController(structured_storage: StructuredStorageProvider, unstructured_storage: UnstructuredStorageProvider | None, status_queue: Queue, completion_queue: Queue, shutdown_queue: Queue)

Bases: object

Manages incoming data and its saving to disk.

Provides its status to the task manager via the completion and status queues. Can be shut down via a shutdown signal in the shutdown queue.

finalize_tasks: list[tuple[VisitId, Task[None] | None, bool]]

Contains all information required for update_completion_queue to work. The tuple structure is: VisitId, optional completion token, success.

async finalize_visit_id(visit_id: VisitId, success: bool) -> Task[None] | None

Makes sure all records for a given visit_id have been processed before we invoke finalize_visit_id on the structured_storage

See StructuredStorageProvider::finalize_visit_id for additional documentation
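The ordering guarantee described above can be illustrated with a minimal asyncio sketch. It is not the OpenWPM implementation: the module-level `store_record_tasks` mapping mirrors the attribute of the same name, while the `saved` and `finalized` lists and the simplified function signatures are hypothetical stand-ins for a real storage provider.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, List

# Hypothetical stand-ins for the storage provider; for illustration only.
store_record_tasks: DefaultDict[int, List[asyncio.Task]] = defaultdict(list)
saved: List[tuple] = []
finalized: List[tuple] = []


async def store_record(visit_id: int, data: dict) -> None:
    await asyncio.sleep(0.01)  # simulate a slow write
    saved.append((visit_id, data))


async def finalize_visit_id(visit_id: int, success: bool) -> None:
    # Wait for every pending store_record task of this visit before finalizing.
    pending = store_record_tasks.pop(visit_id, [])
    await asyncio.gather(*pending)
    # Record how many writes had landed at the moment of finalization.
    finalized.append((visit_id, success, len(saved)))


async def main() -> None:
    for i in range(3):
        task = asyncio.create_task(store_record(42, {"n": i}))
        store_record_tasks[42].append(task)
    await finalize_visit_id(42, success=True)


asyncio.run(main())
```

Because `finalize_visit_id` awaits the gathered tasks first, all three records are already saved when the visit is marked as finalized.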

async handler(reader: StreamReader, _: StreamWriter) -> None

Created for every new connection to the Server

run() -> None

async save_batch_if_past_timeout() -> NoReturn

Save the current batch of records if no new data has been received.

If we aren’t receiving new data for this batch we commit early regardless of the current batch size.

This coroutine will get cancelled with an exception so there is no need for an orderly return
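The pattern of a never-returning coroutine that flushes an idle batch and is stopped by cancellation might look like the sketch below. The `Batcher` class, the `BATCH_COMMIT_TIMEOUT` value, and the polling interval are assumptions made for this example and are not taken from OpenWPM.

```python
import asyncio
import time
from typing import List, NoReturn

BATCH_COMMIT_TIMEOUT = 0.05  # seconds; hypothetical value chosen for the demo


class Batcher:
    """Hypothetical holder of the current batch; not an OpenWPM class."""

    def __init__(self) -> None:
        self.batch: List[dict] = []
        self.committed: List[List[dict]] = []
        self.last_record_received = time.monotonic()

    def add(self, record: dict) -> None:
        self.batch.append(record)
        self.last_record_received = time.monotonic()

    async def save_batch_if_past_timeout(self) -> NoReturn:
        # Loops forever; the only way out is cancellation by the owner.
        while True:
            idle = time.monotonic() - self.last_record_received
            if self.batch and idle >= BATCH_COMMIT_TIMEOUT:
                # Commit early, regardless of how small the batch is.
                self.committed.append(self.batch)
                self.batch = []
            await asyncio.sleep(0.01)


async def main() -> Batcher:
    batcher = Batcher()
    watcher = asyncio.create_task(batcher.save_batch_if_past_timeout())
    batcher.add({"url": "https://example.com"})
    await asyncio.sleep(0.2)  # no new data arrives, so the batch is flushed
    watcher.cancel()
    try:
        await watcher
    except asyncio.CancelledError:
        pass  # expected: the coroutine never returns on its own
    return batcher


batcher = asyncio.run(main())
```

The `NoReturn` annotation documents that the loop has no normal exit, which is why the caller cancels the task instead of awaiting a result.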

async should_shutdown() -> None

Returns when we should shut down

async shutdown(completion_queue_task: Task[None]) -> None

async store_record(table_name: TableName, visit_id: VisitId, data: Dict[str, Any]) -> None

store_record_tasks: DefaultDict[VisitId, list[Task[None]]]

Contains all store_record tasks for a given visit_id

async update_completion_queue() -> None

All completed finalize_visit_id tasks get put into the completion_queue here

async update_status_queue() -> NoReturn

Send manager process a status update.

This coroutine will get cancelled with an exception so there is no need for an orderly return

class openwpm.storage.storage_controller.StorageControllerHandle(structured_storage: StructuredStorageProvider, unstructured_storage: UnstructuredStorageProvider | None)

Bases: object

This class contains all methods relevant for the TaskManager to interact with the StorageController

get_most_recent_status() -> int

Return the most recent queue size sent from the Storage Controller process

get_new_completed_visits() -> List[Tuple[int, bool]]

Returns a list of all visit ids that have been processed since the last time the method was called and whether or not they ran successfully.

This method will return an empty list in case no visit ids have been processed since the last time this method was called
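The "everything since the last call, or an empty list" behaviour described above corresponds to draining a queue without blocking. The sketch below assumes a plain `queue.Queue` as a stand-in for the completion queue and uses a free function with a hypothetical signature; the real method takes no arguments and reads from the handle's own queue.

```python
import queue
from typing import List, Tuple


def get_new_completed_visits(
    completion_queue: "queue.Queue[Tuple[int, bool]]",
) -> List[Tuple[int, bool]]:
    """Drain the queue without blocking and return what was in it."""
    finished: List[Tuple[int, bool]] = []
    while True:
        try:
            finished.append(completion_queue.get_nowait())
        except queue.Empty:
            break  # nothing left; return whatever has been collected
    return finished


# Stand-in completion queue holding (visit_id, success) pairs.
completion_queue: "queue.Queue[Tuple[int, bool]]" = queue.Queue()
completion_queue.put((1, True))
completion_queue.put((2, False))

first = get_new_completed_visits(completion_queue)   # both visits
second = get_new_completed_visits(completion_queue)  # nothing new
```

A caller can therefore poll this method repeatedly and process only the visits that finished in between.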

get_next_browser_id() -> BrowserId

Generates the crawl id as a randomly generated positive 32-bit integer.

Note: Parquet’s partitioned dataset reader only supports integer partition columns up to 32 bits.

get_next_visit_id() -> VisitId

Generates the visit id as a randomly generated positive integer less than 2^53.

Parquet can support integers up to 64 bits, but JavaScript can only represent integers exactly up to 53 bits (see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER). Thus, we cap these values at 53 bits.
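The 53-bit cap can be checked with a short sketch. The helper name `next_visit_id` and the use of `random.getrandbits` are assumptions for illustration, not a statement about how OpenWPM generates the id. Python floats are IEEE 754 doubles, the same representation JavaScript uses for `Number`, so they show the same precision limit.

```python
import random

MAX_SAFE_INTEGER = 2**53 - 1  # value of JavaScript's Number.MAX_SAFE_INTEGER


def next_visit_id() -> int:
    # Hypothetical helper: getrandbits(53) yields an integer in [0, 2**53 - 1].
    # Note that 0 is possible in this sketch.
    return random.getrandbits(53)


ids = [next_visit_id() for _ in range(1000)]

# Above 2**53, neighbouring integers collapse onto the same double.
collides_above_limit = float(2**53) == float(2**53 + 1)

# At or below the limit, neighbouring integers stay distinct.
exact_at_limit = float(MAX_SAFE_INTEGER) != float(MAX_SAFE_INTEGER - 1)
```

An id that survives a round trip through the browser extension unchanged must stay within this range, which is the reason for the cap.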

get_status() -> int

Get listener process status. If the status queue is empty, block.

launch() -> None

Starts the storage controller

save_configuration(manager_params: ManagerParamsInternal, browser_params: List[BrowserParamsInternal], openwpm_version: str, browser_version: str) -> None

shutdown(relaxed: bool = True) -> None

Terminate the storage controller process