diff --git a/doc/source/examples.rst b/doc/source/examples.rst
index fa40baa..9bd12a6 100644
--- a/doc/source/examples.rst
+++ b/doc/source/examples.rst
@@ -109,7 +109,7 @@ former one using `openMP `_ and therefore being
 expected to be much faster when analyzing larger data. Besides, you can set
 three technical arguments which we will not change here:
-1. ``number_of_threads`` (int) giving the number of threads in which the job
+1. ``n_jobs`` (int) giving the number of threads in which the job
    should be executed (default=2)
 2. ``sequence`` (int) giving the length of sublists generated from all outcomes
    (default=10)
diff --git a/pyndl/__init__.py b/pyndl/__init__.py
index b3f6594..42f28d6 100644
--- a/pyndl/__init__.py
+++ b/pyndl/__init__.py
@@ -11,20 +11,27 @@ import os
 import sys
 import multiprocessing as mp
-from pip._vendor import pkg_resources
+try:
+    from importlib.metadata import requires
+except ModuleNotFoundError: # python 3.7 and before
+    requires = None
+try:
+    from packaging.requirements import Requirement
+except ModuleNotFoundError: # this should only happen during setup phase
+    Requirement = None
 __author__ = ('Konstantin Sering, Marc Weitz, '
               'David-Elias Künstle, Lennard Schneider, '
               'Elnaz Shafaei-Bajestan')
 __author_email__ = 'konstantin.sering@uni-tuebingen.de'
-__version__ = '0.8.2'
+__version__ = '0.8.1'
 __license__ = 'MIT'
 __description__ = ('Naive discriminative learning implements learning and '
                    'classification models based on the Rescorla-Wagner '
                    'equations.')
 __classifiers__ = [
-    'Development Status :: 3 - Alpha',
+    'Development Status :: 4 - Beta',
     'Environment :: Console',
     'Intended Audience :: Science/Research',
     'License :: OSI Approved :: MIT License',
@@ -45,8 +52,9 @@ def sysinfo():
     """
     Prints system the dependency information
     """
-    pyndl = pkg_resources.working_set.by_key["pyndl"]
-    dependencies = [r.project_name for r in pyndl.requires()]
+    if requires:
+        dependencies = [Requirement(req).name for req in requires('pyndl')
+                        if not Requirement(req).marker]
     header = ("Pyndl Information\n"
               "=================\n\n")
@@ -78,7 +86,10 @@ def sysinfo():
     deps = ("Dependencies\n"
             "------------\n")
-    deps += "\n".join("{pkg.__name__}: {pkg.__version__}".format(pkg=__import__(dep))
-                      for dep in dependencies)
+    if requires:
+        deps += "\n".join("{pkg.__name__}: {pkg.__version__}".format(pkg=__import__(dep))
+                          for dep in dependencies)
+    else:
+        deps = 'You need Python 3.8 or higher to show dependencies.'
     print(header + general + osinfo + deps)
diff --git a/pyndl/activation.py b/pyndl/activation.py
index 5f0b52f..e546be0 100644
--- a/pyndl/activation.py
+++ b/pyndl/activation.py
@@ -9,6 +9,7 @@ import multiprocessing as mp
 import ctypes
 from collections import defaultdict, OrderedDict
+import warnings
 import numpy as np
 import xarray as xr
@@ -17,7 +18,8 @@
 # pylint: disable=W0621
-def activation(events, weights, number_of_threads=1, remove_duplicates=None, ignore_missing_cues=False):
+def activation(events, weights, *, n_jobs=1, number_of_threads=None,
+               remove_duplicates=None, ignore_missing_cues=False):
     """
     Estimate activations for given events in event file and outcome-cue weights.
@@ -31,7 +33,7 @@ def activation(events, weights, number_of_threads=1, remove_duplicates=None, ign
     weights : xarray.DataArray or dict[dict[float]]
         the xarray.DataArray needs to have the dimensions 'outcomes' and 'cues'
         the dictionaries hold weight[outcome][cue].
- number_of_threads : int + n_jobs : int a integer giving the number of threads in which the job should executed remove_duplicates : {None, True, False} @@ -58,6 +60,11 @@ def activation(events, weights, number_of_threads=1, remove_duplicates=None, ign returned if weights is instance of dict """ + if number_of_threads is not None: + warnings.warn("Parameter `number_of_threads` is renamed to `n_jobs`. The old name " + "will stop working with v0.9.0.", + DeprecationWarning, stacklevel=2) + n_jobs = number_of_threads if isinstance(events, str): events = io.events_from_file(events) @@ -87,14 +94,14 @@ def check_no_duplicates(cues): for event_cues in events) # pylint: disable=W0621 activations = _activation_matrix(list(event_cue_indices_list), - weights.values, number_of_threads) + weights.values, n_jobs) return xr.DataArray(activations, coords={ 'outcomes': outcomes }, dims=('outcomes', 'events')) elif isinstance(weights, dict): - assert number_of_threads == 1, "Estimating activations with multiprocessing is not implemented for dicts." + assert n_jobs == 1, "Estimating activations with multiprocessing is not implemented for dicts." activations = defaultdict(lambda: np.zeros(len(events))) events = list(events) for outcome, cue_dict in weights.items(): @@ -130,7 +137,7 @@ def _run_mp_activation_matrix(event_index, cue_indices): activations[:, event_index] = weights[:, cue_indices].sum(axis=1) -def _activation_matrix(indices_list, weights, number_of_threads): +def _activation_matrix(indices_list, weights, n_jobs): """ Estimate activation for indices in weights @@ -143,7 +150,7 @@ def _activation_matrix(indices_list, weights, number_of_threads): events as cue indices in weights weights : numpy.array weight matrix with shape (outcomes, cues) - number_of_threads : int + n_jobs : int Returns ------- @@ -151,10 +158,10 @@ def _activation_matrix(indices_list, weights, number_of_threads): estimated activations as matrix with shape (outcomes, events) """ - assert number_of_threads >= 1, "Can't run with less than 1 thread" + assert n_jobs >= 1, "Can't run with less than 1 thread" activations_dim = (weights.shape[0], len(indices_list)) - if number_of_threads == 1: + if n_jobs == 1: activations = np.empty(activations_dim, dtype=np.float64) for row, event_cues in enumerate(indices_list): activations[:, row] = weights[:, event_cues].sum(axis=1) @@ -164,7 +171,7 @@ def _activation_matrix(indices_list, weights, number_of_threads): weights = np.ascontiguousarray(weights) shared_weights = mp.sharedctypes.copy(np.ctypeslib.as_ctypes(np.float64(weights))) initargs = (shared_weights, weights.shape, shared_activations, activations_dim) - with mp.Pool(number_of_threads, initializer=_init_mp_activation_matrix, initargs=initargs) as pool: + with mp.Pool(n_jobs, initializer=_init_mp_activation_matrix, initargs=initargs) as pool: pool.starmap(_run_mp_activation_matrix, enumerate(indices_list)) activations = np.ctypeslib.as_array(shared_activations) activations.shape = activations_dim diff --git a/pyndl/count.py b/pyndl/count.py index 58b21d0..5e45b8e 100644 --- a/pyndl/count.py +++ b/pyndl/count.py @@ -10,11 +10,16 @@ """ # pylint: disable=redefined-outer-name, invalid-name -from collections import Counter +from collections import Counter, namedtuple import gzip import itertools import multiprocessing import sys +import warnings + + +CuesOutcomes = namedtuple('CuesOutcomes', 'n_events, cues, outcomes') +WordsSymbols = namedtuple('WordsSymbols', 'words, symbols') def _job_cues_outcomes(event_file_name, start, step, 
verbose=False): @@ -46,9 +51,9 @@ def _job_cues_outcomes(event_file_name, start, step, verbose=False): def cues_outcomes(event_file_name, - *, number_of_processes=2, verbose=False): + *, n_jobs=2, number_of_processes=None, verbose=False): """ - Counts cues and outcomes in event_file_name using number_of_processes + Counts cues and outcomes in event_file_name using n_jobs processes. Returns @@ -56,14 +61,19 @@ def cues_outcomes(event_file_name, (n_events, cues, outcomes) : (int, collections.Counter, collections.Counter) """ - with multiprocessing.Pool(number_of_processes) as pool: - step = number_of_processes + if number_of_processes is not None: + warnings.warn("Parameter `number_of_processes` is renamed to `n_jobs`. The old name " + "will stop working with v0.9.0.", + DeprecationWarning, stacklevel=2) + n_jobs = number_of_processes + with multiprocessing.Pool(n_jobs) as pool: + step = n_jobs results = pool.starmap(_job_cues_outcomes, ((event_file_name, start, step, verbose) - for start in range(number_of_processes))) + for start in range(n_jobs))) n_events = 0 cues = Counter() outcomes = Counter() @@ -75,7 +85,7 @@ def cues_outcomes(event_file_name, if verbose: print('\n...counting done.') - return n_events, cues, outcomes + return CuesOutcomes(n_events, cues, outcomes) def _job_words_symbols(corpus_file_name, start, step, lower_case=False, @@ -117,9 +127,9 @@ def _job_words_symbols(corpus_file_name, start, step, lower_case=False, def words_symbols(corpus_file_name, - *, number_of_processes=2, lower_case=False, verbose=False): + *, n_jobs=2, number_of_processes=None, lower_case=False, verbose=False): """ - Counts words and symbols in corpus_file_name using number_of_processes + Counts words and symbols in corpus_file_name using n_jobs processes. Returns @@ -127,15 +137,20 @@ def words_symbols(corpus_file_name, (words, symbols) : (collections.Counter, collections.Counter) """ - with multiprocessing.Pool(number_of_processes) as pool: - step = number_of_processes + if number_of_processes is not None: + warnings.warn("Parameter `number_of_processes` is renamed to `n_jobs`. The old name " + "will stop working with v0.9.0.", + DeprecationWarning, stacklevel=2) + n_jobs = number_of_processes + with multiprocessing.Pool(n_jobs) as pool: + step = n_jobs results = pool.starmap(_job_words_symbols, ((corpus_file_name, start, step, lower_case, verbose) for start in - range(number_of_processes))) + range(n_jobs))) words = Counter() symbols = Counter() for words_process, symbols_process in results: @@ -145,7 +160,7 @@ def words_symbols(corpus_file_name, if verbose: print('\n...counting done.') - return words, symbols + return WordsSymbols(words, symbols) def save_counter(counter, filename, *, header='key\tfreq\n'): diff --git a/pyndl/ndl.py b/pyndl/ndl.py index f051343..84c16b4 100644 --- a/pyndl/ndl.py +++ b/pyndl/ndl.py @@ -16,13 +16,14 @@ import threading import time import warnings +import types import cython import pandas as pd import numpy as np import xarray as xr -from . import __version__ +from . import __version__ as pyndl_version from . import count from . import preprocess from . import ndl_parallel @@ -40,12 +41,36 @@ warnings.simplefilter('always', DeprecationWarning) -def events_from_file(event_path): - warnings.warn("Usage of pyndl.ndl.events_from_file is depreceated and will " - "be removed in v0.6.0. 
Please use pyndl.io.events_from_file " - "instead.", - DeprecationWarning, stacklevel=2) - return io.events_from_file(event_path) +class WeightDict(defaultdict): + # pylint: disable=missing-docstring + + """ + Subclass of defaultdict to represent outcome-cue weights. + + Notes + ----- + Weight for each outcome-cue combination is 0 per default. + + """ + + # pylint: disable=W0613 + def __init__(self, *args, **kwargs): + super().__init__(lambda: defaultdict(float)) + + self._attrs = OrderedDict() + + if 'attrs' in kwargs: + self.attrs = kwargs['attrs'] + else: + self.attrs = {} + + @property + def attrs(self): + return self._attrs + + @attrs.setter + def attrs(self, attrs): + self._attrs = OrderedDict(attrs) def ndl(events, alpha, betas, lambda_=1.0, *, @@ -63,8 +88,8 @@ def ndl(events, alpha, betas, lambda_=1.0, *, Parameters ---------- - events : str - path to the event file + events : generator or str + generates cues, outcomes pairs or the path to the event file alpha : float saliency of all cues betas : (float, float) @@ -102,6 +127,13 @@ def ndl(events, alpha, betas, lambda_=1.0, *, """ + # Create temporary file if events is a generator + if isinstance(events, types.GeneratorType): + file_path = tempfile.NamedTemporaryFile().name + io.events_to_file(events, file_path) + events = file_path + del file_path + if number_of_threads is not None: warnings.warn("Parameter `number_of_threads` is renamed to `n_jobs`. The old name " "will stop working with v0.9.0.", @@ -124,7 +156,7 @@ def ndl(events, alpha, betas, lambda_=1.0, *, # preprocessing n_events, cues, outcomes = count.cues_outcomes(events, - number_of_processes=n_jobs, + n_jobs=n_jobs, verbose=verbose) cues = list(cues.keys()) outcomes = list(outcomes.keys()) @@ -167,12 +199,16 @@ def ndl(events, alpha, betas, lambda_=1.0, *, else: raise ValueError('weights need to be None or xarray.DataArray with method=%s' % method) + if any(length > 4294967295 for length in weights.shape): + raise ValueError("Neither number of cues nor outcomes shall exceed 4294967295 " + "for now. See https://github.com/quantling/pyndl/issues/169") + beta1, beta2 = betas with tempfile.TemporaryDirectory(prefix="pyndl", dir=temporary_directory) as binary_path: number_events = preprocess.create_binary_event_files(events, binary_path, cue_map, outcome_map, overwrite=True, - number_of_processes=n_jobs, + n_jobs=n_jobs, events_per_file=events_per_temporary_file, remove_duplicates=remove_duplicates, verbose=verbose) @@ -185,6 +221,8 @@ def ndl(events, alpha, betas, lambda_=1.0, *, if verbose: print('start learning...') # learning + if not weights.data.c_contiguous: + raise ValueError('weights has to be c_contiguous') if method == 'openmp': if not sys.platform.startswith('linux'): raise NotImplementedError("OpenMP is linux only at the moment." @@ -286,7 +324,7 @@ def _format(value): 'wall_time': _format(str(wall_time)), 'hostname': _format(socket.gethostname()), 'username': _format(getpass.getuser()), - 'pyndl': _format(__version__), + 'pyndl': _format(pyndl_version), 'numpy': _format(np.__version__), 'pandas': _format(pd.__version__), 'xarray': _format(xr.__version__), @@ -306,38 +344,6 @@ def _format(value): return new_attrs -class WeightDict(defaultdict): - # pylint: disable=missing-docstring - - """ - Subclass of defaultdict to represent outcome-cue weights. - - Notes - ----- - Weight for each outcome-cue combination is 0 per default. 
-
-    """
-
-    # pylint: disable=W0613
-    def __init__(self, *args, **kwargs):
-        super().__init__(lambda: defaultdict(float))
-
-        self._attrs = OrderedDict()
-
-        if 'attrs' in kwargs:
-            self.attrs = kwargs['attrs']
-        else:
-            self.attrs = {}
-
-    @property
-    def attrs(self):
-        return self._attrs
-
-    @attrs.setter
-    def attrs(self, attrs):
-        self._attrs = OrderedDict(attrs)
-
-
 def dict_ndl(events, alphas, betas, lambda_=1.0, *,
              weights=None, inplace=False, remove_duplicates=None,
             make_data_array=False, verbose=False):
@@ -472,24 +478,62 @@ def dict_ndl(events, alphas, betas, lambda_=1.0, *,
                         __name__ + "." + dict_ndl.__name__,
                         attrs=attrs_to_update)
     if make_data_array:
-        outcomes = list(weights.keys())
-        cues = set()
-        for outcome in outcomes:
-            cues.update(set(weights[outcome].keys()))
+        weights = data_array(weights, attrs=attrs)
+    else:
+        weights.attrs = attrs
-        cues = list(cues)
+    return weights
-        weights_dict = weights
-        shape = (len(outcomes), len(cues))
-        weights = xr.DataArray(np.zeros(shape), attrs=attrs,
-                               coords={'outcomes': outcomes, 'cues': cues},
-                               dims=('outcomes', 'cues'))
-        for outcome in outcomes:
-            for cue in cues:
-                weights.loc[{"outcomes": outcome, "cues": cue}] = weights_dict[outcome][cue]
-    else:
-        weights.attrs = attrs
+def data_array(weights, *, attrs=None):
+    """
+    Convert a dict of dicts of weights or a WeightDict into an xarray.DataArray.
+
+    Parameters
+    ----------
+    weights : dict of dicts of floats or WeightDict
+        the first dict has outcomes as keys and dicts as values
+        the second dict has cues as keys and weights as values
+        weights[outcome][cue] gives the weight between outcome and cue.
+        If a dict of dicts is given, attrs is required. If a WeightDict is
+        given, attrs is optional
+    attrs : dict
+        A dictionary of attributes
+
+    Returns
+    -------
+    weights : xarray.DataArray
+        with dimensions 'outcomes' and 'cues'. You can lookup the weights
+        between a cue and an outcome with ``weights.loc[{'outcomes': outcome,
+        'cues': cue}]`` or ``weights.loc[outcome].loc[cue]``.
+ """ + + if isinstance(weights, xr.DataArray) and weights.dims == ('outcomes', 'cues'): + return weights + + if attrs is None: + try: + attrs = weights.attrs + except AttributeError: + raise AttributeError("weights does not have attributes and no attrs " + "argument is given.") + + outcomes = list(weights.keys()) + cues = set() + for outcome in outcomes: + cues.update(set(weights[outcome].keys())) + + cues = list(cues) + + weights_dict = weights + shape = (len(outcomes), len(cues)) + weights = xr.DataArray(np.zeros(shape), attrs=attrs, + coords={'outcomes': outcomes, 'cues': cues}, + dims=('outcomes', 'cues')) + + for outcome in outcomes: + for cue in cues: + weights.loc[{"outcomes": outcome, "cues": cue}] = weights_dict[outcome][cue] return weights diff --git a/pyndl/ndl_openmp.pyx b/pyndl/ndl_openmp.pyx index 6b2c2d2..d6c3e50 100644 --- a/pyndl/ndl_openmp.pyx +++ b/pyndl/ndl_openmp.pyx @@ -24,7 +24,7 @@ def learn_inplace_binary_to_binary(binary_file_paths, np.ndarray[dtype_t, ndim=2] weights, np.ndarray[unsigned int, ndim=1] all_outcomes, unsigned int chunksize, - unsigned int number_of_threads): + unsigned int n_jobs): cdef unsigned int n_all_cues = weights.shape[1] # number of cues == columns cdef unsigned int* all_outcomes_ptr = all_outcomes.data @@ -45,7 +45,7 @@ def learn_inplace_binary_to_binary(binary_file_paths, number_parts = math.ceil( length_all_outcomes / chunksize) - with nogil, parallel(num_threads=number_of_threads): + with nogil, parallel(num_threads=n_jobs): for ii in prange(number_parts, schedule="dynamic", chunksize=1): start_val = ii * chunksize end_val = min(start_val + chunksize, length_all_outcomes) @@ -66,7 +66,7 @@ def learn_inplace_binary_to_real(binary_file_paths, np.ndarray[dtype_t, ndim=2] outcome_vectors, np.ndarray[dtype_t, ndim=2] weights, unsigned int chunksize, - unsigned int number_of_threads): + unsigned int n_jobs): cdef unsigned int n_all_cues = weights.shape[1] # number of cues == columns cdef unsigned int n_outcome_vector_dimensions = outcome_vectors.shape[1] @@ -88,7 +88,7 @@ def learn_inplace_binary_to_real(binary_file_paths, if n_outcome_vector_dimensions % chunksize != 0: number_parts += 1 - with nogil, parallel(num_threads=number_of_threads): + with nogil, parallel(num_threads=n_jobs): for ii in prange(number_parts, schedule="dynamic", chunksize=1): start_val = ii * chunksize end_val = min(start_val + chunksize, n_outcome_vector_dimensions) @@ -116,7 +116,7 @@ def learn_inplace_real_to_binary(binary_file_paths, np.ndarray[dtype_t, ndim=2] cue_vectors, np.ndarray[dtype_t, ndim=2] weights, unsigned int chunksize, - unsigned int number_of_threads): + unsigned int n_jobs): cdef unsigned int n_all_outcomes = weights.shape[0] # number of outcomes == rows cdef unsigned int n_cue_vector_dimensions = weights.shape[1] # number of cue vector dimensions == columns @@ -138,7 +138,7 @@ def learn_inplace_real_to_binary(binary_file_paths, if n_all_outcomes % chunksize != 0: number_parts += 1 - with nogil, parallel(num_threads=number_of_threads): + with nogil, parallel(num_threads=n_jobs): for ii in prange(number_parts, schedule="dynamic", chunksize=1): start_val = ii * chunksize end_val = min(start_val + chunksize, n_all_outcomes) @@ -166,7 +166,7 @@ def learn_inplace_real_to_real(binary_file_paths, np.ndarray[dtype_t, ndim=2] outcome_vectors, np.ndarray[dtype_t, ndim=2] weights, unsigned int chunksize, - unsigned int number_of_threads): + unsigned int n_jobs): assert weights.shape[1] == cue_vectors.shape[1] assert weights.shape[0] == 
outcome_vectors.shape[1] @@ -192,7 +192,7 @@ def learn_inplace_real_to_real(binary_file_paths, if n_outcome_vector_dimensions % chunksize != 0: number_parts += 1 - with nogil, parallel(num_threads=number_of_threads): + with nogil, parallel(num_threads=n_jobs): for ii in prange(number_parts, schedule="dynamic", chunksize=1): start_val = ii * chunksize end_val = min(start_val + chunksize, n_outcome_vector_dimensions) diff --git a/pyndl/ndl_parallel.pyx b/pyndl/ndl_parallel.pyx index 9d04fe5..a1d727e 100644 --- a/pyndl/ndl_parallel.pyx +++ b/pyndl/ndl_parallel.pyx @@ -5,10 +5,12 @@ from libc.stdio cimport fopen, fread, fclose, FILE from error_codes cimport ErrorCode, NO_ERROR, MAGIC_NUMBER_DOES_NOT_MATCH, VERSION_NUMBER_DOES_NOT_MATCH, INITIAL_ERROR_CODE, ERROR_CODES + cdef unsigned int MAGIC_NUMBER = 14159265 cdef unsigned int CURRENT_VERSION_WITH_FREQ = 215 cdef unsigned int CURRENT_VERSION = 2048 + 215 + # run two sanity checks while loading the extension # 1. check if sizeof(unsigned int) != 4: diff --git a/pyndl/preprocess.py b/pyndl/preprocess.py index 51f7dd9..f762043 100644 --- a/pyndl/preprocess.py +++ b/pyndl/preprocess.py @@ -458,7 +458,7 @@ def filter_event_file(input_event_file, output_event_file, *, keep_cues="all", keep_outcomes="all", remove_cues=None, remove_outcomes=None, cue_map=None, outcome_map=None, - number_of_processes=1, chunksize=100000, + n_jobs=1, number_of_processes=None, chunksize=100000, verbose=False): """ Filter an event file by a list or a map of cues and outcomes. @@ -487,7 +487,7 @@ def filter_event_file(input_event_file, output_event_file, *, maps every outcome as key to the value. Removes all outcome that do not have a key. This can be used to map several different outcomes to the same outcome or to rename outcomes. - number_of_processes : int + n_jobs : int number of threads to use chunksize : int number of chunks per submitted job, should be around 100000 @@ -502,10 +502,15 @@ def filter_event_file(input_event_file, output_event_file, *, is still present in order to capture the background rate of that cues. """ + if number_of_processes is not None: + warnings.warn("Parameter `number_of_processes` is renamed to `n_jobs`. The old name " + "will stop working with v0.9.0.", + DeprecationWarning, stacklevel=2) + n_jobs = number_of_processes job = JobFilter(keep_cues, keep_outcomes, remove_cues, remove_outcomes, cue_map, outcome_map) - with multiprocessing.Pool(number_of_processes) as pool: + with multiprocessing.Pool(n_jobs) as pool: with gzip.open(input_event_file, "rt") as infile: with gzip.open(output_event_file, "wt") as outfile: # copy header @@ -702,7 +707,7 @@ def create_binary_event_files(event_file, outcome_id_map, *, sort_within_event=False, - number_of_processes=2, + n_jobs=2, events_per_file=10000000, overwrite=False, remove_duplicates=None, @@ -723,7 +728,7 @@ def create_binary_event_files(event_file, outcome to id map sort_within_event : bool should we sort the cues and outcomes within the event - number_of_processes : int + n_jobs : int number of threads to use events_per_file : int Number of events in each binary file. 
Has to be larger than 1 @@ -761,7 +766,7 @@ def create_binary_event_files(event_file, number_events = 0 - with multiprocessing.Pool(number_of_processes) as pool: + with multiprocessing.Pool(n_jobs) as pool: def _error_callback(error): if isinstance(error, StopIteration): @@ -807,8 +812,8 @@ def _callback(result): else: raise error ii += 1 - # only start jobs in chunks of 4*number_of_processes - if ii % (number_of_processes*4) == 0: + # only start jobs in chunks of 4*n_jobs + if ii % (n_jobs*4) == 0: while True: if result.ready(): break diff --git a/pyndl/wh.py b/pyndl/wh.py index 667c708..d59c64a 100644 --- a/pyndl/wh.py +++ b/pyndl/wh.py @@ -21,7 +21,7 @@ import numpy as np import xarray as xr -from . import __version__ +from . import __version__ as pyndl_version from . import count from . import preprocess from . import io @@ -363,9 +363,12 @@ def _wh_binary_to_real(events, eta, outcome_vectors, *, # TODO: convert dict to xarray here raise NotImplementedError('dicts are not supported yet.') + if not outcome_vectors.data.data.c_contiguous: + raise ValueError('outcome_vectors have to be c_contiguous') + # preprocessing n_events, cues, outcomes_from_events = count.cues_outcomes(events, - number_of_processes=n_jobs, + n_jobs=n_jobs, verbose=verbose) cues = list(cues.keys()) outcomes_from_events = list(outcomes_from_events.keys()) @@ -408,7 +411,7 @@ def _wh_binary_to_real(events, eta, outcome_vectors, *, with tempfile.TemporaryDirectory(prefix="pyndl", dir=temporary_directory) as binary_path: number_events = preprocess.create_binary_event_files(events, binary_path, cue_map, outcome_map, overwrite=True, - number_of_processes=n_jobs, + n_jobs=n_jobs, events_per_file=events_per_temporary_file, remove_duplicates=remove_duplicates, verbose=verbose) @@ -421,6 +424,8 @@ def _wh_binary_to_real(events, eta, outcome_vectors, *, if verbose: print('start learning...') # learning + if not weights.data.c_contiguous: + raise ValueError('weights has to be c_contiguous') if method == 'openmp': if not sys.platform.startswith('linux'): raise NotImplementedError("OpenMP is linux only at the moment." @@ -451,7 +456,7 @@ def _wh_binary_to_real(events, eta, outcome_vectors, *, # for partlist in part_lists: # working_queue.put(np.array(partlist, dtype=np.uint32)) - # for _ in range(number_of_threads): + # for _ in range(n_jobs): # thread = threading.Thread(target=worker) # thread.start() # threads.append(thread) @@ -475,8 +480,9 @@ def _wh_binary_to_real(events, eta, outcome_vectors, *, __name__ + "." 
+ ndl.__name__, method=method, attrs=attrs_to_be_updated) # post-processing - weights = xr.DataArray(weights, [('outcome_vector_dimensions', outcome_vectors.coords['outcome_vector_dimensions']), - ('cues', cues)], attrs=attrs) + weights = xr.DataArray(weights, coords=[('outcome_vector_dimensions', + outcome_vectors.coords['outcome_vector_dimensions'].data), + ('cues', cues)], attrs=attrs) return weights @@ -535,16 +541,24 @@ def _wh_real_to_binary(events, betas, lambda_, cue_vectors, *, """ if not (remove_duplicates is None or isinstance(remove_duplicates, bool)): raise ValueError("remove_duplicates must be None, True or False") + if not isinstance(events, str): raise ValueError("'events' need to be the path to a gzipped event file not {}".format(type(events))) + if type(cue_vectors) == dict: + # TODO: convert dict to xarray here + raise NotImplementedError('dicts are not supported yet.') + + if not cue_vectors.data.data.c_contiguous: + raise ValueError('cue_vectors have to be c_contiguous') + weights_ini = weights wall_time_start = time.perf_counter() cpu_time_start = time.process_time() # preprocessing n_events, cues_from_events, outcomes_from_events = count.cues_outcomes(events, - number_of_processes=n_jobs, + n_jobs=n_jobs, verbose=verbose) cues_from_events = list(cues_from_events.keys()) @@ -595,7 +609,7 @@ def _wh_real_to_binary(events, betas, lambda_, cue_vectors, *, with tempfile.TemporaryDirectory(prefix="pyndl", dir=temporary_directory) as binary_path: number_events = preprocess.create_binary_event_files(events, binary_path, cue_map, outcome_map, overwrite=True, - number_of_processes=n_jobs, + n_jobs=n_jobs, events_per_file=events_per_temporary_file, remove_duplicates=remove_duplicates, verbose=verbose) @@ -608,6 +622,8 @@ def _wh_real_to_binary(events, betas, lambda_, cue_vectors, *, if verbose: print('start learning...') # learning + if not weights.data.data.c_contiguous: + raise ValueError('weights has to be c_contiguous') if method == 'openmp': if not sys.platform.startswith('linux'): raise NotImplementedError("OpenMP is linux only at the moment." 
@@ -715,9 +731,15 @@ def _wh_real_to_real(events, eta, cue_vectors, outcome_vectors, *, # TODO: convert dict to xarray here raise NotImplementedError('dicts are not supported yet.') + if not cue_vectors.data.data.c_contiguous: + raise ValueError('cue_vectors have to be c_contiguous') + + if not outcome_vectors.data.data.c_contiguous: + raise ValueError('outcome_vectors have to be c_contiguous') + # preprocessing n_events, cues_from_events, outcomes_from_events = count.cues_outcomes(events, - number_of_processes=n_jobs, + n_jobs=n_jobs, verbose=verbose) cues_from_events = list(cues_from_events.keys()) @@ -767,6 +789,8 @@ def _wh_real_to_real(events, eta, cue_vectors, outcome_vectors, *, raise ValueError('weights need to be None or xarray.DataArray with method=%s' % method) del shape + if not weights.data.data.c_contiguous: + raise ValueError('weights has to be c_contiguous') if method == 'numpy': event_generator = io.events_from_file(events) number_events = 0 @@ -809,7 +833,7 @@ def _wh_real_to_real(events, eta, cue_vectors, outcome_vectors, *, with tempfile.TemporaryDirectory(prefix="pyndl", dir=temporary_directory) as binary_path: number_events = preprocess.create_binary_event_files(events, binary_path, cue_map, outcome_map, overwrite=True, - number_of_processes=n_jobs, + n_jobs=n_jobs, events_per_file=events_per_temporary_file, remove_duplicates=remove_duplicates, verbose=verbose) @@ -884,7 +908,7 @@ def _format(value): 'wall_time': _format(str(wall_time)), 'hostname': _format(socket.gethostname()), 'username': _format(getpass.getuser()), - 'pyndl': _format(__version__), + 'pyndl': _format(pyndl_version), 'numpy': _format(np.__version__), 'pandas': _format(pd.__version__), 'xarray': _format(xr.__version__), diff --git a/requirements.txt b/requirements.txt index cda3f44..c5d1a52 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ numpy>=1.8.2 -scipy>=1.6.3 +scipy>=1.5.4 cython>=0.21.1 pandas>=0.14.1 xarray>=0.7.2 netCDF4>=1.3.1 pip>=9.0.1 +packaging>=20.0 diff --git a/tests/conftest.py b/tests/conftest.py index 6c11194..d699ae5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,13 +2,13 @@ Configuration for py.test-3. 
''' + import pytest def pytest_addoption(parser): parser.addoption("--runslow", action="store_true", help="run slow tests") - parser.addoption("--no-linux", action="store_true", help="run without linux tests") diff --git a/tests/test_activation.py b/tests/test_activation.py index 385c995..272e79a 100644 --- a/tests/test_activation.py +++ b/tests/test_activation.py @@ -15,6 +15,7 @@ from pyndl import ndl from pyndl.activation import activation + TEST_ROOT = os.path.join(os.path.pardir, os.path.dirname(__file__)) FILE_PATH_SIMPLE = os.path.join(TEST_ROOT, "resources/event_file_simple.tab.gz") FILE_PATH_MULTIPLE_CUES = os.path.join(TEST_ROOT, "resources/event_file_multiple_cues.tab.gz") @@ -50,10 +51,10 @@ def test_activation_matrix(): reference_activations = np.array([[1, 0, 1, 0], [1, 1, 0, 1]]) with pytest.raises(ValueError): - activations = activation(events, weights, number_of_threads=1) + activations = activation(events, weights, n_jobs=1) - activations = activation(events, weights, number_of_threads=1, remove_duplicates=True) - activations_mp = activation(events, weights, number_of_threads=3, remove_duplicates=True) + activations = activation(events, weights, n_jobs=1, remove_duplicates=True) + activations_mp = activation(events, weights, n_jobs=3, remove_duplicates=True) assert np.allclose(reference_activations, activations) assert np.allclose(reference_activations, activations_mp) @@ -74,12 +75,12 @@ def test_ignore_missing_cues(): reference_activations = np.array([[1, 0, 1, 0], [1, 1, 0, 1]]) with pytest.raises(KeyError): - activations = activation(events, weights, number_of_threads=1, + activations = activation(events, weights, n_jobs=1, remove_duplicates=True) - activations = activation(events, weights, number_of_threads=1, + activations = activation(events, weights, n_jobs=1, remove_duplicates=True, ignore_missing_cues=True) - activations_mp = activation(events, weights, number_of_threads=3, + activations_mp = activation(events, weights, n_jobs=3, remove_duplicates=True, ignore_missing_cues=True) assert np.allclose(reference_activations, activations) @@ -104,9 +105,9 @@ def test_activation_dict(): } with pytest.raises(ValueError): - activations = activation(events, weights, number_of_threads=1) + activations = activation(events, weights, n_jobs=1) - activations = activation(events, weights, number_of_threads=1, remove_duplicates=True) + activations = activation(events, weights, n_jobs=1, remove_duplicates=True) for outcome, activation_list in activations.items(): assert np.allclose(reference_activations[outcome], activation_list) @@ -129,9 +130,9 @@ def test_ignore_missing_cues_dict(): } with pytest.raises(ValueError): - activations = activation(events, weights, number_of_threads=1) + activations = activation(events, weights, n_jobs=1) - activations = activation(events, weights, number_of_threads=1, + activations = activation(events, weights, n_jobs=1, remove_duplicates=True, ignore_missing_cues=True) for outcome, activation_list in activations.items(): assert np.allclose(reference_activations[outcome], activation_list) @@ -175,10 +176,10 @@ def dec_func(*args, **kwargs): print("") gc.collect() asp = (time_test(activation, of="single threaded") - (events, weights, number_of_threads=1, remove_duplicates=True)) + (events, weights, n_jobs=1, remove_duplicates=True)) gc.collect() amp = (time_test(activation, of="multi threaded (up to 8 threads)") - (events, weights, number_of_threads=8, remove_duplicates=True)) + (events, weights, n_jobs=8, remove_duplicates=True)) del weights del 
events gc.collect() diff --git a/tests/test_count.py b/tests/test_count.py index 173a952..dfdb5de 100644 --- a/tests/test_count.py +++ b/tests/test_count.py @@ -14,7 +14,7 @@ def test_cues_outcomes(): n_events, cues, outcomes = count.cues_outcomes(EVENT_RESOURCE_FILE) n_events3, cues3, outcomes3 = count.cues_outcomes(EVENT_RESOURCE_FILE, - number_of_processes=6, + n_jobs=6, verbose=True) assert n_events == 2772 assert n_events == n_events3 @@ -25,7 +25,7 @@ def test_cues_outcomes(): def test_words_symbols(): words, symbols = count.words_symbols(CORPUS_RESOURCE_FILE) words3, symbols3 = count.words_symbols(CORPUS_RESOURCE_FILE, - number_of_processes=3, + n_jobs=3, verbose=True) assert words == words3 assert symbols == symbols3 diff --git a/tests/test_ndl.py b/tests/test_ndl.py index ed417b8..e5e19f7 100644 --- a/tests/test_ndl.py +++ b/tests/test_ndl.py @@ -124,6 +124,36 @@ def test_exceptions(): with pytest.raises(ValueError, match="events_per_file has to be larger than 1") as e_info: ndl.ndl(FILE_PATH_SIMPLE, ALPHA, BETAS, method='threading', events_per_temporary_file=1) + with pytest.raises(AttributeError, match="weights does not have attributes " + "and no attrs argument is given.") as e_info: + ndl.data_array(dict()) + +# # Test usually exeeds memory limit; It demands ~32GB of RAM. +# with pytest.raises(ValueError, match="Neither number of cues nor outcomes " +# "shall exceed 4294967295 for now. See " +# "https://github.com/quantling/pyndl/issues/169") as e_info: +# ndl.ndl(FILE_PATH_SIMPLE, ALPHA, BETAS, +# weights=xr.DataArray(np.zeros(shape=(4294967295 + 1, 1)))) + + +def test_generator_learning(): + events = io.events_from_file(FILE_PATH_SIMPLE) + result_ndl_gen = ndl.ndl(events, ALPHA, BETAS, method='threading') + result_ndl = ndl.ndl(FILE_PATH_SIMPLE, ALPHA, BETAS, method='threading') + + unequal, unequal_ratio = compare_arrays(FILE_PATH_SIMPLE, + result_ndl_gen, + result_ndl) + print(result_ndl_gen) + print('%.2f ratio unequal' % unequal_ratio) + assert len(unequal) == 0 # pylint: disable=len-as-condition + + +def test_data_array_cast(): + result_ndl = ndl.ndl(FILE_PATH_SIMPLE, ALPHA, BETAS, method='threading') + casted_result = ndl.data_array(result_ndl) + assert isinstance(casted_result, xr.DataArray) and (result_ndl == casted_result).all() + def test_continue_learning_dict(): events_simple = pd.read_csv(FILE_PATH_SIMPLE, sep="\t") diff --git a/tests/test_preprocess.py b/tests/test_preprocess.py index 36b4a23..24553d0 100644 --- a/tests/test_preprocess.py +++ b/tests/test_preprocess.py @@ -24,7 +24,7 @@ def test_bandsample(): resource_file = os.path.join(TEST_ROOT, "resources/event_file_trigrams_to_word.tab.gz") _, _, outcome_freq_map = cues_outcomes(resource_file, - number_of_processes=2) + n_jobs=2) outcome_freq_map_filtered = bandsample(outcome_freq_map, 50, cutoff=1, seed=None, verbose=False) assert len(outcome_freq_map_filtered) == 50 @@ -210,7 +210,7 @@ def test_filter_event_file(): filter_event_file(input_event_file, output_event_file, keep_cues=cues, keep_outcomes=outcomes, - number_of_processes=2, + n_jobs=2, verbose=True) _, cue_freq_map, outcome_freq_map = cues_outcomes(output_event_file) cues_new = list(cue_freq_map) @@ -328,7 +328,7 @@ def test_preprocessing(): lower_case=True, verbose=True) # read in outcomes - _, _, outcome_freq_map = cues_outcomes(event_file, number_of_processes=2) + _, _, outcome_freq_map = cues_outcomes(event_file, n_jobs=2) # reduce number of outcomes through bandsampling outcome_freq_map_filtered = bandsample(outcome_freq_map, 50, cutoff=1, 
seed=None)
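
Usage note (not part of the patch): the changes above rename every parallelism argument to n_jobs (the old keywords keep working but emit a DeprecationWarning and are scheduled for removal in v0.9.0), make count.cues_outcomes and count.words_symbols return named tuples, let ndl.ndl accept a generator of (cues, outcomes) pairs, and add ndl.data_array for converting dict-of-dict weights. A minimal sketch of the renamed API follows; the event file path is hypothetical and the learning parameters are arbitrary.

# Sketch of the renamed `n_jobs` API and the new helpers introduced above.
# The path below is a placeholder for any gzipped tab-separated event file
# with 'cues' and 'outcomes' columns.
from pyndl import count, ndl
from pyndl.activation import activation

if __name__ == '__main__':
    event_file = 'docs/data/event_file.tab.gz'  # hypothetical path

    # `n_jobs` replaces `number_of_processes`; the result is now a named tuple
    counts = count.cues_outcomes(event_file, n_jobs=2)
    print(counts.n_events, len(counts.cues), len(counts.outcomes))

    # `ndl.ndl` still takes a file path; a generator of (cues, outcomes)
    # pairs is now accepted as well
    weights = ndl.ndl(event_file, alpha=0.1, betas=(0.1, 0.1),
                      method='threading', n_jobs=2)

    # `n_jobs` replaces `number_of_threads` in `activation`
    activations = activation(event_file, weights, n_jobs=2,
                             remove_duplicates=True)

    # dict-of-dict weights can be cast to an xarray.DataArray with the new
    # `ndl.data_array`; `attrs` is required for plain dicts
    toy = ndl.data_array({'outcome1': {'cue1': 0.5}},
                         attrs={'comment': 'toy weights'})
    print(toy.loc[{'outcomes': 'outcome1', 'cues': 'cue1'}])

Keeping the deprecated keywords as explicit parameters with a None default (rather than hiding them in **kwargs) makes the migration path visible in each signature and lets the warning point callers to the new name.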