-
Notifications
You must be signed in to change notification settings - Fork 0
/
database.py
333 lines (283 loc) · 12.9 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import logging
import typing
import requests
import telegram
from sqlalchemy import Column, ForeignKey, UniqueConstraint
from sqlalchemy import Integer, BigInteger, String, Text, LargeBinary, DateTime, Boolean,Index
from sqlalchemy.ext.declarative import declarative_base, DeferredReflection
from sqlalchemy.orm import relationship, backref
import utils
if typing.TYPE_CHECKING:
import worker
# Module-level logger, named after this module
log = logging.getLogger(__name__)

# Declarative base class that every table class in this module inherits from
TableDeclarativeBase = declarative_base()

# The table classes below are all declared on top of the declarative base
class User(DeferredReflection, TableDeclarativeBase):
    """Someone on Telegram who has interacted with the bot at least once."""

    # Name of the backing table
    __tablename__ = "users"

    # Identity fields copied from the Telegram user object
    user_id = Column(BigInteger, primary_key=True)
    first_name = Column(String, nullable=False)
    last_name = Column(String)
    username = Column(String)
    language = Column(String, nullable=False)

    # Rows in other tables that belong to this user
    transactions = relationship("Transaction", back_populates="user")
    orders = relationship("Order", back_populates="user")

    # Wallet balance currently stored for the user
    credit = Column(Integer, nullable=False)

    def __init__(self, w: "worker.Worker", **kwargs):
        """Build the record from the Telegram user attached to the worker `w`.

        Extra keyword arguments are forwarded to the declarative constructor
        before the Telegram-derived fields are assigned.
        """
        super().__init__(**kwargs)
        tg_user = w.telegram_user
        self.user_id = tg_user.id
        self.first_name = tg_user.first_name
        self.last_name = tg_user.last_name
        self.username = tg_user.username
        # Use the configured default when Telegram reports no (or an empty) language code
        self.language = tg_user.language_code or w.cfg["Language"]["default_language"]
        # Every new wallet starts out empty
        self.credit = 0

    def __str__(self):
        """Return the most descriptive label available: @username, then full name, then first name."""
        if self.username is not None:
            return f"@{self.username}"
        if self.last_name is not None:
            return f"{self.first_name} {self.last_name}"
        return self.first_name

    def identifiable_str(self):
        """Return the `__str__` label prefixed with the user id, so the database row can be found again."""
        return f"user_{self.user_id} ({self!s})"

    def mention(self):
        """Return @username when one is set, otherwise a `tg://user?id=` link labelled with the first name."""
        if self.username is None:
            return f"[{self.first_name}](tg://user?id={self.user_id})"
        return f"@{self.username}"

    def recalculate_credit(self):
        """Reset the credit to the sum of the values of all transactions that are not refunded."""
        self.credit = sum(t.value for t in self.transactions if not t.refunded)

    @property
    def full_name(self):
        """First and last name joined by a space, or only the first name when the last name is empty."""
        if not self.last_name:
            return self.first_name
        return f"{self.first_name} {self.last_name}"

    def __repr__(self):
        return f"<User {self.mention()} having {self.credit} credit>"
