import sqlite3
import json
import logging
import sys
import glob

logger = logging.getLogger(__name__)


def get_connection(library):
    """Open the SQLite database stored at ``{library}/db.sqlite3``.

    Foreign-key enforcement is switched on and the journal is put into
    WAL mode before the connection is handed back.

    Args:
        library: Directory that contains (or will contain) ``db.sqlite3``.

    Returns:
        The open ``sqlite3.Connection``.
    """
    conn = sqlite3.connect(f'{library}/db.sqlite3')
    cursor = conn.cursor()
    cursor.execute('PRAGMA foreign_keys = ON')
    cursor.execute('PRAGMA journal_mode = WAL')
    return conn


def migrate(resources, conn):
    """Apply every pending ``{resources}/migrations/*.sql`` script.

    The n-th migration file (1-based, in sorted file-name order) brings
    the database to ``user_version`` n. Files whose position is not above
    the current ``user_version`` are skipped. Each script runs inside its
    own transaction together with the ``user_version`` bump, so a failed
    script leaves the version untouched.

    Args:
        resources: Directory containing the ``migrations`` sub-directory.
        conn: Open connection to the database being migrated.

    Raises:
        SystemExit: With exit code 1 when a migration cannot be read or
            executed; the failure is logged and the transaction rolled
            back first.
    """
    current_version, = next(conn.cursor().execute('PRAGMA user_version'), (0,))

    # glob.glob() yields paths in filesystem-dependent order; sort so that
    # the version number derived from the position is deterministic.
    # NOTE(review): assumes file names sort lexicographically in the
    # intended order (e.g. zero-padded numeric prefixes) -- confirm.
    migration_paths = sorted(glob.glob(f'{resources}/migrations/*.sql'))

    for version, migration_path in enumerate(migration_paths, start=1):
        if version <= current_version:
            continue

        cur = conn.cursor()
        try:
            logger.info('Applying %s', migration_path)
            with open(migration_path, 'r') as file:
                migration_content = file.read()
            cur.executescript(f'begin; PRAGMA user_version={version};' + migration_content)
        except Exception as e:
            logger.error('Failed migration %s: %s. Exiting', migration_path, e)
            # Connection.rollback() is a no-op when no transaction is open
            # (e.g. the file could not even be read), whereas executing a
            # raw "rollback" statement would raise and mask the real error.
            conn.rollback()
            sys.exit(1)
        else:
            conn.commit()


def get_books(conn):
    """Return all rows of ``books`` as a dict of id -> decoded JSON data.

    Args:
        conn: Open connection to the library database.

    Returns:
        A dict keyed by the ``id`` column whose values are the result of
        ``json.loads`` on SQLite's ``json(data)`` for that row.
    """
    rows = conn.execute("SELECT id, json(data) FROM books")
    return {book_id: json.loads(payload) for book_id, payload in rows}


def create_book(conn, book_id, data):
    """Insert a new book row and commit.

    ``created_at`` and ``updated_at`` are both set with SQLite's
    ``datetime()``.

    Args:
        conn: Open connection to the library database.
        book_id: Value for the ``id`` column.
        data: JSON-serialisable object; stored as UTF-8 encoded JSON bytes.
    """
    encoded_data = json.dumps(data).encode('utf-8')
    conn.cursor().execute(
        'INSERT INTO books(id, created_at, updated_at, data) VALUES (?, datetime(), datetime(), ?)',
        (book_id, encoded_data))
    # commit() is safe whether or not a transaction is currently open.
    conn.commit()


def update_book(conn, book_id, data):
    """Replace the data of the book with the given id and commit.

    ``updated_at`` is refreshed with SQLite's ``datetime()``.

    Args:
        conn: Open connection to the library database.
        book_id: ``id`` of the row to update.
        data: JSON-serialisable object; stored as UTF-8 encoded JSON bytes.
    """
    encoded_data = json.dumps(data).encode('utf-8')
    conn.cursor().execute(
        'UPDATE books SET data = ?, updated_at = datetime() WHERE id = ?',
        (encoded_data, book_id))
    conn.commit()


def delete_book(conn, book_id):
    """Delete the book with the given id and commit.

    Args:
        conn: Open connection to the library database.
        book_id: ``id`` of the row to delete.
    """
    conn.cursor().execute(
        'DELETE FROM books WHERE id = ?',
        (book_id,))
    conn.commit()