openwpm.storage.storage_controller module
- class openwpm.storage.storage_controller.DataSocket(listener_address: Tuple[str, int], client_name: str)
Bases: object
Wrapper around ClientSocket to make sending records to the StorageController more convenient.
- class openwpm.storage.storage_controller.StorageController(structured_storage: StructuredStorageProvider, unstructured_storage: UnstructuredStorageProvider | None, status_queue: Queue, completion_queue: Queue, shutdown_queue: Queue)
Bases: object
Manages incoming data and saves it to disk.
Reports its status to the task manager via the completion and status queues. Can be shut down by sending a shutdown signal through the shutdown queue.
- finalize_tasks: list[tuple[VisitId, Task[None] | None, bool]]
Contains all information required for update_completion_queue to work. Each tuple holds, in order: the VisitId, an optional completion token, and the success flag.
- async finalize_visit_id(visit_id: VisitId, success: bool) -> Task[None] | None
Makes sure all records for a given visit_id have been processed before finalize_visit_id is invoked on the structured_storage.
See StructuredStorageProvider.finalize_visit_id for additional documentation.
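The ordering guarantee described above can be illustrated with a minimal asyncio sketch. This is not OpenWPM's implementation: the module-level `store_record_tasks`, `stored`, and `finalized` containers and the simplified `store_record` coroutine are hypothetical stand-ins that only show the pattern of awaiting every pending task for a visit before finalizing it.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, List, Tuple

# Hypothetical stand-ins for the controller's state and the storage backend
store_record_tasks: DefaultDict[int, List[asyncio.Task]] = defaultdict(list)
stored: List[str] = []
finalized: List[Tuple[int, bool, int]] = []


async def store_record(visit_id: int, record: str) -> None:
    await asyncio.sleep(0.01)  # simulate a slow write
    stored.append(record)


async def finalize_visit_id(visit_id: int, success: bool) -> None:
    # Take this visit's pending tasks and wait for all of them to finish
    pending = store_record_tasks.pop(visit_id, [])
    await asyncio.gather(*pending)
    # Only now is the visit reported as finalized; we also note how many
    # records had been written at that point
    finalized.append((visit_id, success, len(stored)))


async def main() -> None:
    for record in ("a", "b", "c"):
        task = asyncio.create_task(store_record(1, record))
        store_record_tasks[1].append(task)
    await finalize_visit_id(1, success=True)


asyncio.run(main())
```

In this sketch the finalize step always observes all three records, because it awaits the tasks collected for that visit id before doing anything else.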
- async handler(reader: StreamReader, _: StreamWriter) -> None
Created for every new connection to the server.
- async save_batch_if_past_timeout() -> NoReturn
Saves the current batch of records if no new data has been received.
If no new data arrives for this batch, it is committed early regardless of the current batch size.
This coroutine is stopped by cancellation, which raises an exception inside it, so there is no need for an orderly return.
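A minimal sketch of this idle-timeout pattern, assuming a hypothetical `BATCH_TIMEOUT` constant, an in-memory `batch` list, and an `add_record` helper (none of these names are taken from OpenWPM). The loop never returns on its own and is ended by cancelling its task.

```python
import asyncio
import time
from typing import List, NoReturn

BATCH_TIMEOUT = 0.05  # seconds; hypothetical value chosen for the demo
batch: List[str] = []
flushed: List[List[str]] = []
last_record_received = time.monotonic()


def add_record(record: str) -> None:
    """Append a record and remember when it arrived."""
    global last_record_received
    batch.append(record)
    last_record_received = time.monotonic()


async def save_batch_if_past_timeout() -> NoReturn:
    # Runs forever; the owner stops it by cancelling the task
    while True:
        idle = time.monotonic() - last_record_received
        if batch and idle >= BATCH_TIMEOUT:
            flushed.append(list(batch))  # commit early, whatever the batch size
            batch.clear()
        await asyncio.sleep(BATCH_TIMEOUT / 5)


async def main() -> None:
    watcher = asyncio.create_task(save_batch_if_past_timeout())
    add_record("r1")
    add_record("r2")
    await asyncio.sleep(BATCH_TIMEOUT * 4)  # no new data arrives
    watcher.cancel()
    try:
        await watcher
    except asyncio.CancelledError:
        pass  # expected: cancellation is how the loop ends


asyncio.run(main())
```

Because the only exit path is `asyncio.CancelledError`, the coroutine needs no return statement, which is what the `NoReturn` annotation expresses.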
- store_record_tasks: DefaultDict[VisitId, list[Task[None]]]
Contains all store_record tasks for a given visit_id.
- class openwpm.storage.storage_controller.StorageControllerHandle(structured_storage: StructuredStorageProvider, unstructured_storage: UnstructuredStorageProvider | None)
Bases: object
Contains all methods the TaskManager needs to interact with the StorageController.
- get_most_recent_status() -> int
Returns the most recent queue size sent from the StorageController process.
- get_new_completed_visits() -> List[Tuple[int, bool]]
Returns a list of all visit ids that have been processed since the last time the method was called, together with whether or not they ran successfully.
Returns an empty list if no visit ids have been processed since the last call.
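The "everything since the last call" behaviour can be sketched as a non-blocking drain of a queue. This is an illustration only: `drain_completed` is a hypothetical helper, and a plain `queue.Queue` stands in for whatever inter-process queue the controller actually uses.

```python
import queue
from typing import List, Tuple


def drain_completed(completion_queue: queue.Queue) -> List[Tuple[int, bool]]:
    """Return every (visit_id, success) pair currently queued, without blocking."""
    completed: List[Tuple[int, bool]] = []
    while True:
        try:
            completed.append(completion_queue.get_nowait())
        except queue.Empty:
            break  # nothing left; an empty result is a normal outcome
    return completed


q: queue.Queue = queue.Queue()
q.put((101, True))
q.put((102, False))

first = drain_completed(q)   # both visits are reported once
second = drain_completed(q)  # nothing new since the previous call
```

A caller polling in a loop can treat the empty list as "no news yet" and simply try again later.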
- get_next_browser_id() -> BrowserId
Generates a browser id as a random positive 32-bit integer.
Note: Parquet’s partitioned dataset reader only supports integer partition columns up to 32 bits.
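One way to draw such an id is sketched below. It assumes that "32-bit" means the unsigned range 0 to 2^32 - 1, which the text above does not confirm; `random_browser_id` is a hypothetical name, not the method's actual body.

```python
import random


def random_browser_id() -> int:
    # Assumption: an unsigned 32-bit value, i.e. 0 <= id < 2**32.
    # random.getrandbits can return 0, so the result is non-negative
    # rather than strictly positive.
    return random.getrandbits(32)


sample_ids = [random_browser_id() for _ in range(1000)]
```

If the downstream reader expects signed 32-bit integers instead, `random.getrandbits(31)` would keep every id within that narrower range.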
- get_next_visit_id() -> VisitId
Generates a visit id as a random positive integer less than 2^53.
Parquet can support integers up to 64 bits, but JavaScript can only represent integers exactly up to 53 bits: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER Thus, these values are capped at 53 bits.
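The 53-bit cap can be checked with a short sketch. JavaScript numbers are IEEE 754 doubles, as are Python floats, so a round trip through `float` shows where precision is lost. `random_visit_id` is a hypothetical helper used only for illustration.

```python
import random

MAX_SAFE_INTEGER = 2**53 - 1  # value of JavaScript's Number.MAX_SAFE_INTEGER


def random_visit_id() -> int:
    # Non-negative integer strictly below 2**53
    return random.getrandbits(53)


visit_ids = [random_visit_id() for _ in range(1000)]

# Every id in range survives conversion to a double and back unchanged
round_trips_ok = all(int(float(v)) == v for v in visit_ids)

# Just past the cap, distinct integers collapse onto the same double
beyond_cap = 2**53 + 1
collapsed = int(float(beyond_cap))
```

Here `collapsed` no longer equals `beyond_cap`, which is the kind of silent corruption the cap avoids when ids pass through JavaScript code.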
- save_configuration(manager_params: ManagerParamsInternal, browser_params: List[BrowserParamsInternal], openwpm_version: str, browser_version: str) -> None