class SplunkMatcher:
    """Splunk data source matcher.

    Holds a ``splunklib`` service connection and runs one-shot searches
    scoped to a single index.
    """

    def __init__(
        self, host: str, port: int, username: str, password: str, indice: str
    ):  # pylint: disable = R0917
        """Connect to the Splunk server.

        Args:
            host: Host name handed to ``client.connect``.
            port: Port handed to ``client.connect``.
            username: User name used to log in.
            password: Password used to log in.
            indice: Name of the index every search is restricted to.
        """
        self.indice = indice
        self.service = client.connect(
            host=host, port=port, username=username, password=password
        )

    async def query(
        self, query: Dict[Any, Any], searchList: str = "", max_results: int = 10000
    ):
        """Run a one-shot search against the configured index.

        Args:
            query: Keyword arguments forwarded to ``service.jobs.oneshot``
                (for example time bounds). The dict is copied, never modified.
            searchList: Optional extra search terms appended after
                ``search index=<indice>``.
            max_results: Value sent as the ``count`` search argument; it
                overrides any ``count`` already present in ``query``.

        Returns:
            A list with one dict per event, holding the keys ``data`` (the
            JSON-decoded ``_raw`` field), ``host``, ``source``,
            ``sourcetype``, ``bucket``, ``serial`` and ``timestamp``.
            Returns ``None`` when the search request itself fails.
        """
        # Copy so that adding our own arguments never leaks into the caller's dict.
        search_kwargs = dict(query)
        search_kwargs["count"] = max_results
        # The results are parsed with JSONResultsReader, which needs JSON output;
        # an explicit caller-supplied output_mode is still respected.
        search_kwargs.setdefault("output_mode", "json")

        # If additional search parameters are provided, include those in searchindex
        searchindex = f"search index={self.indice}"
        if searchList:
            searchindex = f"{searchindex} {searchList}"

        try:
            oneshotsearch_results = self.service.jobs.oneshot(
                searchindex, **search_kwargs
            )
        except Exception as e:  # pylint: disable = W0718
            print("Error querying splunk: {}".format(e))
            return None

        res_array = []
        async for record in self._stream_results(oneshotsearch_results):
            # The reader can also yield diagnostic Message objects; only dict
            # records carry event fields, so anything else is skipped.
            if not isinstance(record, dict):
                continue
            try:
                res_array.append(
                    {
                        "data": orjson.loads(record["_raw"]),  # pylint: disable = E1101
                        "host": record["host"],
                        "source": record["source"],
                        "sourcetype": record["sourcetype"],
                        "bucket": record["_bkt"],
                        "serial": record["_serial"],
                        "timestamp": record["_indextime"],
                    }
                )
            except Exception as e:  # pylint: disable = W0718
                # Best effort: one malformed record must not discard the rest.
                print(f"Error on including Splunk record query in results array: {e}")

        return res_array

    async def _stream_results(self, oneshotsearch_results: Any) -> Any:
        """Yield every item that ``JSONResultsReader`` parses from the response."""
        for record in results.JSONResultsReader(oneshotsearch_results):
            yield record

    async def close(self):
        """Closes splunk client connections"""
        # logout() is a regular synchronous call in the SDK, so it is not awaited.
        self.service.logout()