1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
import sqlite3
import json
import logging
import sys
import glob
logger = logging.getLogger(__name__)
def get_connection(library):
conn = sqlite3.connect(f'{library}/db.sqlite3')
cursor = conn.cursor()
cursor.execute('PRAGMA foreign_keys = ON')
cursor.execute('PRAGMA journal_mode = WAL')
return conn
def migrate(resources, conn):
current_version, = next(conn.cursor().execute('PRAGMA user_version'), (0, ))
for version, migration_path in enumerate(glob.glob(f'{resources}/migrations/*.sql'), start=1):
if version > current_version:
cur = conn.cursor()
try:
logger.info('Applying %s', migration_path)
with open(migration_path, 'r') as file:
migration_content = file.read()
cur.executescript(f'begin; PRAGMA user_version={version};' + migration_content)
except Exception as e:
logger.error('Failed migration %s: %s. Exiting', migration_path, e)
cur.execute('rollback')
sys.exit(1)
else:
cur.execute('commit')
def get_books(conn):
books = {}
for r in conn.execute("SELECT id, json(data) FROM books"):
books[r[0]] = json.loads(r[1])
return books
def create_book(conn, book_id, data):
cursor = conn.cursor()
encoded_data = bytes(json.dumps(data), 'utf-8')
cursor.execute(
'INSERT INTO books(id, created_at, updated_at, data) VALUES (?, datetime(), datetime(), ?)',
(book_id, encoded_data))
cursor.execute('commit')
def update_book(conn, book_id, data):
cursor = conn.cursor()
encoded_data = bytes(json.dumps(data), 'utf-8')
cursor.execute(
'UPDATE books SET data = ?, updated_at = datetime() WHERE id = ?',
(encoded_data, book_id))
cursor.execute('commit')
def delete_book(conn, book_id):
cursor = conn.cursor()
cursor.execute(
'DELETE FROM books WHERE id = ?',
(book_id,))
cursor.execute('commit')
|