Merge pull request #11 from PanDAWMS/flin

v2.0.2

mightqxc authored Mar 25, 2024
2 parents 2d863e3 + f2f8225 commit ab07c98
Showing 6 changed files with 54 additions and 37 deletions.
23 changes: 8 additions & 15 deletions bin/hgcs_master.py
@@ -20,18 +20,6 @@
# sys.path.insert(0, _LIB_PATH)


# ===============================================================

LOG_LEVEL_MAP = {
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}

# ===============================================================


def main():
"""
main function
@@ -91,15 +79,20 @@ def main():
"flush_period": getattr(section, "flush_period", None),
"grace_period": getattr(section, "grace_period", None),
"limit": getattr(section, "limit", None),
"logger_format_colored": logger_format_colored,
"log_level": log_level,
"log_file": log_file,
}
agent_instance = class_obj(**param_dict)
utils.setup_logger(agent_instance.logger, pid=agent_instance.get_pid, colored=logger_format_colored, to_file=log_file)
logging_log_level = LOG_LEVEL_MAP.get(log_level, logging.ERROR)
agent_instance.logger.setLevel(logging_log_level)
thread_list.append(agent_instance)
# master log
main_logger = logging.getLogger("hgcs_main")
utils.setup_logger(main_logger, pid=os.getpid(), colored=logger_format_colored, to_file=log_file)
main_logger.info("This is HGCS")
# run threads
for thr in thread_list:
print(f"Start thread of agent {thr.__class__.__name__}")
main_logger.info(f"Start thread of agent {thr.__class__.__name__}")
thr.start()


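
With this change the master stops configuring each agent's logger itself: the removed LOG_LEVEL_MAP and the per-agent utils.setup_logger call are replaced by passing logger_format_colored, log_level, and log_file through param_dict, so each agent sets up its own logger once its thread is running (see ThreadBase.set_logger in lib/hgcs/utils.py below). A minimal sketch of the new wiring; DummyAgent and the literal config values are hypothetical stand-ins, not code from this commit:

# Sketch only: DummyAgent and the config values below are made up; they just
# illustrate how the logging options now travel with param_dict into the thread.
from hgcs import utils

class DummyAgent(utils.ThreadBase):
    def run(self):
        self.set_logger()                  # logger configured inside the thread
        self.logger.info("agent starts")

param_dict = {
    "sleep_period": 60,
    "logger_format_colored": False,        # forwarded to utils.setup_logger
    "log_level": "INFO",                   # resolved via utils.LOG_LEVEL_MAP
    "log_file": "/tmp/hgcs.log",           # hypothetical path passed as to_file
}
agent = DummyAgent(**param_dict)
agent.start()
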
23 changes: 19 additions & 4 deletions lib/hgcs/agents.py
@@ -65,6 +65,8 @@ def __init__(self, flush_period=86400, retrieve_mode="copy", **kwarg):
self.retrieve_mode = retrieve_mode

def run(self):
self.set_logger()
self.logger.info("agent starts")
self.logger.debug(f"startTimestamp: {self.start_timestamp}")
already_handled_job_id_set = set()
last_flush_timestamp = time.time()
@@ -86,6 +88,7 @@ def run(self):
else:
self.logger.error(f"{exc} . No more retry. Exit")
return
n_new_handled_jobs = 0
for job in schedd.xquery(constraint=self.requirements, projection=self.projection):
job_id = get_condor_job_id(job)
if job_id in already_handled_job_id_set:
@@ -97,6 +100,7 @@
ret_val = self.via_system(job)
if ret_val:
already_handled_job_id_set.add(job_id)
n_new_handled_jobs += 1
elif self.retrieve_mode == "condor":
self.via_condor_retrieve(job)
n_try = 3
@@ -112,7 +116,7 @@ def run(self):
else:
already_handled_job_id_set.clear()
break
self.logger.info("run ends")
self.logger.info(f"run ends; handled {n_new_handled_jobs} jobs")
time.sleep(self.sleep_period)

def via_system(self, job, symlink_mode=False):
@@ -204,6 +208,8 @@ def __init__(self, sleep_period=60, delay_time=7200):
self.delay_time = delay_time

def run(self):
self.set_logger()
self.logger.info("agent starts")
self.logger.debug(f"startTimestamp: {self.start_timestamp}")
while True:
self.logger.info("run starts")
@@ -267,6 +273,8 @@ def __init__(self, flush_period=86400, limit=6000, **kwarg):
self.limit = 6000

def run(self):
self.set_logger()
self.logger.info("agent starts")
self.logger.debug(f"startTimestamp: {self.start_timestamp}")
already_handled_job_id_set = set()
last_flush_timestamp = time.time()
@@ -290,6 +298,8 @@ def run(self):
return
already_sdf_copied_job_id_set = set()
to_skip_sdf_copied_job_id_set = set()
n_new_handled_jobs = 0
n_new_skipped_jobs = 0
try:
jobs_iter = schedd.xquery(constraint=self.requirements, projection=self.projection, limit=self.limit)
for job in jobs_iter:
@@ -300,8 +310,10 @@ def run(self):
ret_val = self.via_system(job)
if ret_val is True:
already_sdf_copied_job_id_set.add(job_id)
n_new_handled_jobs += 1
elif ret_val is False:
to_skip_sdf_copied_job_id_set.add(job_id)
n_new_skipped_jobs += 1
except RuntimeError as exc:
self.logger.error(f"Failed to query jobs. Exit. RuntimeError: {exc} ")
else:
@@ -333,7 +345,7 @@ def run(self):
already_handled_job_id_set.update(to_skip_sdf_copied_job_id_set)
to_skip_sdf_copied_job_id_set.clear()
break
self.logger.info("run ends")
self.logger.info(f"run ends; handled {n_new_handled_jobs} jobs, skipped {n_new_skipped_jobs} jobs")
time.sleep(self.sleep_period)

def via_system(self, job):
@@ -397,6 +409,8 @@ def __init__(self, grace_period=86400, **kwarg):
self.grace_period = grace_period

def run(self):
self.set_logger()
self.logger.info("agent starts")
self.logger.debug(f"startTimestamp: {self.start_timestamp}")
while True:
self.logger.info("run starts")
@@ -412,6 +426,7 @@ def run(self):
else:
self.logger.error(f"{exc} . No more retry. Exit")
return
res_str = str(None)
try:
requirements = self.requirements_template.format(grace_period=int(self.grace_period))
self.logger.debug("try to remove-x jobs")
@@ -420,6 +435,6 @@ def run(self):
except RuntimeError as exc:
self.logger.error(f"Failed to remove-x jobs. Exit. RuntimeError: {exc} ")
else:
self.logger.debug(f"act return : {str(dict(act_ret))}")
self.logger.info("run ends")
res_str = str(dict(act_ret))
self.logger.info(f"run ends; return: {res_str}")
time.sleep(self.sleep_period)
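
The agents.py changes follow one pattern across the agents: run() now begins with self.set_logger() so logging is configured inside the worker thread, and per-cycle counters (n_new_handled_jobs, n_new_skipped_jobs, or the captured act() return in res_str) let the closing "run ends" message report what the cycle actually did. A self-contained schematic of that loop shape; SketchAgent and its stand-in methods are illustrative only, since the real agents query the schedd and copy log files instead:

# Schematic only: the stand-ins below replace schedd.xquery(...) and
# via_system(job) from agents.py so the counting pattern can run on its own.
import logging
import time

class SketchAgent:
    def __init__(self, jobs, sleep_period=1, cycles=1):
        self.logger = logging.getLogger("SketchAgent")
        self.jobs = jobs
        self.sleep_period = sleep_period
        self.cycles = cycles

    def set_logger(self):                  # stands in for ThreadBase.set_logger
        logging.basicConfig(level=logging.INFO)

    def handle(self, job_id):              # stands in for via_system(job)
        return True

    def run(self):
        self.set_logger()
        self.logger.info("agent starts")
        already_handled = set()
        for _ in range(self.cycles):
            self.logger.info("run starts")
            n_new_handled_jobs = 0
            for job_id in self.jobs:       # stands in for the schedd query loop
                if job_id in already_handled:
                    continue
                if self.handle(job_id):
                    already_handled.add(job_id)
                    n_new_handled_jobs += 1
            self.logger.info(f"run ends; handled {n_new_handled_jobs} jobs")
            time.sleep(self.sleep_period)

SketchAgent(jobs=["123.0", "123.1"]).run()
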
28 changes: 22 additions & 6 deletions lib/hgcs/utils.py
@@ -7,19 +7,28 @@
import threading
import time

try:
from threading import get_ident
except ImportError:
from thread import get_ident
from threading import get_ident

import htcondor


# ===============================================================

global_lock = threading.Lock()

# ===============================================================

# ===============================================================

LOG_LEVEL_MAP = {
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}

# ===============================================================


def setup_logger(logger, pid=None, colored=True, to_file=None):
"""
@@ -68,14 +77,21 @@ class ThreadBase(threading.Thread):
base class of thread to run HGCS agents
"""

def __init__(self, sleep_period=60, **kwarg):
def __init__(self, sleep_period=60, **kwargs):
threading.Thread.__init__(self)
self.os_pid = os.getpid()
self.logger = logging.getLogger(self.__class__.__name__)
self.sleep_period = sleep_period
self.start_timestamp = time.time()
self.logger_format_colored = kwargs.get("logger_format_colored")
self.log_level = kwargs.get("log_level")
self.log_file = kwargs.get("log_file")

def set_logger(self):
setup_logger(self.logger, pid=self.get_pid(), colored=self.logger_format_colored, to_file=self.log_file)
logging_log_level = LOG_LEVEL_MAP.get(self.log_level, logging.ERROR)
self.logger.setLevel(logging_log_level)

@property
def get_pid(self):
"""
get unique thread identifier including process ID (from OS) and thread ID (from python)
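
LOG_LEVEL_MAP moves here from bin/hgcs_master.py, and the new ThreadBase.set_logger applies it with logging.ERROR as the fallback for unrecognized level names. A small standalone illustration of that lookup; it redefines the map locally rather than importing hgcs.utils:

# Standalone illustration of the level lookup and its ERROR fallback;
# the dictionary mirrors hgcs.utils.LOG_LEVEL_MAP but is defined locally.
import logging

LOG_LEVEL_MAP = {
    "ERROR": logging.ERROR,
    "WARNING": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
}

logger = logging.getLogger("demo")
for name in ("DEBUG", "VERBOSE"):          # "VERBOSE" is not a valid key
    logger.setLevel(LOG_LEVEL_MAP.get(name, logging.ERROR))
    print(name, "->", logging.getLevelName(logger.level))
# prints: DEBUG -> DEBUG, then VERBOSE -> ERROR (the fallback)
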
2 changes: 1 addition & 1 deletion pkg_info.py
@@ -1 +1 @@
release_version = "2.0.1"
release_version = "2.0.2"
4 changes: 1 addition & 3 deletions pyproject.toml
@@ -18,9 +18,7 @@ dependencies = [
]

[project.optional-dependencies]
kubernetes = ['kubernetes', 'pyyaml']
mysql = ['mysqlclient']
atlasgrid = ['uWSGI >= 2.0.20', 'htcondor >= 10.3.0', 'mysqlclient >= 2.1.1']
atlasgrid = ['htcondor >= 10.3.0']

[project.urls]
Homepage = "https://github.com/PanDAWMS/HGCS"
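
The optional-dependency table shrinks to a single atlasgrid extra pinning only htcondor >= 10.3.0; the kubernetes and mysql groups and the uWSGI/mysqlclient pins are dropped. A quick sanity check, assuming the htcondor Python bindings are installed via that extra (this snippet is not part of the commit):

# Assumes the htcondor bindings from the atlasgrid extra are importable;
# prints the bundled HTCondor version string for comparison against 10.3.0.
import htcondor

print(htcondor.version())
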
11 changes: 3 additions & 8 deletions temp/single_script.py
@@ -7,14 +7,10 @@
import threading
import time

try:
from threading import get_ident
except ImportError:
from thread import get_ident
from threading import get_ident

import classad
import htcondor
from six import configparser

# ===============================================================

@@ -66,10 +62,9 @@ def __init__(self):
threading.Thread.__init__(self)
self.os_pid = os.getpid()
self.logger = logging.getLogger(self.__class__.__name__)
setupLogger(self.logger, pid=self.get_pid, colored=False)
setupLogger(self.logger, pid=self.get_pid(), colored=False)
self.start_timestamp = time.time()

@property
def get_pid(self):
return f"{self.os_pid}-{get_ident()}"

@@ -178,7 +173,7 @@ def via_system(self, job, symlink_mode=False):
name = match.group(1)
dest_path = os.path.normpath(match.group(2))
if name == src_log_name:
dest_log = osdest_path
dest_log = dest_path
elif name == src_out_name:
dest_out = dest_path
elif name == src_err_name:
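
The single_script.py fix mirrors the library change: the @property decorator on get_pid is dropped, so the call site must use self.get_pid() to obtain the id string (and the osdest_path typo is corrected to dest_path). A toy example, separate from HGCS, of why the parentheses become necessary:

# Toy example, not HGCS code: once @property is dropped, attribute access
# returns the bound method, so it must be called to get the id string.
import os
from threading import get_ident

class WithProperty:
    @property
    def get_pid(self):
        return f"{os.getpid()}-{get_ident()}"

class PlainMethod:
    def get_pid(self):
        return f"{os.getpid()}-{get_ident()}"

print(WithProperty().get_pid)      # property: already a string
print(PlainMethod().get_pid())     # method: must be called
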
