From 6c9a7a4a3bc9c468aea62aeb17ad2279525de942 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 27 Aug 2024 14:46:16 +0200 Subject: [PATCH] add function descriptions --- pilot/util/lokirealtimelogger.py | 105 ++++++++++++++++++++++++------- 1 file changed, 81 insertions(+), 24 deletions(-) diff --git a/pilot/util/lokirealtimelogger.py b/pilot/util/lokirealtimelogger.py index 93075cdf..9c76b66c 100644 --- a/pilot/util/lokirealtimelogger.py +++ b/pilot/util/lokirealtimelogger.py @@ -27,18 +27,21 @@ import logging import os import queue -import requests import threading import time +import requests logger = logging.getLogger(__name__) class PilotLokiLoggerFormatter: - def __init__(self): - pass - def format(self, record): + """ + Logging format function to convert the logging record to a dict format. + + :param record (LogRecord): Logging record + """ + formatted = { "timestamp": record.created, "process": record.process, @@ -76,12 +79,11 @@ def format(self, record): else: formatted['message'] = message except Exception as ex: - logger.warn(f"Format exception: {ex}") + logger.warning(f"Format exception: {ex}") return formatted class PilotLokiLoggerHandler(logging.Handler): - _success_response_code = 204 def __init__( self, @@ -93,8 +95,22 @@ def __init__( formatter=PilotLokiLoggerFormatter(), verbose=False ): + """ + Default Loki logging handler init function. + + :param url: The URL of the Loki REST service. + :param label_keys: List of keys that are allowed as labels sent to the Loki service. + If empty, all keys will be allowed. + :param timeout: The time period to sleep between calls to the Loki service. + :param name: The name used as the "app" label when sending logs to the Loki service. + :param compressed: Whether to compress messages when sending them to the Loki service. + :param formatter: The logging formatter. + :param verbose: Whether to print logs. 
+ """ super().__init__() + self._success_response_code = 204 + self.url = url self.label_keys = label_keys self.compressed = compressed @@ -104,17 +120,26 @@ def __init__( self.default_keys = {"namespace": "usdf-panda", "app": name, "env": "production"} self.queue = queue.Queue() - self._lock = threading.RLock() self._graceful_stop = threading.Event() self.session = requests.Session() self._thread = threading.Thread(target=self._runner, daemon=True) self._thread.start() def emit(self, record): + """ + Override the logging.Handler emit function to handle logging messages. + + :param record (LogRecord): Logging record. + """ msg = self.formatter.format(record) self.queue.put(msg) def _sleep(self, timeout=10): + """ + A sleep function which can be interrupted. + + :param timeout (int): The number of seconds to sleep. + """ time_start = time.time() while not self._graceful_stop.is_set(): if time.time() - time_start > timeout: @@ -122,10 +147,24 @@ def _sleep(self, timeout=10): time.sleep(1) def stop(self): + """ + Stop the logging handler thread and send the queued messages. + """ self._graceful_stop.set() - self._send() + self._flush() def _send(self, data): + """ + Send the data to the Loki service. + + :param data (str): The JSON-encoded stream data. 
+ The format is a dict {"streams": [{"stream": {"label1": "value1"}, + "values": [["timestamp", "message"], + ["timestamp", "message"] + ] + }] + } + """ response = None try: headers = {"Content-type": "application/json"} @@ -134,24 +173,30 @@ def _send(self, data): data = gzip.compress(bytes(data, "utf-8")) if self.verbose: - logger.warn(f"url: {self.url}, headers: {headers}, data: {data}") + logger.warning(f"url: {self.url}, headers: {headers}, data: {data}") response = self.session.post(self.url, data=data, headers=headers) if response.status_code != self._success_response_code: err = f"Failed to send logs: {response.status_code}, {response.text}" raise Exception(err) except Exception as e: - raise Exception(f"Error while sending logs: {e}") + logger.warning(f"Error while sending logs: {e}") + raise e finally: if response: response.close() def format_stream_messages(self, msgs): + """ + Format stream messages. + + :param msgs (list): List of messages. + """ streams = {} for msg in msgs: - for k in self.default_keys: + for k, v in self.default_keys.items(): if k not in msg: - msg[k] = self.default_keys[k] + msg[k] = v ts = str(int(msg.get("timestamp") * 1e9)) msg.pop("timestamp") @@ -160,11 +205,11 @@ def format_stream_messages(self, msgs): # only allowed labels will be put into the stream keys = {} others = {} - for k in msg: + for k, v in msg.items(): if k in self.label_keys: - keys[k] = msg[k] + keys[k] = v else: - others[k] = msg[k] + others[k] = v message = others else: message = msg['message'] @@ -176,19 +221,24 @@ def format_stream_messages(self, msgs): if key not in streams: stream = {k: str(msg[k]) for k in sorted_keys} streams[key] = {'stream': stream, 'values': []} + if type(message) in [dict] and len(message.keys()) == 1 and list(message.keys())[0] == "message": - message = message["message"] + f_message = message["message"] elif type(message) not in [str]: - message = json.dumps(message) - streams[key]['values'].append([ts, message]) + f_message = 
json.dumps(message) + else: + f_message = message + streams[key]['values'].append([ts, f_message]) data = {"streams": []} - for key in streams: - stream = streams[key] - data['streams'].append(stream) + for key, value in streams.items(): + data['streams'].append(value) return json.dumps(data) def _flush(self): + """ + Flush queued messages. + """ msgs = [] while not self.queue.empty(): msg = self.queue.get() @@ -199,12 +249,15 @@ def _flush(self): stream_msgs = self.format_stream_messages(msgs) self._send(stream_msgs) except Exception as ex: - logger.warn(f"Failed for sending message: {ex}") + logger.warning(f"Failed for sending message: {ex}") # put messages back for msg in msgs: self.queue.put(msg) def _runner(self): + """ + The function for the runner thread which flushes messages periodically. + """ atexit.register(self._flush) while not self._graceful_stop.is_set(): @@ -213,7 +266,11 @@ def _runner(self): def setup_loki_handler(name): - """Setup the Loki logger handler.""" + """ + Set up the Loki logger handler. + + :param name (str): The name of the loki logging handler. + """ loki_labelkeys = None try: @@ -239,7 +296,7 @@ def setup_loki_handler(name): loki_verbose = False _handler = PilotLokiLoggerHandler( - url=os.environ["LOKI_URL"], + url=os.environ.get("LOKI_URL", None), label_keys=loki_labelkeys, timeout=loki_period, formatter=PilotLokiLoggerFormatter(),