forked from lnbits/lncalendar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
crud.py
194 lines (159 loc) · 5.56 KB
/
crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
from typing import List, Optional, Union
from datetime import datetime, timedelta
from lnbits.helpers import urlsafe_short_hash
from . import db
from .models import (
CreateSchedule,
Schedule,
UnavailableTime,
CreateUnavailableTime,
CreateAppointment,
Appointment,
)
## Schedule CRUD
async def create_schedule(wallet_id: str, data: CreateSchedule) -> Schedule:
    """Insert a new schedule row for ``wallet_id`` and return it as stored.

    The id is generated with ``urlsafe_short_hash`` and the row is read back
    through ``get_schedule`` after the insert.

    Raises:
        AssertionError: if the row cannot be read back after the insert.
    """
    new_id = urlsafe_short_hash()
    # Order must match the column list of the INSERT statement below.
    params = (
        new_id,
        wallet_id,
        data.name,
        data.start_day,
        data.end_day,
        data.start_time,
        data.end_time,
        data.amount,
    )
    await db.execute(
        """
        INSERT INTO lncalendar.schedule (id, wallet, name, start_day, end_day, start_time, end_time, amount)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        params,
    )
    created = await get_schedule(new_id)
    assert created, "Newly created schedule couldn't be retrieved"
    return created
async def update_schedule(schedule_id: str, data: CreateSchedule) -> Schedule:
    """Overwrite the editable columns of a schedule and return the stored row.

    Every field carried by ``data`` (name, day range, time range, amount) is
    written; the row is then read back through ``get_schedule``.

    Raises:
        AssertionError: if no row with ``schedule_id`` can be read back.
    """
    # Values follow the SET clause order; the id for the WHERE clause is last.
    params = (
        data.name,
        data.start_day,
        data.end_day,
        data.start_time,
        data.end_time,
        data.amount,
        schedule_id,
    )
    await db.execute(
        """
        UPDATE lncalendar.schedule SET name = ?, start_day = ?, end_day = ?, start_time = ?, end_time = ?, amount = ?
        WHERE id = ?
        """,
        params,
    )
    updated = await get_schedule(schedule_id)
    assert updated, "Updated schedule couldn't be retrieved"
    return updated
async def get_schedule(schedule_id: str) -> Optional[Schedule]:
    """Fetch a single schedule by id, or ``None`` when no row matches."""
    record = await db.fetchone(
        "SELECT * FROM lncalendar.schedule WHERE id = ?", (schedule_id,)
    )
    if not record:
        return None
    return Schedule(**record)
async def get_schedules(wallet_ids: Union[str, List[str]]) -> List[Schedule]:
    """Return every schedule belonging to the given wallet(s).

    Args:
        wallet_ids: a single wallet id, or a list of wallet ids.

    Returns:
        One ``Schedule`` per matching row. An empty ``wallet_ids`` list
        yields an empty result without querying the database.
    """
    if isinstance(wallet_ids, str):
        wallet_ids = [wallet_ids]
    # With no ids the query would read ``IN ()``, which some SQL backends
    # (e.g. PostgreSQL) reject as a syntax error; nothing can match anyway.
    if not wallet_ids:
        return []
    # Only "?" placeholders are interpolated; the ids themselves are bound.
    placeholders = ",".join(["?"] * len(wallet_ids))
    rows = await db.fetchall(
        f"SELECT * FROM lncalendar.schedule WHERE wallet IN ({placeholders})",
        (*wallet_ids,),
    )
    return [Schedule(**row) for row in rows]
async def delete_schedule(schedule_id: str) -> None:
    """Delete the schedule row whose id is ``schedule_id``.

    Only the ``lncalendar.schedule`` table is touched by this statement.
    """
    params = (schedule_id,)
    await db.execute("DELETE FROM lncalendar.schedule WHERE id = ?", params)
## Appointment CRUD
async def create_appointment(
    schedule_id: str, payment_hash: str, data: CreateAppointment
) -> Appointment:
    """Insert an appointment for ``schedule_id`` and return it as stored.

    The payment hash is used directly as the appointment id, and the row is
    read back through ``get_appointment`` after the insert.

    Raises:
        AssertionError: if the row cannot be read back after the insert.
    """
    # Order must match the column list of the INSERT statement below.
    params = (
        payment_hash,
        data.name,
        data.email,
        data.info,
        data.start_time,
        data.end_time,
        schedule_id,
    )
    await db.execute(
        """
        INSERT INTO lncalendar.appointment (id, name, email, info, start_time, end_time, schedule)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        params,
    )
    created = await get_appointment(payment_hash)
    assert created, "Newly created appointment couldn't be retrieved"
    return created
