Source code for openwpm.storage.sql_provider
import json
import logging
import os
import sqlite3
from pathlib import Path
from sqlite3 import (
Connection,
Cursor,
IntegrityError,
InterfaceError,
OperationalError,
ProgrammingError,
)
from typing import Any, Dict, List, Tuple
from openwpm.types import VisitId
from .storage_providers import StructuredStorageProvider, TableName
SCHEMA_FILE = os.path.join(os.path.dirname(__file__), "schema.sql")
[docs]
class SQLiteStorageProvider(StructuredStorageProvider):
db: Connection
cur: Cursor
def __init__(self, db_path: Path) -> None:
super().__init__()
self.db_path = db_path
self._sql_counter = 0
self._sql_commit_time = 0
self.logger = logging.getLogger("openwpm")
[docs]
async def init(self) -> None:
self.db = sqlite3.connect(str(self.db_path))
self.cur = self.db.cursor()
self._create_tables()
def _create_tables(self) -> None:
"""Create tables (if this is a new database)"""
with open(SCHEMA_FILE, "r") as f:
self.db.executescript(f.read())
self.db.commit()
[docs]
async def flush_cache(self) -> None:
self.db.commit()
[docs]
async def store_record(
self, table: TableName, visit_id: VisitId, record: Dict[str, Any]
) -> None:
"""Submit a record to be stored
The storing might not happen immediately
"""
assert self.cur is not None
statement, args = self._generate_insert(table=table, data=record)
for i in range(len(args)):
if isinstance(args[i], bytes):
args[i] = str(args[i], errors="ignore")
elif callable(args[i]):
args[i] = str(args[i])
elif type(args[i]) == dict:
args[i] = json.dumps(args[i])
try:
self.cur.execute(statement, args)
self._sql_counter += 1
except (
OperationalError,
ProgrammingError,
IntegrityError,
InterfaceError,
) as e:
self.logger.error(
"Unsupported record:\n%s\n%s\n%s\n%s\n"
% (type(e), e, statement, repr(args))
)
@staticmethod
def _generate_insert(
table: TableName, data: Dict[str, Any]
) -> Tuple[str, List[Any]]:
"""Generate a SQL query from `record`"""
statement = "INSERT INTO %s (" % table
value_str = "VALUES ("
values = list()
first = True
for field, value in data.items():
statement += "" if first else ", "
statement += field
value_str += "?" if first else ",?"
values.append(value)
first = False
statement = statement + ") " + value_str + ")"
return statement, values
[docs]
def execute_statement(self, statement: str) -> None:
self.cur.execute(statement)
self.db.commit()
[docs]
async def finalize_visit_id(
self, visit_id: VisitId, interrupted: bool = False
) -> None:
if interrupted:
self.logger.warning("Visit with visit_id %d got interrupted", visit_id)
self.cur.execute("INSERT INTO incomplete_visits VALUES (?)", (visit_id,))
self.db.commit()
[docs]
async def shutdown(self) -> None:
self.db.commit()
self.db.close()