-
Notifications
You must be signed in to change notification settings - Fork 2
/
app.py
101 lines (79 loc) · 2.96 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3
import argparse
import locale
import logging
import requests
from src import consts
from src.bot import SysBlokBot
from src.config_manager import ConfigManager
from src.scheduler import JobScheduler
from src.tg.sender import TelegramSender
from src.utils.log_handler import ErrorBroadcastHandler
from src.utils.uptrace_logger import add_uptrace_logging
import nest_asyncio
# Patch asyncio via nest_asyncio before anything else in this module runs.
nest_asyncio.apply()

# Time-related formatting uses the Russian locale; this call raises
# locale.Error if "ru_RU.UTF-8" is not available on the host.
locale.setlocale(locale.LC_TIME, "ru_RU.UTF-8")

# Initial root logger configuration (format comes from consts.LOG_FORMAT).
logging.basicConfig(level=logging.INFO, format=consts.LOG_FORMAT)

# Command-line flags.
# maybe we'll move those to config.json later...
parser = argparse.ArgumentParser()
parser.add_argument(
    "--skip-db-update",
    action="store_true",
    help="Skip db update on startup",
)
def get_bot():
    """
    Build a fully initialized bot instance and start the job scheduler.

    All singleton classes must be initialized within this method before bot
    actually launched. This includes clients, config manager and scheduler.

    Steps, in order: load the main config, create the scheduler, parse CLI
    arguments, create the bot and its handlers, download and apply the jobs
    config, swap the root logger handlers for the Telegram error broadcaster,
    start the scheduler, and send a "started" notification.

    Returns:
        The initialized SysBlokBot; the caller is expected to call ``run()``.

    Raises:
        ValueError: if the main config or the jobs config could not be loaded.
        Exception: if no jobs config file key is provided.
    """
    config_manager = ConfigManager(consts.CONFIG_PATH, consts.CONFIG_OVERRIDE_PATH)
    config = config_manager.load_config_with_override()
    if not config:
        raise ValueError("Could not load config, can't go on")

    scheduler = JobScheduler()

    # NOTE(review): ConfigManager() is called without arguments here and below;
    # presumably it is a singleton returning the instance created above.
    jobs_config_file_key = ConfigManager().get_jobs_config_file_key()
    if jobs_config_file_key is None:
        raise Exception("No jobs config file key provided")

    args = parser.parse_args()

    bot = SysBlokBot(
        config_manager,
        # Any signal delivered to the bot asks the scheduler to stop.
        signal_handler=lambda signum, frame: scheduler.stop_running(),
        skip_db_update=args.skip_db_update,
    )
    bot.init_handlers()

    # The jobs config is downloaded through the bot's drive client, so it can
    # only be loaded once the bot (and its app context) exists.
    jobs_config_json = bot.app_context.drive_client.download_json(jobs_config_file_key)
    config_jobs = ConfigManager().set_jobs_config_with_override_from_json(
        jobs_config_json
    )
    if not config_jobs:
        raise ValueError("Could not load job config, can't go on")

    # Setting final logger and sending a message bot is up
    tg_sender = TelegramSender()

    root_logger = logging.getLogger()
    # Iterate over a snapshot: removeHandler() mutates root_logger.handlers,
    # and removing from the live list while iterating it skips every other
    # handler, leaving some of the old handlers attached.
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
    root_logger.addHandler(ErrorBroadcastHandler(tg_sender))
    if consts.UPTRACE_DSN:
        add_uptrace_logging(consts.UPTRACE_DSN)

    # Scheduler must be run after clients initialized
    scheduler.run()
    scheduler.init_jobs()

    start_msg = f"[{consts.APP_SOURCE}] Bot successfully started"
    if consts.COMMIT_HASH:
        start_msg += (
            f', revision <a href="{consts.COMMIT_URL}">{consts.COMMIT_HASH}</a>.'
        )
    tg_sender.send_important_event(start_msg)

    return bot
def report_critical_error(e: BaseException):
    """
    Send a "bot is down" message to the Telegram error chat.

    Posts directly to the Telegram Bot API with ``requests`` instead of going
    through the bot's own sender classes.

    Args:
        e: the exception that brought the bot down; its string form is
            included in the message text.
    """
    # NOTE(review): the exception text is sent with parse_mode "markdown";
    # unbalanced markdown characters in it (e.g. a lone "_") may make Telegram
    # reject the message -- confirm and consider escaping.
    requests.post(
        url=f"https://api.telegram.org/bot{consts.TELEGRAM_TOKEN}/sendMessage",
        json={
            "text": f"Sysblokbot is down, {e}\n",
            "chat_id": consts.TELEGRAM_ERROR_CHAT_ID,
            "parse_mode": "markdown",
        },
        # Without a timeout requests can block forever on a stalled
        # connection, which would hang the crash-reporting path.
        timeout=10,
    )
if __name__ == "__main__":
    # Script entry point: build the bot, run it, and report any fatal error
    # to the Telegram error chat.
    try:
        get_bot().run()
    except BaseException as e:
        # BaseException is caught on purpose so that every kind of termination
        # is reported; the exception is not re-raised afterwards.
        import traceback

        # BaseException.with_traceback() requires a traceback argument, so the
        # previous argument-less call raised TypeError inside this handler and
        # the report below was never sent. print_exc() prints the traceback of
        # the exception currently being handled (to stderr).
        traceback.print_exc()
        report_critical_error(e)