Merge pull request #14 from sullivanmatt/AddDebugLogging
Add debug output to library for troubleshooting
sullivanmatt authored Apr 12, 2017
2 parents 7a13869 + a85df2e commit fb8ece5
Showing 4 changed files with 65 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -42,6 +42,7 @@ Example:
#timeout=60, # timeout for waiting on a 200 OK from Splunk server, defaults to 60s
#flush_interval=15.0, # send batches of log statements every n seconds, defaults to 15.0
#queue_size=5000, # a throttle to prevent resource overconsumption, defaults to 5000
#debug=False, # turn on debug mode; prints module activity to stdout, defaults to False
)

logging.getLogger('').addHandler(splunk)
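
For context, a minimal sketch of a handler configured with the new flag; the host, token, and index values are placeholders, not real credentials:

```python
import logging

from splunk_handler import SplunkHandler

# Placeholder connection details; substitute real HEC values.
splunk = SplunkHandler(
    host='splunk.example.com',
    port='8088',
    token='00000000-0000-0000-0000-000000000000',
    index='main',
    debug=True,  # turn on debug mode; prints module activity to stdout
)

logging.getLogger('').addHandler(splunk)
logging.warning('example event routed through splunk_handler')
```
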
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@

setup(
name = 'splunk_handler',
-version = '2.0.0',
version = '2.0.1',
license = 'MIT License',
description = 'A Python logging handler that sends your logs to Splunk',
long_description = open('README.md').read(),
69 changes: 60 additions & 9 deletions splunk_handler/__init__.py
@@ -27,7 +27,7 @@ class SplunkHandler(logging.Handler):
def __init__(self, host, port, token, index,
hostname=None, source=None, sourcetype='text',
verify=True, timeout=60, flush_interval=15.0,
-queue_size=5000):
queue_size=5000, debug=False):

SplunkHandler.instances.append(self)
logging.Handler.__init__(self)
@@ -47,12 +47,17 @@ def __init__(self, host, port, token, index,
self.testing = False # Used for slightly altering logic during unit testing
# It is possible to get 'behind' and never catch up, so we limit the queue size
self.queue = Queue(maxsize=queue_size)
self.debug = debug

self.write_log("Starting debug mode", is_debug=True)

if hostname is None:
self.hostname = socket.gethostname()
else:
self.hostname = hostname

self.write_log("Preparing to override loggers", is_debug=True)

# prevent infinite recursion by silencing requests and urllib3 loggers
logging.getLogger('requests').propagate = False

@@ -63,20 +68,42 @@ def __init__(self, host, port, token, index,
if not self.verify:
requests.packages.urllib3.disable_warnings()

self.write_log("Preparing to spin off first worker thread Timer", is_debug=True)

# Start a worker thread responsible for sending logs
self.timer = Timer(self.flush_interval, self._splunk_worker)
self.timer.daemon = True # Auto-kill thread if main process exits
self.timer.start()

self.write_log("Class initialize complete", is_debug=True)

def write_log(self, log_message, is_debug=False):
if is_debug:
if self.debug:
print("[SplunkHandler DEBUG] " + log_message)
else:
print("[SplunkHandler] " + log_message)

def emit(self, record):
-record = self.format_record(record)
self.write_log("emit() called", is_debug=True)

try:
record = self.format_record(record)
except Exception as e:
self.write_log("Exception in Splunk logging handler: %s" % str(e))
self.write_log(traceback.format_exc())
return

try:
self.write_log("Writing record to log queue", is_debug=True)
# Put log message into queue; worker thread will pick it up
self.queue.put_nowait(record)
except Full:
print("Log queue full; log data will be dropped.")
self.write_log("Log queue full; log data will be dropped.")

def format_record(self, record):
self.write_log("format_record() called", is_debug=True)

if self.source is None:
source = record.pathname
else:
@@ -95,29 +122,42 @@ def format_record(self, record):
'event': self.format(record),
}

-return json.dumps(params, sort_keys=True)
self.write_log("Record dictionary created", is_debug=True)

formatted_record = json.dumps(params, sort_keys=True)
self.write_log("Record formatting complete", is_debug=True)

return formatted_record
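
The hunk above elides most of the params dictionary; based on the fields visible in the constructor, the serialized record plausibly looks like this sketch, with illustrative values:

```python
import json

params = {
    'host': 'my-hostname',       # self.hostname
    'source': 'app/views.py',    # record.pathname unless source was given
    'sourcetype': 'text',
    'index': 'main',
    'event': 'formatted log line',
}
print(json.dumps(params, sort_keys=True))
# {"event": "formatted log line", "host": "my-hostname", "index": "main",
#  "source": "app/views.py", "sourcetype": "text"}
```
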

def _splunk_worker(self):
self.write_log("_splunk_worker() called", is_debug=True)

queue_empty = True

# Pull everything off the queue.
while not self.queue.empty():
self.write_log("Recursing through queue", is_debug=True)
try:
item = self.queue.get(block=False)
self.log_payload = self.log_payload + item
self.queue.task_done()
self.write_log("Queue task completed", is_debug=True)
except Empty:
-pass
self.write_log("Queue was empty", is_debug=True)

# If the payload is getting very long, stop reading and send immediately.
if not self.SIGTERM and len(self.log_payload) >= 524288: # 512KB
queue_empty = False
self.write_log("Payload maximum size exceeded, sending immediately", is_debug=True)
break

if self.log_payload:
self.write_log("Payload available for sending", is_debug=True)
url = 'https://%s:%s/services/collector' % (self.host, self.port)
self.write_log("Destination URL is " + url, is_debug=True)

try:
self.write_log("Sending payload: " + self.log_payload, is_debug=True)
r = requests.post(
url,
data=self.log_payload,
@@ -126,30 +166,41 @@ def _splunk_worker(self):
timeout=self.timeout,
)
r.raise_for_status() # Throws exception for 4xx/5xx status
self.write_log("Payload sent successfully", is_debug=True)

except Exception as e:
try:
-print(traceback.format_exc())
-print("Exception in Splunk logging handler: %s" % str(e))
self.write_log("Exception in Splunk logging handler: %s" % str(e))
self.write_log(traceback.format_exc())
except:
-pass
self.write_log("Exception encountered, but traceback could not be formatted", is_debug=True)

self.log_payload = ""
else:
self.write_log("Timer thread executed but no payload was available to send", is_debug=True)

# Restart the timer
timer_interval = self.flush_interval
-if not self.SIGTERM:
if self.SIGTERM:
self.write_log("Timer reset aborted due to SIGTERM received", is_debug=True)
else:
if not queue_empty:
self.write_log("Queue not empty, scheduling timer to run immediately", is_debug=True)
timer_interval = 1.0 # Start up again right away if queue was not cleared

self.write_log("Resetting timer thread", is_debug=True)

self.timer = Timer(timer_interval, self._splunk_worker)
self.timer.daemon = True # Auto-kill thread if main process exits
self.timer.start()
self.write_log("Timer thread scheduled", is_debug=True)

def shutdown(self):
self.write_log("Immediate shutdown requested", is_debug=True)
self.SIGTERM = True
self.timer.cancel() # Cancels the scheduled Timer, allows exit immediately

self.write_log("Starting up the final run of the worker thread before shutdown", is_debug=True)
# Send the remaining items that might be sitting in queue.
self._splunk_worker()

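Callers are presumably expected to invoke shutdown() themselves when the process exits; one way to wire that up (atexit registration is an assumption, not something the diff shows):

```python
import atexit

# Assumes `splunk` is the SplunkHandler instance created earlier.
atexit.register(splunk.shutdown)
```
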
3 changes: 3 additions & 0 deletions tests/test_splunk_handler.py
@@ -17,6 +17,7 @@
SPLUNK_TIMEOUT = 27
SPLUNK_FLUSH_INTERVAL = 5.0
SPLUNK_QUEUE_SIZE = 1111
SPLUNK_DEBUG = False

RECEIVER_URL = 'https://%s:%s/services/collector' % (SPLUNK_HOST, SPLUNK_PORT)

@@ -47,6 +48,7 @@ def setUp(self):
timeout=SPLUNK_TIMEOUT,
flush_interval=SPLUNK_FLUSH_INTERVAL,
queue_size=SPLUNK_QUEUE_SIZE,
debug=SPLUNK_DEBUG,
)
self.splunk.testing = True

@@ -68,6 +70,7 @@ def test_init(self):
self.assertEqual(self.splunk.timeout, SPLUNK_TIMEOUT)
self.assertEqual(self.splunk.flush_interval, SPLUNK_FLUSH_INTERVAL)
self.assertEqual(self.splunk.queue.maxsize, SPLUNK_QUEUE_SIZE)
self.assertEqual(self.splunk.debug, SPLUNK_DEBUG)

self.assertFalse(logging.getLogger('requests').propagate)
self.assertFalse(logging.getLogger('splunk_handler').propagate)
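Beyond the assertions above, the new debug output could be sanity-checked by capturing stdout during construction. A sketch, not part of the test suite, assuming splunk_handler is importable and using the "Starting debug mode" message added in this commit:

```python
import io
from contextlib import redirect_stdout

from splunk_handler import SplunkHandler

captured = io.StringIO()
with redirect_stdout(captured):
    splunk = SplunkHandler(host='localhost', port=8088, token='token',
                           index='main', debug=True)
assert '[SplunkHandler DEBUG] Starting debug mode' in captured.getvalue()
splunk.shutdown()  # queue is empty, so no network call is attempted
```
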