# Explicit index named "user_id_idx" on the users.user_id column.
# NOTE(review): user_id is already declared as the primary key of User — confirm this extra index is needed.
Index('user_id_idx',User.user_id)
class Product(DeferredReflection, TableDeclarativeBase):
    """A purchasable product."""

    # Product id
    id = Column(Integer, primary_key=True)
    # Category id
    category_id = Column(Integer, ForeignKey("categories.id"))
    # Product name
    name = Column(String)
    # Product description
    description = Column(Text)
    # Product price, if null product is not for sale
    price = Column(Integer)
    # Image data
    image = Column(LargeBinary)
    # Product has been deleted
    deleted = Column(Boolean, nullable=False)

    # Extra table parameters
    __tablename__ = "products"

    # No __init__ is needed, the default one is sufficient

    def text(self, w: "worker.Worker", *, style: str = "full", cart_qty: typing.Optional[int] = None):
        """Return the product details formatted with Telegram HTML. The image is omitted.

        :param w: Worker providing the localization (`w.loc`) and the `w.Price` type.
        :param style: ``"short"`` for a one-line "<qty>x <name> - <total>" summary,
            ``"full"`` for the localized ``product_format_string``.
        :param cart_qty: Quantity currently in the cart. The short style multiplies the price by it,
            so it has to be passed for that style; the full style only adds the cart line when it is set.
        :raises ValueError: If `style` is neither ``"short"`` nor ``"full"``.
        """
        if style == "short":
            return f"{cart_qty}x {utils.telegram_html_escape(self.name)} - {str(w.Price(self.price) * cart_qty)}"
        elif style == "full":
            if cart_qty is not None:
                cart = w.loc.get("in_cart_format_string", quantity=cart_qty)
            else:
                cart = ''
            return w.loc.get("product_format_string", name=utils.telegram_html_escape(self.name),
                             description=utils.telegram_html_escape(self.description),
                             price=str(w.Price(self.price)),
                             cart=cart)
        else:
            raise ValueError("style is not an accepted value")

    def __repr__(self):
        return f"<Product {self.name}>"

    def send_as_message(self, w: "worker.Worker", chat_id: int) -> dict:
        """Send a message containing the product data.

        Calls the Telegram Bot API directly: ``sendMessage`` when the product has no image,
        ``sendPhoto`` with the text as caption otherwise.

        :param w: Worker providing the bot token (`w.cfg`) and the data needed by :meth:`text`.
        :param chat_id: Chat the message is sent to.
        :return: The decoded JSON body of the Telegram response.
        """
        if self.image is None:
            r = requests.get(f"https://api.telegram.org/bot{w.cfg['Telegram']['token']}/sendMessage",
                             params={"chat_id": chat_id,
                                     "text": self.text(w),
                                     "parse_mode": "HTML"})
        else:
            r = requests.post(f"https://api.telegram.org/bot{w.cfg['Telegram']['token']}/sendPhoto",
                              files={"photo": self.image},
                              params={"chat_id": chat_id,
                                      "caption": self.text(w),
                                      "parse_mode": "HTML"})
        return r.json()

    def set_image(self, file: telegram.File):
        """Download an image from Telegram and store it in the image column.

        This is a slow blocking function. Try to avoid calling it directly, use a thread if possible.

        :param file: Telegram file whose `file_path` is downloaded.
        :raises requests.HTTPError: If the download answers with an HTTP error status.
        """
        # Download the photo through a get request
        r = requests.get(file.file_path)
        # Fail loudly on an HTTP error instead of persisting the error body as image data
        r.raise_for_status()
        # Store the photo in the database record
        self.image = r.content
# Explicit index named "id_idx" on the products.id column.
# NOTE(review): id is already declared as the primary key of Product — confirm this extra index is needed.
Index('id_idx',Product.id)
class Category(DeferredReflection, TableDeclarativeBase):
    """A group that products can be assigned to."""

    # Name of the backing table
    __tablename__ = "categories"

    # Primary key of the category
    id = Column(Integer, primary_key=True)
    # Display name of the category
    name = Column(String)
    # Deletion flag; always has a value because the column is not nullable
    deleted = Column(Boolean, nullable=False)
# Explicit index named "category_id_idx" on the categories.id column.
# NOTE(review): id is already declared as the primary key of Category — confirm this extra index is needed.
Index('category_id_idx',Category.id)
class Transaction(DeferredReflection, TableDeclarativeBase):
    """A single movement on a greed wallet.

    The user's credit is stored separately and is not derived live from these rows,
    but the rows can be summed to recalculate it."""
    # TODO: split this into multiple tables

    # Name of the backing table, plus a uniqueness rule on (provider, provider_charge_id)
    __tablename__ = "transactions"
    __table_args__ = (UniqueConstraint("provider", "provider_charge_id"),)

    # Internal identifier of the transaction
    transaction_id = Column(Integer, primary_key=True)

    # Owner of the wallet this transaction applies to
    user_id = Column(BigInteger, ForeignKey("users.user_id"), nullable=False)
    user = relationship("User", back_populates="transactions")

    # Amount moved; may be negative or positive
    value = Column(Integer, nullable=False)
    # When True the value is skipped while recalculating the credit
    refunded = Column(Boolean, default=False)
    # Free-form remarks about the transaction
    notes = Column(Text)

    # Name of the payment provider
    provider = Column(String)
    # Charge identifier issued by Telegram
    telegram_charge_id = Column(String)
    # Charge identifier issued by the payment provider
    provider_charge_id = Column(String)

    # Payer details, which the provider may ask for if a dispute arises
    payment_name = Column(String)
    payment_phone = Column(String)
    payment_email = Column(String)

    # Order this transaction is linked to, if any
    order_id = Column(Integer, ForeignKey("orders.order_id"))
    order = relationship("Order", back_populates="transaction")

    def text(self, w: "worker.Worker"):
        """Return a one-line, " | "-separated summary: bold id, user, price, then any optional fields that are set."""
        segments = [
            f"<b>T{self.transaction_id}</b>",
            f"{str(self.user)}",
            f"{w.Price(self.value)}",
        ]
        # Optional fields are appended only when they hold a truthy value
        if self.refunded:
            segments.append(f"{w.loc.get('emoji_refunded')}")
        if self.provider:
            segments.append(f"{self.provider}")
        if self.notes:
            segments.append(f"{self.notes}")
        return " | ".join(segments)

    def __repr__(self):
        return f"<Transaction {self.transaction_id} for User {self.user_id}>"
