Skip to content

Commit

Permalink
add function descriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
wguanicedew committed Aug 27, 2024
1 parent 18e52d4 commit 6c9a7a4
Showing 1 changed file with 81 additions and 24 deletions.
105 changes: 81 additions & 24 deletions pilot/util/lokirealtimelogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@
import logging
import os
import queue
import requests
import threading
import time
import requests

logger = logging.getLogger(__name__)


class PilotLokiLoggerFormatter:
def __init__(self):
pass

def format(self, record):
"""
Logging format function to convert the logging record to a dict format.
:param record (LogRecord): Logging record
"""

formatted = {
"timestamp": record.created,
"process": record.process,
Expand Down Expand Up @@ -76,12 +79,11 @@ def format(self, record):
else:
formatted['message'] = message
except Exception as ex:
logger.warn(f"Format exception: {ex}")
logger.warning(f"Format exception: {ex}")
return formatted


class PilotLokiLoggerHandler(logging.Handler):
_success_response_code = 204

def __init__(
self,
Expand All @@ -93,8 +95,22 @@ def __init__(
formatter=PilotLokiLoggerFormatter(),
verbose=False
):
"""
Default Loki logging handler init function.
:param url: The url of the Loki rest service.
:param label_keys: List of keys that are allowed as labels sent to the Loki service.
If empty, all keys will be allowed.
:param timeout: The time period to sleep between callings to the Loki service.
:param name: The name during sending logs to the Loki service.
:param compressed: Whether to compress the message during sending messages to the Loki service.
:param formatter: The logging formatter.
:param verbose: Whether to print logs.
"""
super().__init__()

self._success_response_code = 204

self.url = url
self.label_keys = label_keys
self.compressed = compressed
Expand All @@ -104,28 +120,51 @@ def __init__(

self.default_keys = {"namespace": "usdf-panda", "app": name, "env": "production"}
self.queue = queue.Queue()
self._lock = threading.RLock()
self._graceful_stop = threading.Event()
self.session = requests.Session()
self._thread = threading.Thread(target=self._runner, daemon=True)
self._thread.start()

def emit(self, record):
"""
Override the logging.hander emit function to handle logging messages.
:param record (LogRecord): Logging record.
"""
msg = self.formatter.format(record)
self.queue.put(msg)

def _sleep(self, timeout=10):
"""
A sleep function which can be interrupted.
:param timeout (int): The number of seconds to sleep.
"""
time_start = time.time()
while not self._graceful_stop.is_set():
if time.time() - time_start > timeout:
break
time.sleep(1)

def stop(self):
"""
Stop the logging handler thread and send the queued messages.
"""
self._graceful_stop.set()
self._send()
self._flush()

def _send(self, data):
"""
Send the data to the Loki service.
:param data (dict): The stream data.
The format is a dict {"streams": [{"stream": {"label1": "value1"},
"values": [{"timestamp", "message"},
{"timestamp", "message"}
]
}]
}
"""
response = None
try:
headers = {"Content-type": "application/json"}
Expand All @@ -134,24 +173,30 @@ def _send(self, data):
data = gzip.compress(bytes(data, "utf-8"))

if self.verbose:
logger.warn(f"url: {self.url}, headers: {headers}, data: {data}")
logger.warning(f"url: {self.url}, headers: {headers}, data: {data}")
response = self.session.post(self.url, data=data, headers=headers)
if response.status_code != self._success_response_code:
err = f"Failed to send logs: {response.status_code}, {response.text}"
raise Exception(err)
except Exception as e:
raise Exception(f"Error while sending logs: {e}")
logger.warning(f"Error while sending logs: {e}")
raise e

finally:
if response:
response.close()

def format_stream_messages(self, msgs):
"""
Format stream messages.
:param msgs (list): List of messages.
"""
streams = {}
for msg in msgs:
for k in self.default_keys:
for k, v in self.default_keys.items():
if k not in msg:
msg[k] = self.default_keys[k]
msg[k] = v

ts = str(int(msg.get("timestamp") * 1e9))
msg.pop("timestamp")
Expand All @@ -160,11 +205,11 @@ def format_stream_messages(self, msgs):
# only allowed labels will be put into the stream
keys = {}
others = {}
for k in msg:
for k, v in msg.items():
if k in self.label_keys:
keys[k] = msg[k]
keys[k] = v
else:
others[k] = msg[k]
others[k] = v
message = others
else:
message = msg['message']
Expand All @@ -176,19 +221,24 @@ def format_stream_messages(self, msgs):
if key not in streams:
stream = {k: str(msg[k]) for k in sorted_keys}
streams[key] = {'stream': stream, 'values': []}

if type(message) in [dict] and len(message.keys()) == 1 and list(message.keys())[0] == "message":
message = message["message"]
f_message = message["message"]
elif type(message) not in [str]:
message = json.dumps(message)
streams[key]['values'].append([ts, message])
f_message = json.dumps(message)
else:
f_message = message
streams[key]['values'].append([ts, f_message])

data = {"streams": []}
for key in streams:
stream = streams[key]
data['streams'].append(stream)
for key, value in streams.items():
data['streams'].append(value)
return json.dumps(data)

def _flush(self):
"""
Flush queued messages.
"""
msgs = []
while not self.queue.empty():
msg = self.queue.get()
Expand All @@ -199,12 +249,15 @@ def _flush(self):
stream_msgs = self.format_stream_messages(msgs)
self._send(stream_msgs)
except Exception as ex:
logger.warn(f"Failed for sending message: {ex}")
logger.warning(f"Failed for sending message: {ex}")
# put messages back
for msg in msgs:
self.queue.put(msg)

def _runner(self):
"""
The function for the runner thread which flushes messages in period.
"""
atexit.register(self._flush)

while not self._graceful_stop.is_set():
Expand All @@ -213,7 +266,11 @@ def _runner(self):


def setup_loki_handler(name):
"""Setup the Loki logger handler."""
"""
Setup the Loki logger handler.
:param name (str): The name of the loki logging handler.
"""

loki_labelkeys = None
try:
Expand All @@ -239,7 +296,7 @@ def setup_loki_handler(name):
loki_verbose = False

_handler = PilotLokiLoggerHandler(
url=os.environ["LOKI_URL"],
url=os.environ.get("LOKI_URL", None),
label_keys=loki_labelkeys,
timeout=loki_period,
formatter=PilotLokiLoggerFormatter(),
Expand Down

0 comments on commit 6c9a7a4

Please sign in to comment.