diff --git a/pytimber/__init__.py b/pytimber/__init__.py
index f4b5d00..0364ac7 100644
--- a/pytimber/__init__.py
+++ b/pytimber/__init__.py
@@ -10,11 +10,12 @@ from .pagestore import PageStore
 
-__version__ = "2.5.2"
+__version__ = "2.6.0"
 
 __cmmnbuild_deps__ = [
     "accsoft-cals-extr-client",
     "accsoft-cals-extr-domain",
+    "lhc-commons-cals-utils",
     "slf4j-log4j12",
     "slf4j-api",
     "log4j"
diff --git a/pytimber/pytimber.py b/pytimber/pytimber.py
index acf6381..6866ac8 100644
--- a/pytimber/pytimber.py
+++ b/pytimber/pytimber.py
@@ -37,17 +37,17 @@
 import datetime
 import six
 import logging
+
 try:
-    import jpype
-    import cmmnbuild_dep_manager
+    import jpype
+    import cmmnbuild_dep_manager
 except ImportError:
-    print("""ERROR: module jpype and cmmnbuild_dep_manager not found!
+    print("""ERROR: modules jpype and cmmnbuild_dep_manager not found!
 Exporting data from the logging database will not be available!""")
 
 import numpy as np
 
 from collections import namedtuple
-
 Stat = namedtuple(
     'Stat',
     ['MinTstamp', 'MaxTstamp', 'ValueCount',
@@ -55,7 +55,6 @@
      'StandardDeviationValue']
 )
 
-
 if six.PY3:
     long = int
 
@@ -65,9 +64,10 @@ class LoggingDB(object):
     try:
-        _jpype=jpype
+        _jpype = jpype
     except NameError:
-        print('ERROR: jpype is note defined!')
+        print('ERROR: jpype is not defined!')
+
     def __init__(self, appid='LHC_MD_ABP_ANALYSIS', clientid='BEAM PHYSICS',
                  source='all', loglevel=None):
         # Configure logging
@@ -94,7 +94,7 @@ def __init__(self, appid='LHC_MD_ABP_ANALYSIS', clientid='BEAM PHYSICS',
         ServiceBuilder = (jpype.JPackage('cern').accsoft.cals.extr.client
                           .service.ServiceBuilder)
         builder = ServiceBuilder.getInstance(appid, clientid, loc)
-        self._builder=builder
+        self._builder = builder
         self._md = builder.createMetaService()
         self._ts = builder.createTimeseriesService()
         self._FillService = FillService = builder.createLHCFillService()
@@ -108,12 +108,12 @@ def toTimestamp(self, t):
             return Timestamp.valueOf(t.strftime('%Y-%m-%d %H:%M:%S.%f'))
         elif t is None:
             return None
-        elif isinstance(t,Timestamp):
+        elif isinstance(t, Timestamp):
             return t
         else:
-            ts = Timestamp(long(t*1000))
+            ts = Timestamp(long(t * 1000))
             sec = int(t)
-            nanos = int((t-sec)*1e9)
+            nanos = int((t - sec) * 1e9)
             ts.setNanos(nanos)
             return ts
@@ -134,32 +134,35 @@ def toTimescale(self, timescale_list):
-        Timescale = jpype.JPackage('cern').accsoft.cals.extr.domain.core.constants.TimescalingProperties
-        try:
-            timescale_str='_'.join(timescale_list)
-            return Timescale.valueOf(timescale_str)
-        except Exception as e:
-            self._log.warning('exception in timescale:{}'.format(e))
-
-    def getVariables(self,pattern):
+        Timescale = jpype.JPackage('cern').accsoft.cals.extr.domain.core.constants.TimescalingProperties
+        try:
+            timescale_str = '_'.join(timescale_list)
+            return Timescale.valueOf(timescale_str)
+        except Exception as e:
+            self._log.warning('exception in timescale:{}'.format(e))
+
+    def getVariables(self, pattern):
         """Get Variable from pattern. Wildcard is '%'."""
         VariableDataType = (jpype.JPackage('cern').accsoft.cals.extr.domain
                             .core.constants.VariableDataType)
         types = VariableDataType.ALL
-        v=self._md.getVariablesOfDataTypeWithNameLikePattern(pattern, types)
+        v = self._md.getVariablesOfDataTypeWithNameLikePattern(pattern, types)
         return list(v.getVariables())
+
     def search(self, pattern):
         """Search for parameter names.
         Wildcard is '%'."""
         return [vv.getVariableName() for vv in self.getVariables(pattern)]
-    def getDescription(self,pattern):
+    def getDescription(self, pattern):
         """Get Variable Description from pattern. Wildcard is '%'."""
-        return dict([(vv.getVariableName(),vv.getDescription())
-            for vv in self.getVariables(pattern)])
-    def getUnit(self,pattern):
+        return dict([(vv.getVariableName(), vv.getDescription())
+                     for vv in self.getVariables(pattern)])
+
+    def getUnit(self, pattern):
         """Get Variable Unit from pattern. Wildcard is '%'."""
-        return dict([(vv.getVariableName(),vv.getUnit())
-            for vv in self.getVariables(pattern)])
+        return dict([(vv.getVariableName(), vv.getUnit())
+                     for vv in self.getVariables(pattern)])
+
     def getFundamentals(self, t1, t2, fundamental):
         self._log.info(
             'Querying fundamentals (pattern: {0}):'.format(fundamental)
         )
@@ -200,52 +203,61 @@ def processDataset(self, dataset, datatype, unixtime):
         spi = (jpype.JPackage('cern').accsoft.cals.extr.domain.core
                .timeseriesdata.spi)
-        datas = []
-        tss = []
-        for tt in dataset:
-            ts = self.fromTimestamp(tt.getStamp(), unixtime)
-            if datatype == 'MATRIXNUMERIC':
-                if isinstance(tt, spi.MatrixNumericDoubleData):
-                    val = np.array(tt.getMatrixDoubleValues(), dtype=float)
-                elif isinstance(tt, spi.MatrixNumericLongData):
-                    val = np.array(tt.getMatrixLongValues(), dtype=int)
-                else:
-                    self._log.warning('Unsupported datatype, returning the '
-                                      'java object')
-                    val = tt
-            elif datatype == 'VECTORNUMERIC':
-                if isinstance(tt, spi.VectorNumericDoubleData):
-                    val = np.array(tt.getDoubleValues()[:], dtype=float)
-                elif isinstance(tt, spi.VectorNumericLongData):
-                    val = np.array(tt.getLongValues()[:], dtype=int)
-                else:
-                    self._log.warning('Unsupported datatype, returning the '
-                                      'java object')
-                    val = tt
-            elif datatype == 'VECTORSTRING':
-                val = np.array(tt.getStringValues(), dtype='U')
-            elif datatype == 'NUMERIC':
-                if isinstance(tt, spi.NumericDoubleData):
-                    val = tt.getDoubleValue()
-                elif isinstance(tt, spi.NumericLongData):
-                    val = tt.getLongValue()
-                else:
-                    self._log.warning('Unsupported datatype, returning the '
-                                      'java object')
-                    val = tt
-            elif datatype == 'FUNDAMENTAL':
-                val = 1
-            elif datatype == 'TEXTUAL':
-                val = tt.getVarcharValue()
+        if type(dataset) is list:
+            new_ds = jpype.JPackage('cern').accsoft.cals.extr.domain.core.timeseriesdata.spi.TimeseriesDataSetImpl()
+            for data in dataset:
+                new_ds.add(data)
+            dataset = new_ds
+
+        if dataset.isEmpty():
+            return (np.array([], dtype=float), np.array([], dtype=float))
+
+        PrimitiveDataSets = jpype.JPackage('cern').lhc.commons.cals.PrimitiveDataSets
+        timestamps = np.array(PrimitiveDataSets.unixTimestamps(dataset)[:], dtype=float)
+        if not unixtime:
+            timestamps = np.array([datetime.datetime.fromtimestamp(t) for t in timestamps])
+
+        dataclass = PrimitiveDataSets.dataClass(dataset)
+        if datatype == 'MATRIXNUMERIC':
+            if dataclass == spi.MatrixNumericDoubleData:
+                data = np.array([[np.array(a[:], dtype=float) for a in matrix] for matrix in
+                                 PrimitiveDataSets.doubleMatrixData(dataset)])
+            elif dataclass == spi.MatrixNumericLongData:
+                data = np.array([[np.array(a[:], dtype=int) for a in matrix] for matrix in
+                                 PrimitiveDataSets.longMatrixData(dataset)])
+            else:
+                self._log.warning('Unsupported datatype, returning the '
+                                  'java object')
+                data = [t for t in dataset]
+        elif datatype == 'VECTORNUMERIC':
+            if dataclass == spi.VectorNumericDoubleData:
+                data = np.array([np.array(a[:], dtype=float) for a in PrimitiveDataSets.doubleVectorData(dataset)])
+            elif dataclass == spi.VectorNumericLongData:
+                data = np.array([np.array(a[:], dtype=int) for a in PrimitiveDataSets.longVectorData(dataset)])
+            else:
+                self._log.warning('Unsupported datatype, returning the '
+                                  'java object')
+                data = [t for t in dataset]
+        elif datatype == 'VECTORSTRING':
+            data = np.array([np.array(a[:], dtype='U') for a in PrimitiveDataSets.stringVectorData(dataset)])
+        elif datatype == 'NUMERIC':
+            if dataclass == spi.NumericDoubleData:
+                data = np.array(PrimitiveDataSets.doubleData(dataset)[:], dtype=float)
+            elif dataclass == spi.NumericLongData:
+                data = np.array(PrimitiveDataSets.longData(dataset)[:], dtype=int)
             else:
                 self._log.warning('Unsupported datatype, returning the '
                                   'java object')
-                val = tt
-            datas.append(val)
-            tss.append(ts)
-        tss = np.array(tss)
-        datas = np.array(datas)
-        return (tss, datas)
+                data = [t for t in dataset]
+        elif datatype == 'FUNDAMENTAL':
+            data = np.ones_like(timestamps, dtype=bool)
+        elif datatype == 'TEXTUAL':
+            data = np.array(PrimitiveDataSets.stringData(dataset)[:], dtype='U')
+        else:
+            self._log.warning('Unsupported datatype, returning the '
+                              'java object')
+            data = [t for t in dataset]
+        return (timestamps, data)
 
     def getAligned(self, pattern_or_list, t1, t2,
                    fundamental=None, master=None, unixtime=True):
@@ -323,7 +335,7 @@ def getAligned(self, pattern_or_list, t1, t2,
             self._log.info('Retrieved {0} values for {1}'.format(
                 res.size(), jvar.getVariableName()
             ))
-            self._log.info('{0} seconds for aqn'.format(time.time()-start_time))
+            self._log.info('{0} seconds for aqn'.format(time.time() - start_time))
             out[v] = self.processDataset(
                 res, res.getVariableDataType().toString(), unixtime
             )[1]
@@ -355,8 +367,8 @@ def getStats(self, pattern_or_list, t1, t2, unixtime=True):
             for v in variables:
                 logvars.append(v)
             self._log.info('List of variables to be queried: {0}'.format(
-                ', '.join(logvars)
-            ))
+                ', '.join(logvars)
+            ))
 
         # Acquire
         data = self._ts.getVariableStatisticsOverMultipleVariablesInTimeWindow(
@@ -381,24 +393,24 @@
 
         return out
 
-# def getSize(self, pattern_or_list, t1, t2):
-#     ts1 = self.toTimestamp(t1)
-#     ts2 = self.toTimestamp(t2)
-#
-#     # Build variable list
-#     variables = self.getVariablesList(pattern_or_list)
-#     if len(variables) == 0:
-#         log.warning('No variables found.')
-#         return {}
-#     else:
-#         logvars = []
-#         for v in variables:
-#             logvars.append(v)
-#         log.info('List of variables to be queried: {0}'.format(
-#             ', '.join(logvars)))
-#         # Acquire
-#     for v in variables:
-#         return self._ts.getJVMHeapSizeEstimationForDataInTimeWindow(v,ts1,ts2,None,None)
+    # def getSize(self, pattern_or_list, t1, t2):
+    #     ts1 = self.toTimestamp(t1)
+    #     ts2 = self.toTimestamp(t2)
+    #
+    #     # Build variable list
+    #     variables = self.getVariablesList(pattern_or_list)
+    #     if len(variables) == 0:
+    #         log.warning('No variables found.')
+    #         return {}
+    #     else:
+    #         logvars = []
+    #         for v in variables:
+    #             logvars.append(v)
+    #         log.info('List of variables to be queried: {0}'.format(
+    #             ', '.join(logvars)))
+    #         # Acquire
+    #     for v in variables:
+    #         return self._ts.getJVMHeapSizeEstimationForDataInTimeWindow(v,ts1,ts2,None,None)
 
     def get(self, pattern_or_list, t1, t2=None,
             fundamental=None, unixtime=True):
@@ -456,7 +468,7 @@ def get(self, pattern_or_list, t1, t2=None,
                 else:
                     datatype = res[0].getVariableDataType().toString()
                 self._log.info('Retrieved {0} values for {1}'.format(
-                    1, jvar.getVariableName()
+                    1, jvar.getVariableName()
                 ))
             elif t2 == 'next':
                 res = [
@@ -486,8 +498,8 @@ def get(self, pattern_or_list, t1, t2=None,
             out[v] = self.processDataset(res, datatype, unixtime)
         return out
 
-    def getScaled(self, pattern_or_list, t1, t2,unixtime=True,
-            scaleAlgorithm='SUM', scaleInterval='MINUTE', scaleSize='1'):
+    def getScaled(self, pattern_or_list, t1, t2, unixtime=True,
+                  scaleAlgorithm='SUM', scaleInterval='MINUTE', scaleSize='1'):
         """Query the database for a list of variables or for variables
         whose name matches a pattern (string) in a time window from t1 to t2.
@@ -500,7 +512,7 @@ def getScaled(self, pattern_or_list, t1, t2,unixtime=True,
         """
         ts1 = self.toTimestamp(t1)
         ts2 = self.toTimestamp(t2)
-        timescaling=self.toTimescale([scaleSize,scaleInterval,scaleAlgorithm])
+        timescaling = self.toTimescale([scaleSize, scaleInterval, scaleAlgorithm])
         out = {}
 
         # Build variable list
@@ -519,15 +531,15 @@ def getScaled(self, pattern_or_list, t1, t2,unixtime=True,
         for v in variables:
             jvar = variables.getVariable(v)
             try:
-                res = self._ts.getDataInFixedIntervals(jvar, ts1, ts2, timescaling)
+                res = self._ts.getDataInFixedIntervals(jvar, ts1, ts2, timescaling)
             except jpype.JavaException as e:
-                print(e.message())
-                print('''
+                print(e.message())
+                print('''
                 scaleAlgorithm should be one of:{},
                 scaleInterval one of:{},
-                scaleSize an integer'''.format(['MAX','MIN','AVG','COUNT','SUM','REPEAT','INTERPOLATE']
-                ,['SECOND', 'MINUTE','HOUR', 'DAY','WEEK','MONTH','YEAR']))
-                return
+                scaleSize an integer'''.format(['MAX', 'MIN', 'AVG', 'COUNT', 'SUM', 'REPEAT', 'INTERPOLATE']
+                , ['SECOND', 'MINUTE', 'HOUR', 'DAY', 'WEEK', 'MONTH', 'YEAR']))
+                return
             datatype = res.getVariableDataType().toString()
             self._log.info('Retrieved {0} values for {1}'.format(
                 res.size(), jvar.getVariableName()
             ))
@@ -599,7 +611,7 @@ def getLHCFillsByTime(self, t1, t2, beam_modes=None, unixtime=True):
 
         fills = (
             self._FillService
-            .getLHCFillsAndBeamModesInTimeWindowContainingBeamModes(
+            .getLHCFillsAndBeamModesInTimeWindowContainingBeamModes(
                 ts1, ts2, java_beam_modes
             )
         )
@@ -625,32 +637,33 @@ def getIntervalsByLHCModes(self, t1, t2, mode1, mode2, unixtime=True,
         """
         ts1 = self.toTimestamp(t1)
         ts2 = self.toTimestamp(t2)
-        fills=self.getLHCFillsByTime(ts1,ts2,[mode1,mode2])
-        out=[]
+        fills = self.getLHCFillsByTime(ts1, ts2, [mode1, mode2])
+        out = []
        for fill in fills:
             fn=fill['fillNumber']
             m1=[]
             m2=[]
             for bm in fill['beamModes']:
-                if bm['mode']==mode1:
-                    m1.append(bm[mode1time])
-                if bm['mode']==mode2:
-                    m2.append(bm[mode2time])
-            if len(m1)>0 and len(m2)>0:
-                out.append([fn,m1[mode1idx],m2[mode2idx]])
+                if bm['mode'] == mode1:
+                    m1.append(bm[mode1time])
+                if bm['mode'] == mode2:
+                    m2.append(bm[mode2time])
+            if len(m1) > 0 and len(m2) > 0:
+                out.append([fn, m1[mode1idx], m2[mode2idx]])
         return out
-    def getMetaData(self,pattern_or_list):
+
+    def getMetaData(self, pattern_or_list):
         """Get All MetaData for a variable defined by a pattern_or_list"""
-        out={}
+        out = {}
         variables = self.getVariablesList(pattern_or_list).getVariables()
         for variable in variables:
-            metadata=(self._md.getVectorElements(variable)
-                      .getVectornumericElements())
-            ts=[tt.fastTime/1000+tt.getNanos()/1e9 for tt in metadata]
-#            vv=[dict([(aa.key,aa.value) for aa in a.iterator()])
-#                for a in metadata.values()]
-            vv=[[aa.value for aa in a.iterator()] for a in metadata.values()]
-            out[variable.getVariableName()]=ts,vv
+            metadata = (self._md.getVectorElements(variable)
+                        .getVectornumericElements())
+            ts = [tt.fastTime / 1000 + tt.getNanos() / 1e9 for tt in metadata]
+            # vv=[dict([(aa.key,aa.value) for aa in a.iterator()])
+            #     for a in metadata.values()]
+            vv = [[aa.value for aa in a.iterator()] for a in metadata.values()]
+            out[variable.getVariableName()] = ts, vv
         return out
@@ -674,7 +687,7 @@ def _get_childs(self):
     def _cleanName(self, s):
         if s[0].isdigit():
-            s = '_'+s
+            s = '_' + s
         out = []
         for ss in s:
             if ss in ' _-;> 0]) return sorted(self._dict.keys()) + v
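
Usage note: with this change processDataset() builds the (timestamps, values) pair in bulk through cern.lhc.commons.cals.PrimitiveDataSets (provided by the new lhc-commons-cals-utils entry in __cmmnbuild_deps__) instead of converting each Java sample in a Python loop, while the calling convention of get() and getScaled() stays the same. A minimal sketch of how the touched methods are called, assuming pytimber 2.6.0 and access to the CALS extraction service; the variable name 'HX:FILLN' and the one-hour window are illustrative only:

# Sketch only: requires the CERN CALS servers to be reachable.
import time
import pytimber

db = pytimber.LoggingDB()          # same defaults as LoggingDB.__init__ above

now = time.time()
t1, t2 = now - 3600, now           # last hour, as unix timestamps

# get() returns {variable_name: (timestamps, values)}, where both elements
# are NumPy arrays produced by the rewritten processDataset().
data = db.get('HX:FILLN', t1, t2)  # 'HX:FILLN' is an illustrative variable name
ts, values = data['HX:FILLN']

# getScaled() goes through toTimescale(); the keyword values must come from the
# lists printed in the error message above (here: 10-minute averages).
scaled = db.getScaled('HX:FILLN', t1, t2, scaleAlgorithm='AVG',
                      scaleInterval='MINUTE', scaleSize='10')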