async def get_appointment(appointment_id: str) -> Optional[Appointment]:
    """Fetch a single appointment by id, or ``None`` when no row matches."""
    record = await db.fetchone(
        "SELECT * FROM lncalendar.appointment WHERE id = ?", (appointment_id,)
    )
    if not record:
        return None
    return Appointment(**record)
async def get_appointments(schedule_id: str) -> List[Appointment]:
    """Return all appointments stored for one schedule.

    Args:
        schedule_id: id of the schedule whose appointments are wanted.

    Returns:
        One ``Appointment`` per matching row; empty when none match.
    """
    # The stray debug ``print(schedule_id)`` was dropped: it wrote to stdout
    # on every call and had no effect on the result.
    rows = await db.fetchall(
        "SELECT * FROM lncalendar.appointment WHERE schedule = ?", (schedule_id,)
    )
    return [Appointment(**row) for row in rows]
async def get_appointments_wallets(
    wallet_ids: Union[str, List[str]]
) -> List[Appointment]:
    """Return every appointment on any schedule owned by the given wallet(s).

    Args:
        wallet_ids: a single wallet id, or a list of wallet ids.

    Returns:
        One ``Appointment`` per matching row. Empty when ``wallet_ids`` is
        empty or when the wallets own no schedules.
    """
    if isinstance(wallet_ids, str):
        wallet_ids = [wallet_ids]
    # Short-circuit on an empty list so no ``IN ()`` query (a syntax error on
    # some SQL backends, e.g. PostgreSQL) is ever built from it.
    if not wallet_ids:
        return []
    schedules = await get_schedules(wallet_ids)
    if not schedules:
        return []
    schedule_ids = [schedule.id for schedule in schedules]
    # Only "?" placeholders are interpolated; the ids themselves are bound.
    placeholders = ",".join(["?"] * len(schedule_ids))
    rows = await db.fetchall(
        f"SELECT * FROM lncalendar.appointment WHERE schedule IN ({placeholders})",
        (*schedule_ids,),
    )
    return [Appointment(**row) for row in rows]
async def set_appointment_paid(appointment_id: str) -> None:
    """Set the ``paid`` column to true for the appointment with this id."""
    params = (appointment_id,)
    await db.execute(
        """
        UPDATE lncalendar.appointment SET paid = true
        WHERE id = ?
        """,
        params,
    )
async def purge_appointments(schedule_id: str) -> None:
    """Delete unpaid appointments of a schedule that are older than 24 hours.

    A row is removed when it belongs to ``schedule_id``, has ``paid = false``
    and its ``time`` column is earlier than "now minus 24 hours".
    """
    # NOTE(review): the cutoff is a naive local datetime; confirm it compares
    # correctly with how the ``time`` column is stored on each backend.
    cutoff = datetime.now() - timedelta(hours=24)
    params = (schedule_id, cutoff)
    await db.execute(
        """
        DELETE FROM lncalendar.appointment WHERE schedule = ? AND paid = false AND time < ?
        """,
        params,
    )
## UnavailableTime CRUD
async def create_unavailable_time(data: CreateUnavailableTime) -> UnavailableTime:
    """Insert an unavailable-time row and return it as stored.

    The id is generated with ``urlsafe_short_hash``; the owning schedule id
    comes from ``data.schedule``. The row is read back through
    ``get_unavailable_time`` after the insert.

    Raises:
        AssertionError: if the row cannot be read back after the insert.
    """
    new_id = urlsafe_short_hash()
    params = (new_id, data.start_time, data.end_time, data.schedule)
    await db.execute(
        """
        INSERT INTO lncalendar.unavailable (id, start_time, end_time, schedule)
        VALUES (?, ?, ?, ?)
        """,
        params,
    )
    created = await get_unavailable_time(new_id)
    assert created, "Newly created unavailable_time couldn't be retrieved"
    return created
async def get_unavailable_time(unavailable_time_id: str) -> Optional[UnavailableTime]:
    """Fetch a single unavailable-time row by id, or ``None`` if absent."""
    record = await db.fetchone(
        "SELECT * FROM lncalendar.unavailable WHERE id = ?", (unavailable_time_id,)
    )
    if not record:
        return None
    return UnavailableTime(**record)
async def get_unavailable_times(schedule_id: str) -> List[UnavailableTime]:
    """Return all unavailable-time rows recorded for one schedule."""
    records = await db.fetchall(
        "SELECT * FROM lncalendar.unavailable WHERE schedule = ?", (schedule_id,)
    )
    blocked = []
    for record in records:
        blocked.append(UnavailableTime(**record))
    return blocked