Skip to content

Commit

Permalink
Merge pull request #96 from xtrojak/92-streaming-log
Browse files Browse the repository at this point in the history
Store logs during annotation
  • Loading branch information
hechth authored Apr 27, 2022
2 parents 9eb427f + 8766616 commit 7d82230
Show file tree
Hide file tree
Showing 18 changed files with 192 additions and 154 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
**__pycache__
MSMetaEnhancer_*.log
.pytest_cache
.coverage
dist
MSMetaEnhancer.egg-info
.vscode
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [dev] - unreleased
### Added
* introduced `error` level into logging [#95](https://github.com/RECETOX/MSMetaEnhancer/issues/95)
* logging of unknown errors in Annotator [#90](https://github.com/RECETOX/MSMetaEnhancer/issues/90)
### Changed
* the log file is now written continuously during annotation and the metrics added at the end of the file [#92](https://github.com/RECETOX/MSMetaEnhancer/issues/92)
### Removed

## [0.2.1] - 2022-04-05
Expand Down
10 changes: 6 additions & 4 deletions MSMetaEnhancer/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio

import aiohttp

from MSMetaEnhancer.libs.Annotator import Annotator
from MSMetaEnhancer.libs.Converter import Converter
from MSMetaEnhancer.libs.Curator import Curator
from MSMetaEnhancer.libs.Spectra import Spectra
from MSMetaEnhancer.libs.utils import logger
Expand All @@ -12,10 +14,9 @@


class Application:
def __init__(self, log_level='warning', log_file=None):
self.log_level = log_level
self.log_file = log_file
def __init__(self, log_level='info', log_file=None):
self.spectra = Spectra()
logger.setup(log_level, log_file)

def load_spectra(self, filename, file_format):
"""
Expand Down Expand Up @@ -78,6 +79,7 @@ async def annotate_spectra(self, converters, jobs=None, repeat=False, monitor=Mo
# create all possible jobs if not given
if not jobs:
jobs = []
converter: Converter
for converter in converters.values():
jobs += converter.get_conversion_functions()
jobs = convert_to_jobs(jobs)
Expand All @@ -90,4 +92,4 @@ async def annotate_spectra(self, converters, jobs=None, repeat=False, monitor=Mo
monitor.join()

self.spectra.spectrums = results
logger.write_log(self.log_level, self.log_file)
logger.write_metrics()
24 changes: 14 additions & 10 deletions MSMetaEnhancer/libs/Annotator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import traceback

from MSMetaEnhancer.libs.Curator import Curator
from MSMetaEnhancer.libs.utils import logger
from MSMetaEnhancer.libs.utils.Errors import TargetAttributeNotRetrieved, SourceAttributeNotAvailable, \
ServiceNotAvailable
from MSMetaEnhancer.libs.utils.Logger import LogWarning
ServiceNotAvailable, UnknownResponse, DataAlreadyPresent
from MSMetaEnhancer.libs.utils.Logger import LogRecord


class Annotator:
Expand Down Expand Up @@ -31,7 +33,7 @@ async def annotate(self, spectra, jobs, repeat=False):
"""
metadata = spectra.metadata
cache = dict()
warning = LogWarning(dict(metadata))
log = LogRecord(dict(metadata))
logger.add_coverage_before(metadata.keys())

added_metadata = True
Expand All @@ -40,17 +42,19 @@ async def annotate(self, spectra, jobs, repeat=False):
for job in jobs:
if job.target not in metadata:
try:
metadata, cache = await self.execute_job_with_cache(job, metadata, cache, warning)
metadata, cache = await self.execute_job_with_cache(job, metadata, cache, log)
if repeat:
added_metadata = True
except SourceAttributeNotAvailable as exc:
warning.add_info(exc)
except Exception as exc:
warning.add_warning(type(exc)((f'{job}:\n' + str(exc))))
except (SourceAttributeNotAvailable, TargetAttributeNotRetrieved) as exc:
log.update(exc, job, level=3)
except (ServiceNotAvailable, UnknownResponse) as exc:
log.update(exc, job, level=2)
except Exception:
log.update(Exception(traceback.format_exc()), job, level=1)
else:
warning.add_info(f'{job}: Requested attribute {job.target} already present in given metadata.')
log.update(DataAlreadyPresent(f'Requested attribute {job.target} already present.'), job, level=2)

logger.add_warning(warning)
logger.add_logs(log)
logger.add_coverage_after(metadata.keys())

spectra.metadata = metadata
Expand Down
10 changes: 5 additions & 5 deletions MSMetaEnhancer/libs/Converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ def create_top_level_conversion_methods(self, conversions, asynch=True):
for conversion in conversions:
create_top_level_method(self, *conversion, asynch)

def get_conversion_functions(self):
def get_conversion_functions(self) -> list:
"""
Method to compute all available conversion functions.
Assumes that the functions always have from {source}_to_{target}
:return: a list of available conversion functions
"""
jobs = []
available_conversions = []
methods = [method_name for method_name in dir(self) if '_to_' in method_name]
for method in methods:
jobs.append((*method.split('_to_'), self.converter_name))
return jobs
available_conversions.append((*method.split('_to_'), self.converter_name))
return available_conversions


def create_top_level_method(obj, source, target, method, asynch=True):
def create_top_level_method(obj: Converter, source: str, target: str, method: str, asynch: bool = True):
"""
Assign a new method to {obj} called {source}_to_{target} which calls {method}.
Expand Down
7 changes: 3 additions & 4 deletions MSMetaEnhancer/libs/Curator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ def fix_cas_number(cas_number):
return cas_number

@staticmethod
def filter_invalid_metadata(metadata, warning, job):
def filter_invalid_metadata(metadata, log, job):
"""
Validates metadata and filters out invalid ones.
:param metadata: metadata content
:param warning: object storing warnings related to current metadata
:param log: object storing logs related to current metadata
:param job: executed job
:return: only valid metadata
"""
Expand All @@ -67,8 +67,7 @@ def filter_invalid_metadata(metadata, warning, job):
if filters[attribute](value):
valid_metadata[attribute] = value
else:
warning.add_warning(
InvalidAttributeFormat(f'{job}:\n Obtained {attribute} in invalid format: {value}'))
log.update(InvalidAttributeFormat(f'Obtained {attribute} in invalid format: {value}'), job, level=2)
else:
valid_metadata[attribute] = value
return valid_metadata
13 changes: 8 additions & 5 deletions MSMetaEnhancer/libs/Spectra.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from matchms.importing import load_from_msp
from typing import List

from matchms import Spectrum
from matchms.exporting import save_as_msp
from matchms.importing import load_from_msp


class Spectra:
Expand All @@ -8,15 +11,15 @@ class Spectra:
It is using `matchms` package to load and save MSP files.
"""
def __init__(self):
self.spectrums = []
self.spectrums: List[Spectrum] = []

def __eq__(self, other):
if len(self.spectrums) == len(other.spectrums):
return all([spectra_eq(self.spectrums[i], other.spectrums[i]) for i in range(len(self.spectrums))])
else:
return False

def load_from_msp(self, filename):
def load_from_msp(self, filename: str):
"""
Loads given MSP filename as a list of matchms.Spectra objects and
stores them in self.spectrums attribute
Expand All @@ -25,7 +28,7 @@ def load_from_msp(self, filename):
"""
self.spectrums = list(load_from_msp(filename))

def save_to_msp(self, filename):
def save_to_msp(self, filename: str):
"""
Exports all matchms.Spectra objects stored in self.spectrums to
a file given by filename
Expand All @@ -35,7 +38,7 @@ def save_to_msp(self, filename):
save_as_msp(self.spectrums, filename)


def spectra_eq(first, second):
def spectra_eq(first: Spectrum, second: Spectrum):
"""
Compare two Spectra objects.
Native __eq__ definition does not work properly.
Expand Down
41 changes: 29 additions & 12 deletions MSMetaEnhancer/libs/converters/web/WebConverter.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,52 @@
from typing import Any, Union
import aiohttp
from asyncstdlib import lru_cache
from multidict import MultiDict


from aiohttp.client_exceptions import ServerDisconnectedError
from aiohttp.client_exceptions import ServerDisconnectedError, ClientConnectorError
from asyncio.exceptions import TimeoutError

from MSMetaEnhancer.libs.Converter import Converter
from MSMetaEnhancer.libs.utils import logger
from MSMetaEnhancer.libs.utils.Errors import ServiceNotAvailable, UnknownResponse, TargetAttributeNotRetrieved


class WebConverter(Converter):
"""
General class for web conversions.
"""
def __init__(self, session):
def __init__(self, session: aiohttp.ClientSession):
"""Constructor for Webconverter.
Args:
session (aiohttp.ClientSession): Session to use for web IO.
"""
super().__init__()
self.session = session
self.session: aiohttp.ClientSession = session
self.endpoints = dict()

async def convert(self, source: str, target: str, data: Union[str, int, float]):
"""Convert data from source attribute to target attribute.
async def convert(self, source, target, data):
Args:
source (str): Source attribute name.
target (str): Target attribute name.
data (Union[str, int, float]): Data to use for the conversion
Raises:
TargetAttributeNotRetrieved: Exception raised if the target attribute is not retrieved from the converter.
Returns:
_type_: Data retrieved from the service.
"""
result = await getattr(self, f'{source}_to_{target}')(data)
if result:
return result
else:
raise TargetAttributeNotRetrieved(f'No data retrieved.')

@lru_cache
async def query_the_service(self, service, args, method='GET', data=None, headers=None):
async def query_the_service(self, service: str, args: str, method: str = 'GET', data=None, headers=None) -> str:
"""
Make get request to given converter with arguments.
Raises ConnectionError if converter is not available.
Expand All @@ -43,9 +62,9 @@ async def query_the_service(self, service, args, method='GET', data=None, header
result = await self.loop_request(self.endpoints[service] + args, method, data, headers)
return result
except TypeError:
logger.error(TypeError(f'Incorrect argument {args} for converter {service}.'))
raise TypeError(f'Incorrect argument {args} for converter {service}.')

async def loop_request(self, url, method, data, headers, depth=10):
async def loop_request(self, url: str, method: str, data: Any, headers: dict, depth: int = 10) -> str:
"""
Execute request with type depending on specified method.
Expand All @@ -66,14 +85,12 @@ async def loop_request(self, url, method, data, headers, depth=10):
data = MultiDict(data)
async with self.session.post(url, data=data, headers=headers) as response:
return await self.process_request(response, url, method)
except (ServerDisconnectedError, aiohttp.client_exceptions.ClientConnectorError, TimeoutError):
except (ServerDisconnectedError, ClientConnectorError, TimeoutError):
if depth > 0:
logger.error(ServiceNotAvailable(f'Service {self.converter_name} '
f'temporarily unavailable, trying again...'))
return await self.loop_request(url, method, data, headers, depth - 1)
raise ServiceNotAvailable(f'Service {self.converter_name} not available.')

async def process_request(self, response, url, method):
async def process_request(self, response: aiohttp.ClientResponse, url: str, method: str) -> str:
"""
Method to wrap response handling (same for POST and GET requests).
Expand Down
4 changes: 4 additions & 0 deletions MSMetaEnhancer/libs/utils/Errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ class UnknownResponse(Exception):

class InvalidAttributeFormat(Exception):
pass


class DataAlreadyPresent(Exception):
pass
13 changes: 9 additions & 4 deletions MSMetaEnhancer/libs/utils/Job.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from MSMetaEnhancer.libs.utils.Errors import ConversionNotSupported, SourceAttributeNotAvailable
from typing import Any, Tuple

from matchms import Metadata
from MSMetaEnhancer.libs.Converter import Converter
from MSMetaEnhancer.libs.utils.Errors import (ConversionNotSupported,
SourceAttributeNotAvailable)


class Job:
def __init__(self, data):
def __init__(self, data: Tuple[str, str, str]):
self.source, self.target, self.converter = data

def __str__(self):
Expand All @@ -11,7 +16,7 @@ def __str__(self):
def __repr__(self):
return f'Job(({self.source}, {self.target}, {self.converter}))'

def validate(self, converters, metadata):
def validate(self, converters: dict, metadata: Metadata) -> Tuple[Converter, Any]:
"""
Makes sure to job is supported or possible to execute on given metadata.
Expand All @@ -31,5 +36,5 @@ def validate(self, converters, metadata):
return converter, data


def convert_to_jobs(jobs):
def convert_to_jobs(jobs: Tuple[str, str, str]):
return [Job(data) for data in jobs]
35 changes: 35 additions & 0 deletions MSMetaEnhancer/libs/utils/LogRecord.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from MSMetaEnhancer.libs.utils.Job import Job


class LogRecord:
def __init__(self, metadata):
self.metadata = metadata
self.logs = []

def format_log(self, level: str) -> str:
"""Format log message according to level.
Args:
level (str): Log level to use for formatting.
Returns:
str: Formatted log message
"""
message = f'Issues related to metadata:\n\n{self.metadata}\n\n'
filtered_logs = [log['msg'] for log in self.logs if level >= log['level']]
if filtered_logs:
for log in filtered_logs:
message += f'{log}\n'
else:
return None
return f'{message}\n'

def update(self, exc: Exception, job: Job, level: str):
"""
Process given log record.
:param exc: exception
:param job: related job
:param level: log level
"""
self.logs.append({'level': level, 'msg': f'-> {type(exc).__name__} - {job}:\n{exc}'})
Loading

0 comments on commit 7d82230

Please sign in to comment.