# Explicit index named "transaction_id_idx" on the transactions.transaction_id column.
# NOTE(review): transaction_id is already declared as the primary key of Transaction — confirm this extra index is needed.
Index('transaction_id_idx',Transaction.transaction_id)
class Admin(DeferredReflection, TableDeclarativeBase):
    """A greed administrator together with the permissions granted to them."""

    # Name of the backing table
    __tablename__ = "admins"

    # Telegram id of the admin; it is both the primary key and a reference to the users table
    user_id = Column(BigInteger, ForeignKey("users.user_id"), primary_key=True)
    user = relationship("User")

    # Permission flags, all off by default
    edit_products = Column(Boolean, default=False)
    edit_categories = Column(Boolean, default=False)
    receive_orders = Column(Boolean, default=False)
    create_transactions = Column(Boolean, default=False)
    display_on_help = Column(Boolean, default=False)
    is_owner = Column(Boolean, default=False)

    # Whether live mode is switched on for this admin
    live_mode = Column(Boolean, default=False)

    def __repr__(self):
        return f"<Admin {self.user_id}>"
# Explicit index named "admin_id_idx" on the admins.user_id column.
# NOTE(review): user_id is already declared as the primary key of Admin — confirm this extra index is needed.
Index('admin_id_idx',Admin.user_id)
class Order(DeferredReflection, TableDeclarativeBase):
    """An order placed by a user.

    The purchased products are stored as rows of the OrderItem table; an order may have several."""

    # Name of the backing table
    __tablename__ = "orders"

    # Unique identifier of the order
    order_id = Column(Integer, primary_key=True)

    # User the order belongs to
    user_id = Column(BigInteger, ForeignKey("users.user_id"))
    user = relationship("User")

    # When the order was created
    creation_date = Column(DateTime, nullable=False)
    # When the order was delivered
    delivery_date = Column(DateTime)
    # When the order was refunded; null means it has not been refunded
    refund_date = Column(DateTime)
    # Why the order was refunded; null means it has not been refunded
    refund_reason = Column(Text)

    # Items that make up the order
    items: typing.List["OrderItem"] = relationship("OrderItem")

    # Additional details written by the buyer
    notes = Column(Text)

    # The single transaction tied to this order
    transaction = relationship("Transaction", uselist=False)

    def __repr__(self):
        return f"<Order {self.order_id} placed by User {self.user_id}>"

    def text(self, w: "worker.Worker", session, user=False):
        """Render the order as a localized text message.

        The shorter user-facing layout is used when `user` is truthy and the
        "full_order_info" appearance setting is "no"; otherwise the full layout
        (order number, user mention, creation date) is produced. The refund
        reason is appended in both layouts when the order has a refund date.
        """
        # Fetch this order again, joined with its transaction, to read the transaction value
        joined_self = (
            session.query(Order)
            .filter_by(order_id=self.order_id)
            .join(Transaction)
            .one()
        )

        # One line per purchased item
        items = "".join(item.text(w) + "\n" for item in self.items)

        # Delivery takes precedence over refund when choosing the status
        if self.delivery_date is not None:
            emoji_key, text_key = "emoji_completed", "text_completed"
        elif self.refund_date is not None:
            emoji_key, text_key = "emoji_refunded", "text_refunded"
        else:
            emoji_key, text_key = "emoji_not_processed", "text_not_processed"
        status_emoji = w.loc.get(emoji_key)
        status_text = w.loc.get(text_key)

        if user and w.cfg["Appearance"]["full_order_info"] == "no":
            message = w.loc.get(
                "user_order_format_string",
                status_emoji=status_emoji,
                status_text=status_text,
                items=items,
                notes=self.notes,
                value=str(w.Price(-joined_self.transaction.value)),
            )
        else:
            heading = status_emoji + " " + w.loc.get("order_number", id=self.order_id) + "\n"
            message = heading + w.loc.get(
                "order_format_string",
                user=self.user.mention(),
                date=self.creation_date.isoformat(),
                items=items,
                notes=self.notes if self.notes is not None else "",
                value=str(w.Price(-joined_self.transaction.value)),
            )

        if self.refund_date is not None:
            message += w.loc.get("refund_reason", reason=self.refund_reason)
        return message
# Explicit index named "order_id_idx" on the orders.order_id column.
# NOTE(review): order_id is already declared as the primary key of Order — confirm this extra index is needed.
Index('order_id_idx',Order.order_id)
class OrderItem(DeferredReflection, TableDeclarativeBase):
    """One purchased product inside an order."""

    # Name of the backing table
    __tablename__ = "orderitems"

    # Unique identifier of the item
    item_id = Column(Integer, primary_key=True)

    # Product that was ordered
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
    product = relationship("Product")

    # Order this item is part of
    order_id = Column(Integer, ForeignKey("orders.order_id"), nullable=False)

    def text(self, w: "worker.Worker"):
        """Return "<product name> - <price>" for the linked product."""
        item_product = self.product
        label = item_product.name
        cost = str(w.Price(item_product.price))
        return f"{label} - {cost}"

    def __repr__(self):
        return f"<OrderItem {self.item_id}>"