@dataclass
class Column:
    """Describe a single column of a database table.

    Only ``name`` and ``dtype`` contribute to :attr:`definition`; the
    remaining fields are carried as plain data.
    """

    # Column name as it appears in the table.
    name: str
    # Declared SQL type of the column.
    dtype: str
    notnull: bool = field(default=False)
    default: Any | None = field(default=None)
    pk: int = field(default=0)

    @property
    def definition(self):
        """Return the ``<name> <dtype>`` fragment for a ``CREATE TABLE`` statement."""
        # TODO: Is there a way to do this via a prepared statement?
        # The separating space is required: without it the name and the type
        # fuse into a single identifier and the column is created untyped
        # under the wrong name.
        return f"{self.name} {self.dtype}"
@dataclass
class Table:
    """Describe a database table as a name plus an ordered list of columns."""

    name: str
    columns: list[Database.Column]

    @property
    def create_statement(self):
        """Return the SQL statement required to create this table."""
        # TODO: Is there a way to do this via a prepared statement?
        column_sql = ",".join(col.definition for col in self.columns)
        opening = f"CREATE TABLE {self.name} ("
        return opening + column_sql + ");"
def __init__(self, dbpath: pathlib.Path | Literal[":memory:"]):
    """Open a connection to the SQLite database at ``dbpath``.

    Parameters
    ----------
    dbpath
       Path of the database file, or the literal ``":memory:"``. When a
       :class:`pathlib.Path` is given, its parent directory is created if
       it is missing.
    """
    self.path = dbpath

    if isinstance(self.path, pathlib.Path):
        # ``exist_ok=True`` replaces a separate ``exists()`` check, so a
        # directory created concurrently by someone else is not an error.
        self.path.parent.mkdir(parents=True, exist_ok=True)

    self.db = sqlite3.connect(self.path)

    # Ensure that Write Ahead Logging is enabled.
    self.db.execute("PRAGMA journal_mode(WAL)")

    # Names of the tables already validated by ``ensure_table``.
    self._checked_tables: set[str] = set()


def _get_table(self, name: str) -> Table | None:
    """Get the table with the given name, if it exists.

    Parameters
    ----------
    name
       Name of the table to look up.

    Returns
    -------
    Table | None
       A description of the table as it currently exists in the database,
       or ``None`` when ``PRAGMA table_info`` reports no columns for it.
    """
    # TODO: SQLite does not seem to like '?' syntax in this statement...
    cursor = self.db.execute(f"PRAGMA table_info({name});")
    rows = cursor.fetchall()
    if not rows:
        # Table does not exist
        return None

    # Each row unpacks into six fields; the first one is not used. The loop
    # variable is deliberately not called ``name`` so that it cannot be
    # confused with the table name passed in.
    columns = [
        self.Column(
            name=col_name, dtype=type_, notnull=notnull, default=default, pk=pk
        )
        for (_, col_name, type_, notnull, default, pk) in rows
    ]
    return self.Table(name=name, columns=columns)


def _create_table(self, table: Table):
    """Create the given table, replacing any existing table of the same name.

    Parameters
    ----------
    table
       The table to create. Any data held by an existing table with the
       same name is lost, since that table is dropped first.
    """
    cursor = self.db.cursor()

    # TODO: Is there a way to do this via a prepared statement?
    cursor.execute(f"DROP TABLE IF EXISTS {table.name}")
    cursor.execute(table.create_statement)
    self.db.commit()
def clear_table(self, table: Table, **kwargs):
    """Delete rows from the given table.

    Parameters
    ----------
    table
       The table to delete rows from.

    kwargs
       Constraints to limit the rows that get cleared, given as
       ``column=value`` pairs that are combined with ``AND``. A value of
       ``None`` matches rows where the column is null. With no constraints
       every row is deleted.
    """
    clauses: list[str] = []
    bound: list[Any] = []

    for column, expected in kwargs.items():
        if expected is not None:
            clauses.append(f"{column} = ?")
            bound.append(expected)
            continue

        # NULL cannot be matched with '=', so it gets its own clause and
        # contributes no bound parameter.
        clauses.append(f"{column} is null")

    # TODO: Is there a way to pass the table name as a '?' parameter?
    statement = f"DELETE FROM {table.name}"  # noqa: S608
    if clauses:
        statement = " ".join([statement, "WHERE", " AND ".join(clauses)])

    self.db.cursor().execute(statement, tuple(bound))
    self.db.commit()
def ensure_table(self, table: Table):
    """Ensure that the given table exists in the database.

    If the table *does* exist, but has the wrong shape, it will be dropped and
    recreated.

    Parameters
    ----------
    table
       The desired table. Its shape is compared against the existing table
       by column count and, position by position, by column name and type.
    """
    # If we've already checked the table, then there's nothing to do
    if table.name in self._checked_tables:
        return

    existing = self._get_table(table.name)

    # Are the tables compatible? A missing table counts as incompatible.
    compatible = (
        existing is not None
        and len(existing.columns) == len(table.columns)
        and all(
            have.name == want.name and have.dtype == want.dtype
            for have, want in zip(existing.columns, table.columns)
        )
    )
    if not compatible:
        self._create_table(table)

    # Record the table on every path, including right after creating a
    # missing one, so the next call does not inspect the database again.
    self._checked_tables.add(table.name)
def insert_values(self, table: Table, values: list[tuple]):
    """Insert the given values into the given table.

    The rows are inserted atomically: if any row fails, none of the rows
    from this call are kept.

    Parameters
    ----------
    table
       The table to insert into.

    values
       The rows to insert, one tuple per row. The number of placeholders is
       taken from the first row. An empty list is a no-op.
    """
    if not values:
        return

    placeholder = "(" + ",".join(["?"] * len(values[0])) + ")"

    # The connection's context manager commits on success and rolls back on
    # error. Without it, rows inserted before a failing row would stay
    # pending and be persisted by the next unrelated ``commit()``.
    with self.db:
        self.db.cursor().executemany(
            f"INSERT INTO {table.name} VALUES {placeholder}",  # noqa: S608
            values,
        )