-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
83 lines (68 loc) · 2.72 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import os
import sys
from typing import Optional
import psycopg
DB_HOST = os.environ.get("DATABASE_HOST", "localhost")
DB_PORT = os.environ.get("DATABASE_PORT", "5432")
DB_NAME = os.environ.get("DATABASE_NAME", "postgres")
DB_USER = os.environ.get("DATABASE_USER", "postgres")
DB_PASSWORD = os.environ.get("DATABASE_PASSWORD", "")
DB_CONN: Optional[psycopg.Connection] = None
def connect():
global DB_CONN
connection_url = get_connection_url()
try:
DB_CONN = psycopg.connect(connection_url)
except Exception as e:
secure_connection_url = connection_url.replace(f":{DB_PASSWORD}", ":***") if len(DB_PASSWORD) > 0 else connection_url
sys.stderr.write(f"Couldn't connect to the database via '{secure_connection_url}':\n{e}\n")
exit(-1)
def get_connection_url():
password_part = f":{DB_PASSWORD}" if len(DB_PASSWORD) > 0 else ""
return f"postgresql://{DB_USER}{password_part}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
def initialize():
connect()
with DB_CONN.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS schema_version (
id TEXT PRIMARY KEY,
applied BOOLEAN DEFAULT FALSE
);
""")
DB_CONN.commit()
def add_migrations_information(migrations):
with DB_CONN.cursor() as cur:
for migration in migrations:
try:
cur.execute("INSERT INTO schema_version (id) VALUES (%s)", (migration,))
except psycopg.errors.UniqueViolation:
DB_CONN.rollback()
except Exception as e:
sys.stderr.write(f"Error when inserting migration '{migration}':\n{e}\n")
DB_CONN.close()
exit(-1)
DB_CONN.commit()
def get_not_yet_applied_migrations():
with DB_CONN.cursor() as cur:
try:
rows = cur.execute("SELECT id FROM schema_version WHERE applied=false").fetchall()
except Exception as e:
sys.stderr.write(f"Error when fetching not yet applied migrations:\n{e}\n")
DB_CONN.close()
exit(-1)
return sorted([row[0] for row in rows])
def apply_migration(migration_id, migration_text):
with DB_CONN.cursor() as cur:
try:
cur.execute(migration_text)
except Exception as e:
sys.stderr.write(f"\nError when applying migration '{migration_id}':\n{e}\n")
DB_CONN.close()
exit(-1)
try:
cur.execute("UPDATE schema_version SET applied=true WHERE id=%s", (migration_id,))
except Exception as e:
sys.stderr.write(f"Error when setting '{migration_id}' as applied in the database:\n{e}\n")
DB_CONN.close()
exit(-1)
DB_CONN.commit()