Skip to content

Commit

Permalink
add docstring in parent and rename symbol
Browse files Browse the repository at this point in the history
  • Loading branch information
zix-xiao committed Jan 26, 2024
1 parent e8d956c commit 3790a13
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 25 deletions.
2 changes: 1 addition & 1 deletion oktoberfest/qdaemon/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from parent import queueDaemon
from parent import queue_daemon
26 changes: 21 additions & 5 deletions oktoberfest/qdaemon/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,28 @@


def spawn(base_dir, sqlite_path):
"""
Spawns a worker process for the Oktoberfest application.
:param base_dir: The base directory of the application.
:param sqlite_path: The path to the SQLite database.
"""
OKworker(base_dir, sqlite_path)
sys.exit()


def queueDaemon():
def queue_daemon():
"""
Function to run the queue daemon.
This function continuously spawns worker processes to process jobs from a queue.
It monitors for termination signals and gracefully stops the server when required.
"""
os.chdir("/home/armin/projects/ok_ui/oktoberfest/oktoberfest/qdaemon")

sqlite_path = "/home/armin/projects/ok_ui/oktoberfest/oktoberfest/qdaemon/jobs.db"
sqlite_path = (
"/home/armin/projects/ok_ui/oktoberfest/oktoberfest/qdaemon/jobs.db"
)

killer = GracefulKiller()
processes = []
Expand All @@ -27,8 +41,10 @@ def queueDaemon():
alive.append(i)
processes = copy(alive)
alive = []
for i in range(0, workers - len(processes)):
p = multiprocessing.Process(target=spawn, args=("/baseDir", sqlite_path), daemon=True)
for _i in range(0, workers - len(processes)):
p = multiprocessing.Process(
target=spawn, args=("/baseDir", sqlite_path), daemon=True
)
p.start()
processes.append(p)
time.sleep(0.5)
Expand All @@ -41,4 +57,4 @@ def queueDaemon():


if __name__ == "__main__":
queueDaemon()
queue_daemon()
41 changes: 22 additions & 19 deletions oktoberfest/qdaemon/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,58 @@ def __init__(self, base_dir, sqlite_path):
self.base_dir = Path(base_dir)
self.sqlite_path = Path(sqlite_path)
self.conn = sqlite3.connect(self.sqlite_path, isolation_level=None)
self.conn.execute('PRAGMA journal_mode=wal')
self.conn.execute("PRAGMA journal_mode=wal")
self.cursor = self.conn.cursor()
self.check_db()

def startTransaction(self):
isDone = False
while not isDone:
def start_transaction(self):
is_done = False
while not is_done:
try:
# self.conn.execute("PRAGMA locking_mode = RESERVED")
self.cursor.execute("BEGIN EXCLUSIVE TRANSACTION")
isDone = True
is_done = True
except sqlite3.OperationalError:
print("Lock", self.pid)
time.sleep(5)
def endTransaction(self):

def end_transaction(self):
self.cursor.execute("COMMIT")
# self.conn.execute("PRAGMA locking_mode = NORMAL")
# try:
# self.cursor.execute("SELECT COUNT(*) FROM JOBS")
# except sqlite3.OperationalError:
# pass
self.conn.commit()

# def endTransaction(self):
# self.cursor.execute("COMMIT")
# self.conn.execute("PRAGMA locking_mode = NORMAL")
# self.cursor.execute("")

def check_db(self):
# Execute your command here
self.startTransaction()
self.cursor.execute("SELECT * FROM JOBS WHERE status='PENDING' ORDER BY ID ASC LIMIT 1;")
self.start_transaction()
self.cursor.execute(
"SELECT * FROM JOBS WHERE status='PENDING' ORDER BY ID ASC"
" LIMIT 1;"
)

# Fetch the results
result = self.cursor.fetchone()

if result:
print(f"Job found: {result}")
config_path = Path(self.base_dir, result[1], "config.json").resolve()
config_path = Path(
self.base_dir, result[1], "config.json"
).resolve()
if not config_path.exists():
self.cursor.execute(
f"""
UPDATE JOBS SET STATUS='FAILED' WHERE ID={result[0]}
"""
)
self.endTransaction()
self.end_transaction()
self.suicide("ConfigNotFound")
return -1
# Update database
Expand All @@ -80,12 +85,10 @@ def check_db(self):

def run_ok(self, result, config_path):
try:
run_job(
config_path=config_path
)
run_job(config_path=config_path)
outFolder = Path("/output")
zip_folder(outFolder, outFolder)
self.startTransaction()
self.start_transaction()
self.cursor.execute(
f"""
UPDATE JOBS SET STATUS='DONE' WHERE ID={result[0]}
Expand All @@ -94,7 +97,7 @@ def run_ok(self, result, config_path):
self.cursor.execute("COMMIT")
self.conn.commit()
except Exception as e:
self.startTransaction()
self.start_transaction()
self.cursor.execute(
f"""
UPDATE JOBS SET STATUS='FAILED' WHERE ID={result[0]}
Expand All @@ -109,7 +112,7 @@ def suicide(self, reason):
# Cleanup, report, remove PID and suicide
print(f"Suicide triggered because {reason}")
self.conn.close()

def __del__(self):
self.conn.close()

Expand Down

0 comments on commit 3790a13

Please sign in to comment.