From bf7f406beb4ab56c25fa8596cb3d03e56570e9ce Mon Sep 17 00:00:00 2001 From: $aTyam Date: Thu, 2 Nov 2023 11:09:52 -0400 Subject: [PATCH] [TESTED]Forest Model Integration 1. switching a model is as simple as changing model_type in config file 2. ForestModel is now working. Main model is in model.py file which is copied from label_assist 3. TestRunForestModel.py is working. 3. Regression test in TestForestmodel.py are still under construction. --- .../modelling/trip_model/forest_classifier.py | 629 ++------- .../modelling/trip_model/model_type.py | 17 +- .../analysis/modelling/trip_model/models.py | 1194 +++++++++++++++++ emission/core/wrapper/entry.py | 3 + .../tests/modellingTests/TestForestModel.py | 101 +- .../modellingTests/TestRunForestModel.py | 12 +- .../modellingTests/modellingTestAssets.py | 5 +- 7 files changed, 1369 insertions(+), 592 deletions(-) create mode 100644 emission/analysis/modelling/trip_model/models.py diff --git a/emission/analysis/modelling/trip_model/forest_classifier.py b/emission/analysis/modelling/trip_model/forest_classifier.py index a76d03628..5a23d867e 100644 --- a/emission/analysis/modelling/trip_model/forest_classifier.py +++ b/emission/analysis/modelling/trip_model/forest_classifier.py @@ -5,17 +5,13 @@ from sklearn.metrics.pairwise import haversine_distances import emission.core.wrapper.confirmedtrip as ecwc import logging -import numpy as np -import copy +from io import BytesIO import emission.analysis.modelling.trip_model.trip_model as eamuu -import emission.analysis.modelling.trip_model.dbscan_svm as eamtd -import emission.analysis.modelling.trip_model.util as eamtu import emission.analysis.modelling.trip_model.config as eamtc import emission.storage.timeseries.builtin_timeseries as estb -from sklearn.exceptions import NotFittedError - -from sklearn.ensemble import RandomForestClassifier +import emission.storage.decorations.trip_queries as esdtq +from emission.analysis.modelling.trip_model.models import ForestClassifierModel EARTH_RADIUS = 6371000 @@ -33,7 +29,6 @@ def __init__(self,config=None): 'loc_feature', 'n_estimators', 'criterion', - 'max_depth', 'min_samples_split', 'min_samples_leaf', 'max_features', @@ -59,538 +54,124 @@ def __init__(self,config=None): if config.get(k) is None: msg = f"cluster trip model config missing expected key {k}" raise KeyError(msg) - - self.loc_feature = config['loc_feature'] - self.radius = config['radius'] - self.size_thresh = config['size_thresh'] - self.purity_thresh = config['purity_thresh'] - self.gamma = config['gamma'] - self.C = config['C'] - self.n_estimators = config['n_estimators'] - self.criterion =config['criterion'] - self.max_depth = config['max_depth'] if config['max_depth'] != 'null' else None - self.min_samples_split = config['min_samples_split'] - self.min_samples_leaf = config['min_samples_leaf'] - self.max_features = config['max_features'] - self.bootstrap = config['bootstrap'] - self.random_state = config['random_state'] - # self.drop_unclustered = drop_unclustered - self.use_start_clusters = config['use_start_clusters'] - self.use_trip_clusters = config['use_trip_clusters'] - self.base_features = [ - 'duration', - 'distance', - 'start_local_dt_year', - 'start_local_dt_month', - 'start_local_dt_day', - 'start_local_dt_hour', - 'start_local_dt_weekday', - 'end_local_dt_year', # most likely the same as the start year - 'end_local_dt_month', # most likely the same as the start month - 'end_local_dt_day', - 'end_local_dt_hour', - 'end_local_dt_weekday', - ] - self.targets = ['mode_true', 
'purpose_true', 'replaced_true'] - - if self.loc_feature == 'cluster': - # clustering algorithm to generate end clusters - self.end_cluster_model = eamtd.DBSCANSVMCluster( - loc_type='end', - radius=self.radius, - size_thresh=self.size_thresh, - purity_thresh=self.purity_thresh, - gamma=self.gamma, - C=self.C) - - if self.use_start_clusters or self.use_trip_clusters: - # clustering algorithm to generate start clusters - self.start_cluster_model = eamtd.DBSCANSVMCluster( - loc_type='start', - radius=self.radius, - size_thresh=self.size_thresh, - purity_thresh=self.purity_thresh, - gamma=self.gamma, - C=self.C) - - if self.use_trip_clusters: - # helper class to generate trip-level clusters - self.trip_grouper = eamtd.TripGrouper( - start_cluster_col='start_cluster_idx', - end_cluster_col='end_cluster_idx') - - # wrapper class to generate one-hot encodings for cluster indices - self.cluster_enc = eamtu.OneHotWrapper(sparse=False, - handle_unknown='ignore') - - # wrapper class to generate one-hot encodings for purposes and modes - self.purpose_enc = eamtu.OneHotWrapper(impute_missing=True, - sparse=False, - handle_unknown='error') - self.mode_enc = eamtu.OneHotWrapper(impute_missing=True, - sparse=False, - handle_unknown='error') - - # ensemble classifiers for each label category - self.purpose_predictor = RandomForestClassifier( - n_estimators=self.n_estimators, - criterion=self.criterion, - max_depth=self.max_depth, - min_samples_split=self.min_samples_split, - min_samples_leaf=self.min_samples_leaf, - max_features=self.max_features, - bootstrap=self.bootstrap, - random_state=self.random_state) - self.mode_predictor = RandomForestClassifier( - n_estimators=self.n_estimators, - criterion=self.criterion, - max_depth=self.max_depth, - min_samples_split=self.min_samples_split, - min_samples_leaf=self.min_samples_leaf, - max_features=self.max_features, - bootstrap=self.bootstrap, - random_state=self.random_state) - self.replaced_predictor = RandomForestClassifier( - n_estimators=self.n_estimators, - criterion=self.criterion, - max_depth=self.max_depth, - min_samples_split=self.min_samples_split, - min_samples_leaf=self.min_samples_leaf, - max_features=self.max_features, - bootstrap=self.bootstrap, - random_state=self.random_state) + self.model=ForestClassifierModel(config=config) def fit(self,trips: List[ecwc.Confirmedtrip]): - # get location features + ''' + trips : List of Entry type data + ''' + # check and raise exception if no data to fit logging.debug(f'fit called with {len(trips)} trips') unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips)) if len(unlabeled) > 0: msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}' - raise Exception(msg) - data_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",trips) - - if self.loc_feature == 'cluster': - # fit clustering model(s) and one-hot encode their indices - # TODO: consolidate start/end_cluster_model in a single instance - # that has a location_type parameter in the fit() method - self.end_cluster_model.fit(data_df) - - clusters_to_encode = self.end_cluster_model.train_df[[ - 'end_cluster_idx' - ]].copy() # copy is to avoid SettingWithCopyWarning - - if self.use_start_clusters or self.use_trip_clusters: - self.start_cluster_model.fit(data_df) - - if self.use_start_clusters: - clusters_to_encode = pd.concat([ - clusters_to_encode, - self.start_cluster_model.train_df[['start_cluster_idx']] - ], - axis=1) - if self.use_trip_clusters: - start_end_clusters = pd.concat([ - 
self.end_cluster_model.train_df[['end_cluster_idx']], - self.start_cluster_model.train_df[['start_cluster_idx']] - ], - axis=1) - trip_cluster_idx = self.trip_grouper.fit_transform( - start_end_clusters) - clusters_to_encode.loc[:, - 'trip_cluster_idx'] = trip_cluster_idx - - loc_features_df = self.cluster_enc.fit_transform( - clusters_to_encode.astype(int)) - - # clean the df again because we need it in the next step - # TODO: remove redundancy - self.train_df = self._clean_data(data_df) - - # TODO: move below code into a reusable function - if self.train_df.purpose_true.isna().any(): - num_nan = self.train_df.purpose_true.value_counts( - dropna=False).loc[np.nan] - logging.info( - f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' - ) - self.train_df = self.train_df.dropna( - subset=['purpose_true']).reset_index(drop=True) - if len(self.train_df) == 0: - # i.e. no valid trips after removing all nans - raise Exception('no valid trips; nothing to fit') - - else: # self.loc_feature == 'coordinates' - self.train_df = self._clean_data(data_df) - - # TODO: move below code into a reusable function - if self.train_df.purpose_true.isna().any(): - num_nan = self.train_df.purpose_true.value_counts( - dropna=False).loc[np.nan] - logging.info( - f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' - ) - self.train_df = self.train_df.dropna( - subset=['purpose_true']).reset_index(drop=True) - if len(self.train_df) == 0: - # i.e. no valid trips after removing all nans - raise Exception('no valid trips; nothing to fit') - - loc_features_df = self.train_df[[ - 'start_lon', 'start_lat', 'end_lon', 'end_lat' - ]] - - # prepare data for the ensemble classifiers - - # note that we want to use purpose data to aid our mode predictions, - # and use both purpose and mode data to aid our replaced-mode - # predictions - # thus, we want to one-hot encode the purpose and mode as data - # features, but also preserve an unencoded copy for the target columns - - # dataframe holding all features and targets - self.Xy_train = pd.concat( - [self.train_df[self.base_features + self.targets], loc_features_df], - axis=1) - - # encode purposes and modes - onehot_purpose_df = self.purpose_enc.fit_transform( - self.Xy_train[['purpose_true']], output_col_prefix='purpose') - onehot_mode_df = self.mode_enc.fit_transform( - self.Xy_train[['mode_true']], output_col_prefix='mode') - self.Xy_train = pd.concat( - [self.Xy_train, onehot_purpose_df, onehot_mode_df], axis=1) - - # for predicting purpose, drop encoded purpose and mode features, as - # well as all target labels - self.X_purpose = self.Xy_train.dropna(subset=['purpose_true']).drop( - labels=self.targets + self.purpose_enc.onehot_encoding_cols + - self.mode_enc.onehot_encoding_cols, - axis=1) - - # for predicting mode, we want to keep purpose data - self.X_mode = self.Xy_train.dropna(subset=['mode_true']).drop( - labels=self.targets + self.mode_enc.onehot_encoding_cols, axis=1) - - # for predicting replaced-mode, we want to keep purpose and mode data - self.X_replaced = self.Xy_train.dropna(subset=['replaced_true']).drop( - labels=self.targets, axis=1) - - self.y_purpose = self.Xy_train['purpose_true'].dropna() - self.y_mode = self.Xy_train['mode_true'].dropna() - self.y_replaced = self.Xy_train['replaced_true'].dropna() - - # fit classifiers - if len(self.X_purpose) > 0: - self.purpose_predictor.fit(self.X_purpose, self.y_purpose) - if len(self.X_mode) > 0: - self.mode_predictor.fit(self.X_mode, self.y_mode) - if 
len(self.X_replaced) > 0: - self.replaced_predictor.fit(self.X_replaced, self.y_replaced) - logging.info(f"Forest model fit to {len(trips)} rows of trip data") - - def predict(self, trips: List[float]) -> Tuple[List[Dict], int]: - logging.debug(f"forest classifier predict called with {len(trips)} trips") - - if len(trips) == 0: - msg = f'model.predict cannot be called with 0 trips' raise Exception(msg) - # CONVERT TRIPS TO dataFrame - test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",trips) - - self.X_test_for_purpose = self._get_X_test_for_purpose(test_df) - - ######################## - ### make predictions ### - ######################## - # note that we want to use purpose data to aid our mode predictions, - # and use both purpose and mode data to aid our replaced-mode - # predictions - try: - purpose_proba_raw = self.purpose_predictor.predict_proba( - self.X_test_for_purpose) - purpose_proba = pd.DataFrame( - purpose_proba_raw, columns=self.purpose_predictor.classes_) - purpose_pred = purpose_proba.idxmax(axis=1) - - # update X_test with one-hot-encoded purpose predictions to aid - # mode predictor - onehot_purpose_df = self.purpose_enc.transform( - pd.DataFrame(purpose_pred).set_index( - self.X_test_for_purpose.index)) - self.X_test_for_mode = pd.concat( - [self.X_test_for_purpose, onehot_purpose_df], axis=1) - - mode_proba, replaced_proba = self._try_predict_proba_mode_replaced() - - except NotFittedError as e: - # if we can't predict purpose, we can still try to predict mode and - # replaced-mode without one-hot encoding the purpose - - purpose_pred = np.full((len(self.X_test_for_purpose), ), np.nan) - purpose_proba_raw = np.full((len(self.X_test_for_purpose), 1), 0) - purpose_proba = pd.DataFrame(purpose_proba_raw, columns=[np.nan]) - - self.X_test_for_mode = self.X_test_for_purpose - mode_proba, replaced_proba = self._try_predict_proba_mode_replaced() - - mode_pred = mode_proba.idxmax(axis=1) - replaced_pred = replaced_proba.idxmax(axis=1) - - if (purpose_pred.dtype == np.float64 and mode_pred.dtype == np.float64 - and replaced_pred.dtype == np.float64): - # this indicates that all the predictions are np.nan so none of the - # random forest classifiers were fitted - raise NotFittedError - - proba_dfs = [] - for label_type, proba in zip( - ['purpose', 'mode', 'replaced'], - [purpose_proba, mode_proba, replaced_proba]): - proba['top_pred'] = proba.idxmax(axis=1) - proba['top_proba'] = proba.max(axis=1, skipna=True) - proba['clusterable'] = self._clusterable( - self.X_test_for_purpose).astype(bool) - proba = pd.concat([proba], keys=[label_type], axis=1) - proba_dfs += [proba] - - self.proba_df = pd.concat(proba_dfs, axis=1) - return self.proba_df - - def _try_predict_proba_mode_replaced(self): - """ Try to predict mode and replaced-mode. Handles error in case the - ensemble algorithms were not fitted. + #Convert List of Entry to dataframe + data_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",trips) + labeled_trip_df = esdtq.filter_labeled_trips(data_df) + expanded_labeled_trip_df= esdtq.expand_userinputs(labeled_trip_df) + #fit models on dataframe + self.model.fit(expanded_labeled_trip_df) + + + def predict(self, trip: List[float]) -> Tuple[List[Dict], int]: + ''' + trip : A single trip whose mode, pupose and replaced mode are required + returns. 
+    '''
+
+        # check if there is no trip to predict
+        logging.debug(f"forest classifier predict called with {len(trip)} trips")
+        if len(trip) == 0:
+            msg = 'model.predict cannot be called with an empty list of trips'
+            raise Exception(msg)
+        # convert the single trip into a DataFrame
+        test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",[trip])
+        labeled_trip_df = esdtq.filter_labeled_trips(test_df)
+        expanded_labeled_trip_df = esdtq.expand_userinputs(labeled_trip_df)
+        predictions_df = self.model.predict(expanded_labeled_trip_df)
+
+        # predictions_df currently holds the most probable option for each of the
+        # three label categories individually. It has the form
+        #
+        # purpose_pred | purpose_proba | mode_pred | mode_proba | replaced_pred | replaced_proba
+        # dog-park     |   1.0         | e-bike    |  0.99      | walk          |  1.0
+        #
+        # However, to keep the trip model interface general, the forest model is expected to return
+        #
+        # PREDICTIONS [ {'labels': {'mode_confirm': 'e-bike', 'replaced_mode': 'walk', 'purpose_confirm': 'dog-park'},
+        #                'p': (currently the average of the 3 probabilities)}]
+        labels = {
+            'mode_confirm': predictions_df['mode_pred'].iloc[0],
+            'replaced_mode': predictions_df['replaced_pred'].iloc[0],
+            'purpose_confirm': predictions_df['purpose_pred'].iloc[0]
+        }
-        Requires self.X_test_for_mode to have already been set. (These are
-            the DataFrames containing the test data to be passed into self.
-            mode_predictor.)
-
-        Returns: mode_proba and replaced_proba, two DataFrames containing
-            class probabilities for mode and replaced-mode respectively
+        avg_proba = predictions_df[['purpose_proba','mode_proba','replaced_proba']].mean(axis=1).iloc[0]
+        predictions = [{
+            'labels' : labels,
+            'p' : avg_proba
+        }]
+        return predictions, len(predictions)
+
+    def to_dict(self):
        """
-
-        try:
-            # predict mode
-            mode_proba_raw = self.mode_predictor.predict_proba(
-                self.X_test_for_mode)
-            mode_proba = pd.DataFrame(mode_proba_raw,
-                                      columns=self.mode_predictor.classes_)
-            mode_pred = mode_proba.idxmax(axis=1)
-
-            # update X_test with one-hot-encoded mode predictions to aid
-            # replaced-mode predictor
-            onehot_mode_df = self.mode_enc.transform(
-                pd.DataFrame(mode_pred).set_index(self.X_test_for_mode.index))
-            self.X_test_for_replaced = pd.concat(
-                [self.X_test_for_mode, onehot_mode_df], axis=1)
-            replaced_proba = self._try_predict_proba_replaced()
-
-        except NotFittedError as e:
-            mode_proba_raw = np.full((len(self.X_test_for_mode), 1), 0)
-            mode_proba = pd.DataFrame(mode_proba_raw, columns=[np.nan])
-
-            # if we don't have mode predictions, we *could* still try to
-            # predict replaced mode (but if the user didn't input mode labels
-            # then it's unlikely they would input replaced-mode)
-            self.X_test_for_replaced = self.X_test_for_mode
-            replaced_proba = self._try_predict_proba_replaced()
-
-        return mode_proba, replaced_proba
-
-    def _get_X_test_for_purpose(self, test_df):
-        """ Do the pre-processing to get data that we can then pass into the
-            ensemble classifiers.
+        Convert the model to a dictionary suitable for storage.
""" - if self.loc_feature == 'cluster': - # get clusters - self.end_cluster_model.predict(test_df) - clusters_to_encode = self.end_cluster_model.test_df[[ - 'end_cluster_idx' - ]].copy() # copy is to avoid SettingWithCopyWarning - - if self.use_start_clusters or self.use_trip_clusters: - self.start_cluster_model.predict(test_df) - - if self.use_start_clusters: - clusters_to_encode = pd.concat([ - clusters_to_encode, - self.start_cluster_model.test_df[['start_cluster_idx']] - ], - axis=1) - if self.use_trip_clusters: - start_end_clusters = pd.concat([ - self.end_cluster_model.test_df[['end_cluster_idx']], - self.start_cluster_model.test_df[['start_cluster_idx']] - ], - axis=1) - trip_cluster_idx = self.trip_grouper.transform( - start_end_clusters) - clusters_to_encode.loc[:, - 'trip_cluster_idx'] = trip_cluster_idx - - # one-hot encode the cluster indices - loc_features_df = self.cluster_enc.transform(clusters_to_encode) - else: # self.loc_feature == 'coordinates' - test_df = self._clean_data(test_df) - loc_features_df = test_df[[ - 'start_lon', 'start_lat', 'end_lon', 'end_lat' - ]] - - # extract the desired data - X_test = pd.concat([ - test_df[self.base_features].reset_index(drop=True), - loc_features_df.reset_index(drop=True) - ], - axis=1) - - return X_test - - - def _clusterable(self, test_df): - """ Check if the end points can be clustered (i.e. are within - meters of an end point from the training set) + data={} + attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df'] + if self.model.loc_feature == 'cluster': + ## confirm this includes all the extra encoders/models + attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + for attribute_name in attr: + buffer=BytesIO() + joblib.dump(getattr(self.model,attribute_name),buffer) + buffer.seek(0) + data[attribute_name]=buffer.getvalue() + + return data + + def from_dict(self,model: Dict): """ - if self.loc_feature == 'cluster': - return self.end_cluster_model.test_df.end_cluster_idx >= 0 - - n_samples = test_df.shape[0] - clustered = np.ones(shape=n_samples, dtype=int) * False - - train_coordinates = self.train_df[['end_lat', 'end_lon']] - train_radians = np.radians(train_coordinates) - - for idx, row in test_df.reset_index(drop=True).iterrows(): - # calculate the distances between the ith test data and all points, - # then find the minimum distance for each point and check if it's - # within the distance threshold. - # unfortunately, pairwise_distances_argmin() does not support - # haversine distance, so we have to reimplement it ourselves - new_loc_radians = np.radians(row[["end_lat", "end_lon"]].to_list()) - new_loc_radians = np.reshape(new_loc_radians, (1, 2)) - dist_matrix_meters = haversine_distances( - new_loc_radians, train_radians) * EARTH_RADIUS - - shortest_dist = np.min(dist_matrix_meters) - if shortest_dist < self.radius: - clustered[idx] = True - - return clustered - - def _clean_data(self, df): - """ Clean a dataframe of trips. - (Drop trips with missing start/end locations, expand the user input - columns, ensure all essential columns are present) - - Args: - df: a dataframe of trips. 
must contain the columns 'start_loc', - 'end_loc', and should also contain the user input columns - ('mode_confirm', 'purpose_confirm', 'replaced_mode') if - available - """ - assert 'start_loc' in df.columns and 'end_loc' in df.columns - - # clean up the dataframe by dropping entries with NaN locations and - # reset index - num_nan = 0 - if df.start_loc.isna().any(): - num_nan += df.start_loc.value_counts(dropna=False).loc[np.nan] - df = df.dropna(subset=['start_loc']) - if df.end_loc.isna().any(): - num_nan += df.end_loc.value_counts(dropna=False).loc[np.nan] - df = df.dropna(subset=['end_loc']) - - # expand the 'start_loc' and 'end_loc' column into 'start_lat', - # 'start_lon', 'end_lat', and 'end_lon' columns - df = self.expand_coords(df) - - # drop trips with missing coordinates - if df.start_lat.isna().any(): - num_nan += df.start_lat.value_counts(dropna=False).loc[np.nan] - df = df.dropna(subset=['start_lat']) - if df.start_lon.isna().any(): - num_nan += df.start_lon.value_counts(dropna=False).loc[np.nan] - df = df.dropna(subset=['start_lon']) - if df.end_lat.isna().any(): - num_nan += df.end_lat.value_counts(dropna=False).loc[np.nan] - df = df.dropna(subset=['end_lat']) - if df.end_lon.isna().any(): - num_nan = df.end_lon.value_counts(dropna=False).loc[np.nan] - df += df.dropna(subset=['end_lon']) - if num_nan > 0: - logging.info( - f'dropped {num_nan} trips that are missing location coordinates' - ) - - df = df.rename( - columns={ - 'mode_confirm': 'mode_true', - 'purpose_confirm': 'purpose_true', - 'replaced_mode': 'replaced_true' - }) - - for category in ['mode_true', 'purpose_true', 'replaced_true']: - if category not in df.columns: - # for example, if a user labels all their trip modes but none of their trip purposes - df.loc[:, category] = np.nan - - return df.reset_index(drop=True) - - def expand_coords(exp_df, purpose=None): + Load the model from a dictionary. """ - copied and modifed from get_loc_df_for_purpose() in the 'Radius - selection' notebook + attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df'] + if self.model.loc_feature == 'cluster': + ## TODO : confirm this includes all the extra encoders/models + attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + for attribute_name in attr: + try: + if attribute_name in model: + buffer = BytesIO(model[attribute_name]) + setattr(self.model,attribute_name, joblib.load(buffer)) + except Exception as e: + print(f"Error loading {attribute_name}: {str(e)}") + # If we do not wish to raise the exception after logging the error, comment the line below + raise e + + def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: """ - purpose_trips = exp_df - if purpose is not None: - purpose_trips = exp_df[exp_df.purpose_confirm == purpose] - - dfs = [purpose_trips] - for loc_type in ['start', 'end']: - df = pd.DataFrame( - purpose_trips[loc_type + - "_loc"].apply(lambda p: p["coordinates"]).to_list(), - columns=[loc_type + "_lon", loc_type + "_lat"]) - df = df.set_index(purpose_trips.index) - dfs.append(df) - - # display.display(end_loc_df.head()) - return pd.concat(dfs, axis=1) - -def to_dict(self): - """ - Convert the model to a dictionary suitable for storage. 
- """ - data = { - 'purpose_predictor': joblib.dumps(self.purpose_predictor).hex(), - 'mode_predictor': joblib.dumps(self.mode_predictor).hex(), - 'replaced_predictor': joblib.dumps(self.replaced_predictor).hex(), - 'cluster_enc': joblib.dumps(self.cluster_enc).hex(), - 'purpose_enc': joblib.dumps(self.purpose_enc).hex(), - 'mode_enc': joblib.dumps(self.mode_enc).hex(), - } - - if self.loc_feature == 'cluster': - data.update({ - 'end_cluster_model' : joblib.dumps(self.end_cluster_model).hex(), - 'start_cluster_model': joblib.dumps(self.start_cluster_model).hex(), - 'trip_grouper': joblib.dumps(self.trip_grouper).hex()}) + extract the relevant features for learning from a trip for this model instance - return data + :param trip: the trip to extract features from + :type trip: Confirmedtrip + :return: a vector containing features to predict from + :rtype: List[float] + """ + pass -def from_dict(self, model_data: Dict): - """ - Load the model from a dictionary. - """ - self.purpose_predictor = joblib.loads(bytes.fromhex(model_data['purpose_predictor'])) - self.mode_predictor = joblib.loads(bytes.fromhex(model_data['mode_predictor'])) - self.replaced_predictor = joblib.loads(bytes.fromhex(model_data['replaced_predictor'])) - self.cluster_enc = joblib.loads(bytes.fromhex(model_data['cluster_enc'])) - self.purpose_enc = joblib.loads(bytes.fromhex(model_data['purpose_enc'])) - self.mode_enc = joblib.loads(bytes.fromhex(model_data['mode_enc'])) - if self.loc_feature == 'cluster': - self.end_cluster_model = joblib.loads(bytes.fromhex(model_data['end_cluster_model'])) - self.start_cluster_model = joblib.loads(bytes.fromhex(model_data['start_cluster_model'])) - self.trip_grouper = joblib.loads(bytes.fromhex(model_data['trip_grouper'])) + def is_incremental(self) -> bool: + """ + whether this model requires the complete user history to build (False), + or, if only the incremental data since last execution is required (True). + :return: if the model is incremental. the current timestamp will be recorded + in the analysis pipeline. the next call to this model will only include + trip data for trips later than the recorded timestamp. 
+ :rtype: bool + """ + pass \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/model_type.py b/emission/analysis/modelling/trip_model/model_type.py index 2d7e6f743..56268a51a 100644 --- a/emission/analysis/modelling/trip_model/model_type.py +++ b/emission/analysis/modelling/trip_model/model_type.py @@ -26,17 +26,16 @@ def build(self, config=None) -> eamuu.TripModel: :raises KeyError: if the requested model name does not exist """ # Dict[ModelType, TripModel] - MODELS = { - #ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config), - ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifier(config) - } + MODELS = { + ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning, + ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifier + } model = MODELS.get(self) if model is None: - model_names = list(lambda e: e.name, MODELS.keys()) - models = ",".join(model_names) - raise KeyError(f"ModelType {self.name} not found in factory, please add to build method") - - return model + available_models = ', '.join([ e.name for e in ModelType]) + raise KeyError(f"ModelType {self.name} not found in factory, Available models are {available_models}."\ + "Otherwise please add new model to build method") + return model(config) @classmethod def names(cls): diff --git a/emission/analysis/modelling/trip_model/models.py b/emission/analysis/modelling/trip_model/models.py new file mode 100644 index 000000000..a8da464c4 --- /dev/null +++ b/emission/analysis/modelling/trip_model/models.py @@ -0,0 +1,1194 @@ +######################################################################## +## Copied from /e-mission-eval-private-data/TRB_label_assist/models.py## +######################################################################## + + + +import pandas as pd +import numpy as np +from abc import ABCMeta, abstractmethod # to define abstract class "blueprints" +import logging +import copy + +# sklearn imports +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler, OneHotEncoder +from sklearn.impute import SimpleImputer +from sklearn.metrics.pairwise import haversine_distances +from sklearn.cluster import DBSCAN +from sklearn import svm +from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier +from sklearn.tree import DecisionTreeClassifier +from sklearn.exceptions import NotFittedError + +# NOTE: tour_model_extended.similarity is on the +# eval-private-data-compatibility branch in e-mission-server + +# logging.basicConfig(level=logging.DEBUG) + +EARTH_RADIUS = 6371000 + +############################# +## define abstract classes ## +############################# + + +class SetupMixin(metaclass=ABCMeta): + """ class containing code to be reused when setting up estimators. """ + + def _clean_data(self, df): + """ Clean a dataframe of trips. + (Drop trips with missing start/end locations, expand the user input + columns, ensure all essential columns are present) + + Args: + df: a dataframe of trips. 
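The model_type.py change above registers classes (not instances) in MODELS and only instantiates the selected one with the supplied config, which is what makes switching models a one-line config change. A rough, self-contained sketch of that factory pattern (the stub classes and config keys here are hypothetical stand-ins, not the real implementations):

import enum

class ModelTypeSketch(enum.Enum):
    GREEDY_SIMILARITY_BINNING = 1
    RANDOM_FOREST_CLASSIFIER = 2

class GreedyBinningStub:
    def __init__(self, config): self.config = config

class ForestClassifierStub:
    def __init__(self, config): self.config = config

# classes are registered unbuilt; build() instantiates the requested one
MODELS = {
    ModelTypeSketch.GREEDY_SIMILARITY_BINNING: GreedyBinningStub,
    ModelTypeSketch.RANDOM_FOREST_CLASSIFIER: ForestClassifierStub,
}

def build(model_type, config):
    cls = MODELS.get(model_type)
    if cls is None:
        available = ', '.join(e.name for e in ModelTypeSketch)
        raise KeyError(f"ModelType {model_type.name} not found in factory; available models are {available}")
    return cls(config)

model = build(ModelTypeSketch.RANDOM_FOREST_CLASSIFIER, {'n_estimators': 100})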
must contain the columns 'start_loc', + 'end_loc', and should also contain the user input columns + ('mode_confirm', 'purpose_confirm', 'replaced_mode') if + available + """ + assert 'start_loc' in df.columns and 'end_loc' in df.columns + + # clean up the dataframe by dropping entries with NaN locations and + # reset index + num_nan = 0 + if df.start_loc.isna().any(): + num_nan += df.start_loc.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_loc']) + if df.end_loc.isna().any(): + num_nan += df.end_loc.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['end_loc']) + + # expand the 'start_loc' and 'end_loc' column into 'start_lat', + # 'start_lon', 'end_lat', and 'end_lon' columns + df = self.expand_coords(df) + + # drop trips with missing coordinates + if df.start_lat.isna().any(): + num_nan += df.start_lat.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_lat']) + if df.start_lon.isna().any(): + num_nan += df.start_lon.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_lon']) + if df.end_lat.isna().any(): + num_nan += df.end_lat.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['end_lat']) + if df.end_lon.isna().any(): + num_nan = df.end_lon.value_counts(dropna=False).loc[np.nan] + df += df.dropna(subset=['end_lon']) + if num_nan > 0: + logging.info( + f'dropped {num_nan} trips that are missing location coordinates' + ) + + df = df.rename( + columns={ + 'mode_confirm': 'mode_true', + 'purpose_confirm': 'purpose_true', + 'replaced_mode': 'replaced_true' + }) + + for category in ['mode_true', 'purpose_true', 'replaced_true']: + if category not in df.columns: + # for example, if a user labels all their trip modes but none of their trip purposes + df.loc[:, category] = np.nan + + return df.reset_index(drop=True) + + def expand_coords(self,exp_df, purpose=None): + """ + copied and modifed from get_loc_df_for_purpose() in the 'Radius + selection' notebook + """ + purpose_trips = exp_df + if purpose is not None: + purpose_trips = exp_df[exp_df.purpose_confirm == purpose] + + dfs = [purpose_trips] + for loc_type in ['start', 'end']: + df = pd.DataFrame( + purpose_trips[loc_type + + "_loc"].apply(lambda p: p["coordinates"]).to_list(), + columns=[loc_type + "_lon", loc_type + "_lat"]) + df = df.set_index(purpose_trips.index) + dfs.append(df) + + # display.display(end_loc_df.head()) + return pd.concat(dfs, axis=1) + + +class Cluster(SetupMixin, metaclass=ABCMeta): + """ blueprint for clustering models. """ + + @abstractmethod + def fit(self, train_df,ct_entry=None): + """ Fit the clustering algorithm. + + Args: + train_df (DataFrame): dataframe of labeled trips + ct_entry (List) : A list of Entry type of labeled and unlabeled trips + + Returns: + self + """ + raise NotImplementedError + + @abstractmethod + def predict(self, test_df): + """ Predict cluster indices for trips, if possible. Trips that could + not be clustered will have the index -1. + + Args: + test_df (DataFrame): dataframe of test trips + + Returns: + pd DataFrame containing one column, 'start_cluster_idx' or + 'end_cluster_idx' + """ + raise NotImplementedError + + def fit_predict(self, train_df): + """ Fit the clustering algorithm and predict cluster indices for trips, + if possible. Trips that could not be clustered will have the index -1. 
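As a concrete illustration of what _clean_data()/expand_coords() above operate on: 'start_loc' and 'end_loc' hold GeoJSON-style points whose 'coordinates' are [longitude, latitude], and expand_coords() splits them into scalar columns (the values below are made up):

import pandas as pd

trips = pd.DataFrame({
    'start_loc': [{'type': 'Point', 'coordinates': [-105.08, 39.80]}],
    'end_loc':   [{'type': 'Point', 'coordinates': [-105.03, 39.75]}],
    'purpose_confirm': ['work'],
})
# after expand_coords(trips), the frame would additionally contain
#   start_lon=-105.08, start_lat=39.80, end_lon=-105.03, end_lat=39.75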
+ + Args: + train_df (DataFrame): dataframe of labeled trips + + Returns: + pd DataFrame containing one column, 'start_cluster_idx' or + 'end_cluster_idx' + """ + self.fit(train_df) + return self.predict(train_df) + + +class TripClassifier(SetupMixin, metaclass=ABCMeta): + + @abstractmethod + def fit(self, train_df,ct_entry=None): + """ Fit a classification model. + + Args: + train_df (DataFrame): dataframe of labeled trips + ct_entry (List) : A list of Entry type of labeled and unlabeled trips + + Returns: + self + """ + raise NotImplementedError + + def predict(self, test_df): + """ Predict trip labels. + + Args: + test_df (DataFrame): dataframe of trips + + Returns: + DataFrame containing the following columns: + 'purpose_pred', 'mode_pred', 'replaced_pred', + 'purpose_proba', 'mode_proba', 'replaced_proba' + the *_pred columns contain the most-likely label prediction + (string for a label or float for np.nan). + the *_proba columns contain the probability of the most-likely + prediction. + """ + proba_df = self.predict_proba(test_df) + prediction_df = proba_df.loc[:, [('purpose', 'top_pred'), + ('purpose', 'top_proba'), + ('mode', 'top_pred'), + ('mode', 'top_proba'), + ('replaced', 'top_pred'), + ('replaced', 'top_proba')]] + + prediction_df.columns = prediction_df.columns.to_flat_index() + prediction_df = prediction_df.rename( + columns={ + ('purpose', 'top_pred'): 'purpose_pred', + ('purpose', 'top_proba'): 'purpose_proba', + ('mode', 'top_pred'): 'mode_pred', + ('mode', 'top_proba'): 'mode_proba', + ('replaced', 'top_pred'): 'replaced_pred', + ('replaced', 'top_proba'): 'replaced_proba', + }) + + return prediction_df + + def fit_predict(self, train_df): + """ Fit a classification model and predict trip labels. + + Args: + train_df (DataFrame): dataframe of labeled trips + + Returns: + DataFrame containing the following columns: + 'purpose_pred', 'mode_pred', 'replaced_pred', + 'purpose_proba', 'mode_proba', 'replaced_proba' + the *_pred columns contain the most-likely label prediction + (string for a label or float for np.nan). + the *_proba columns contain the probability of the most-likely + prediction. + """ + self.fit(train_df) + return self.predict(train_df) + + @abstractmethod + def predict_proba(self, test_df): + """ Predict class probabilities for each trip. + + NOTE: check the specific model to see if the class probabilities + have confidence-discounting or not. + + Args: + test_df (DataFrame): dataframe of trips + + Returns: + DataFrame with multiindexing. Each row represents a trip. There + are 3 columns at level 1, one for each label category + ('purpose', 'mode', 'replaced'). Within each category, there is + a column for each label, with the row's entry being the + probability that the trip has the label. There are three + additional columns within each category, one indicating the + most-likely label, one indicating the probability of the + most-likely label, and one indicating whether or not the trip + can be clustered. + TODO: add a fourth optional column for the number of trips in + the cluster (if clusterable) + + Level 1 columns are: purpose, mode, replaced + Lebel 2 columns are: + , , ... top_pred, top_proba, clusterable + , , ... top_pred, top_proba, clusterable + , , ... top_pred, top_proba, clusterable + """ + raise NotImplementedError + + +class DBSCANSVMCluster(Cluster): + """ DBSCAN-based clustering algorithm that optionally implements SVM + sub-clustering. 
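To make the multi-indexed proba_df layout described in predict_proba() above concrete, here is a tiny hand-built example with made-up labels and probabilities (real frames have one column per observed label in each of the three categories):

import pandas as pd

columns = pd.MultiIndex.from_tuples([
    ('purpose', 'home'), ('purpose', 'work'),
    ('purpose', 'top_pred'), ('purpose', 'top_proba'), ('purpose', 'clusterable'),
    ('mode', 'walk'), ('mode', 'bike'),
    ('mode', 'top_pred'), ('mode', 'top_proba'), ('mode', 'clusterable'),
])
proba_df = pd.DataFrame(
    [[0.8, 0.2, 'home', 0.8, True, 0.6, 0.4, 'walk', 0.6, True]], columns=columns)
# predict() then flattens e.g. ('purpose', 'top_pred') into the 'purpose_pred' column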
+ + Args: + loc_type (str): 'start' or 'end', the type of point to cluster + radius (int): max distance between two points in each other's + neighborhood, i.e. DBSCAN's eps value. does not strictly + dictate final cluster size + size_thresh (int): the min number of trips a cluster must have + to be considered for SVM sub-division + purity_thresh (float): the min purity a cluster must have + to be sub-divided using SVM + gamma (float): coefficient for the rbf kernel in SVM + C (float): regularization hyperparameter for SVM + + Attributes: + loc_type (str) + radius (int) + size_thresh (int) + purity_thresh (float) + gamma (float) + C (float) + train_df (DataFrame) + test_df (DataFrame) + base_model (sklearn Estimator) + """ + + def __init__(self, + loc_type='end', + radius=100, + svm=True, + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1): + logging.info("PERF: Initializing DBSCANSVMCluster") + self.loc_type = loc_type + self.radius = radius + self.svm = svm + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + + def set_params(self, params): + if 'loc_type' in params.keys(): self.loc_type = params['loc_type'] + if 'radius' in params.keys(): self.radius = params['radius'] + if 'svm' in params.keys(): self.svm = params['svm'] + if 'size_thresh' in params.keys(): + self.size_thresh = params['size_thresh'] + if 'purity_thresh' in params.keys(): + self.purity_thresh = params['purity_thresh'] + if 'gamma' in params.keys(): self.gamma = params['gamma'] + + return self + + def fit(self, train_df,ct_entry=None): + """ Creates clusters of trip points. + self.train_df will be updated with columns containing base and + final clusters. + + TODO: perhaps move the loc_type argument to fit() so we can use a + single class instance to cluster both start and end points. This + will also help us reduce duplicate data. + + Args: + train_df (dataframe): dataframe of labeled trips + ct_entry (List) : A list of Entry type of labeled and unlabeled trips + """ + ################## + ### clean data ### + ################## + logging.info("PERF: Fitting DBSCANSVMCluster") + self.train_df = self._clean_data(train_df) + + # we can use all trips as long as they have purpose labels. it's ok if + # they're missing mode/replaced-mode labels, because they aren't as + # strongly correlated with location compared to purpose + # TODO: actually, we may want to rethink this. for example, it will + # probably be helpful to include trips that are missing purpose labels + # but still have mode labels. + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. 
no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + ######################### + ### get base clusters ### + ######################### + dist_matrix_meters = get_distance_matrix(self.train_df, self.loc_type) + self.base_model = DBSCAN(self.radius, + metric="precomputed", + min_samples=1).fit(dist_matrix_meters) + base_clusters = self.base_model.labels_ + + self.train_df.loc[:, + f'{self.loc_type}_base_cluster_idx'] = base_clusters + + ######################## + ### get sub-clusters ### + ######################## + # copy base cluster column into final cluster column + self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = self.train_df[ + f'{self.loc_type}_base_cluster_idx'] + + if self.svm: + c = 0 # count of how many clusters we have iterated over + + # iterate over all clusters and subdivide them with SVM. the while + # loop is so we can do multiple iterations of subdividing if needed + while c < self.train_df[f'{self.loc_type}_cluster_idx'].max(): + points_in_cluster = self.train_df[ + self.train_df[f'{self.loc_type}_cluster_idx'] == c] + + # only do SVM if we have the minimum num of trips in the cluster + if len(points_in_cluster) < self.size_thresh: + c += 1 + continue + + # only do SVM if purity is below threshold + purity = single_cluster_purity(points_in_cluster, + label_col='purpose_true') + if purity < self.purity_thresh: + X = points_in_cluster[[ + f"{self.loc_type}_lon", f"{self.loc_type}_lat" + ]] + y = points_in_cluster.purpose_true.to_list() + + svm_model = make_pipeline( + StandardScaler(), + svm.SVC( + kernel='rbf', + gamma=self.gamma, + C=self.C, + )).fit(X, y) + labels = svm_model.predict(X) + unique_labels = np.unique(labels) + + # if the SVM predicts that all points in the cluster have + # the same label, just ignore it and don't reindex. + # this also helps us to handle the possibility that a + # cluster may be impure but inherently inseparable, e.g. an + # end cluster at a user's home, containing 50% trips from + # work to home and 50% round trips that start and end at + # home. we don't want to reindex otherwise the low purity + # will trigger SVM again, and we will attempt & fail to + # split the cluster ad infinitum + if len(unique_labels) > 1: + # map purpose labels to new cluster indices + # we offset indices by the max existing index so that we + # don't run into any duplicate indices + max_existing_idx = self.train_df[ + f'{self.loc_type}_cluster_idx'].max() + label_to_cluster = { + unique_labels[i]: i + max_existing_idx + 1 + for i in range(len(unique_labels)) + } + # update trips with their new cluster indices + indices = np.array( + [label_to_cluster[l] for l in labels]) + self.train_df.loc[ + self.train_df[f'{self.loc_type}_cluster_idx'] == c, + f'{self.loc_type}_cluster_idx'] = indices + + c += 1 + # TODO: make things categorical at the end? or maybe at the start of the decision tree pipeline + + return self + + def fit_predict(self, train_df): + """ Override to avoid unnecessarily computation of distance matrices. 
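fit() above relies on a get_distance_matrix() helper that is not included in this patch. Under the assumption that it simply returns the pairwise haversine distance matrix in meters, a minimal equivalent for the precomputed-metric DBSCAN call would look like:

import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import haversine_distances

EARTH_RADIUS = 6371000  # meters, matching the module-level constant above

def pairwise_haversine_meters(df, loc_type):
    # assumed equivalent of get_distance_matrix(): n x n distances between trip points
    coords_radians = np.radians(df[[f"{loc_type}_lat", f"{loc_type}_lon"]].to_numpy())
    return haversine_distances(coords_radians) * EARTH_RADIUS

# dist = pairwise_haversine_meters(train_df, "end")
# base_clusters = DBSCAN(eps=radius, metric="precomputed", min_samples=1).fit(dist).labels_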
+ """ + self.fit(train_df) + return self.train_df[[f'{self.loc_type}_cluster_idx']] + + def predict(self, test_df): + logging.info("PERF: Predicting DBSCANSVMCluster") + # TODO: store clusters as polygons so the prediction is faster + # TODO: we probably don't want to store test_df in self to be more memory-efficient + self.test_df = self._clean_data(test_df) + pred_clusters = self._NN_predict(self.test_df) + + self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = pred_clusters + + return self.test_df[[f'{self.loc_type}_cluster_idx']] + + def _NN_predict(self, test_df): + """ Generate base-cluster predictions for the test data using a + nearest-neighbor approach. + + sklearn doesn't implement predict() for DBSCAN, which is why we + need a custom method. + """ + logging.info("PERF: NN_predicting DBSCANSVMCluster") + n_samples = test_df.shape[0] + labels = np.ones(shape=n_samples, dtype=int) * -1 + + # get coordinates of core points (we can't use model.components_ + # because our input feature was a distance matrix and doesn't contain + # info about the raw coordinates) + # NOTE: technically, every single point in a cluster is a core point + # because it has at least minPts (2) points, including itself, in its + # radius + train_coordinates = self.train_df[[ + f'{self.loc_type}_lat', f'{self.loc_type}_lon' + ]] + train_radians = np.radians(train_coordinates) + + for idx, row in test_df.reset_index(drop=True).iterrows(): + # calculate the distances between the ith test data and all points, + # then find the index of the closest point. if the ith test data is + # within epsilon of the point, then assign its cluster to the ith + # test data (otherwise, leave it as -1, indicating noise). + # unfortunately, pairwise_distances_argmin() does not support + # haversine distance, so we have to reimplement it ourselves + new_loc_radians = np.radians( + row[[self.loc_type + "_lat", self.loc_type + "_lon"]].to_list()) + new_loc_radians = np.reshape(new_loc_radians, (1, 2)) + dist_matrix_meters = haversine_distances( + new_loc_radians, train_radians) * EARTH_RADIUS + + shortest_dist_idx = np.argmin(dist_matrix_meters) + if dist_matrix_meters[0, shortest_dist_idx] < self.radius: + labels[idx] = self.train_df.reset_index( + drop=True).loc[shortest_dist_idx, + f'{self.loc_type}_cluster_idx'] + + return labels + + + +class EnsembleClassifier(TripClassifier, metaclass=ABCMeta): + """ Template class for trip classifiers using ensemble algorithms. 
+ + Required args: + loc_feature (str): 'coordinates' or 'cluster' + """ + base_features = [ + 'duration', + 'distance', + 'start_local_dt_year', + 'start_local_dt_month', + 'start_local_dt_day', + 'start_local_dt_hour', + # 'start_local_dt_minute', + 'start_local_dt_weekday', + 'end_local_dt_year', # most likely the same as the start year + 'end_local_dt_month', # most likely the same as the start month + 'end_local_dt_day', + 'end_local_dt_hour', + # 'end_local_dt_minute', + 'end_local_dt_weekday', + ] + targets = ['mode_true', 'purpose_true', 'replaced_true'] + + # required instance attributes + loc_feature = NotImplemented + purpose_enc = NotImplemented + mode_enc = NotImplemented + purpose_predictor = NotImplemented + mode_predictor = NotImplemented + replaced_predictor = NotImplemented + + # required methods + def fit(self, train_df,ct_entry=None): + # get location features + if self.loc_feature == 'cluster': + # fit clustering model(s) and one-hot encode their indices + # TODO: consolidate start/end_cluster_model in a single instance + # that has a location_type parameter in the fit() method + self.end_cluster_model.fit(train_df) + + clusters_to_encode = self.end_cluster_model.train_df[[ + 'end_cluster_idx' + ]].copy() # copy is to avoid SettingWithCopyWarning + + if self.use_start_clusters or self.use_trip_clusters: + self.start_cluster_model.fit(train_df) + + if self.use_start_clusters: + clusters_to_encode = pd.concat([ + clusters_to_encode, + self.start_cluster_model.train_df[['start_cluster_idx']] + ], + axis=1) + if self.use_trip_clusters: + start_end_clusters = pd.concat([ + self.end_cluster_model.train_df[['end_cluster_idx']], + self.start_cluster_model.train_df[['start_cluster_idx']] + ], + axis=1) + trip_cluster_idx = self.trip_grouper.fit_transform( + start_end_clusters) + clusters_to_encode.loc[:, + 'trip_cluster_idx'] = trip_cluster_idx + + loc_features_df = self.cluster_enc.fit_transform( + clusters_to_encode.astype(int)) + + # clean the df again because we need it in the next step + # TODO: remove redundancy + self.train_df = self._clean_data(train_df) + + # TODO: move below code into a reusable function + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + else: # self.loc_feature == 'coordinates' + self.train_df = self._clean_data(train_df) + + # TODO: move below code into a reusable function + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. 
no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + loc_features_df = self.train_df[[ + 'start_lon', 'start_lat', 'end_lon', 'end_lat' + ]] + + # prepare data for the ensemble classifiers + + # note that we want to use purpose data to aid our mode predictions, + # and use both purpose and mode data to aid our replaced-mode + # predictions + # thus, we want to one-hot encode the purpose and mode as data + # features, but also preserve an unencoded copy for the target columns + + # dataframe holding all features and targets + self.Xy_train = pd.concat( + [self.train_df[self.base_features + self.targets], loc_features_df], + axis=1) + + # encode purposes and modes + onehot_purpose_df = self.purpose_enc.fit_transform( + self.Xy_train[['purpose_true']], output_col_prefix='purpose') + onehot_mode_df = self.mode_enc.fit_transform( + self.Xy_train[['mode_true']], output_col_prefix='mode') + self.Xy_train = pd.concat( + [self.Xy_train, onehot_purpose_df, onehot_mode_df], axis=1) + + # for predicting purpose, drop encoded purpose and mode features, as + # well as all target labels + self.X_purpose = self.Xy_train.dropna(subset=['purpose_true']).drop( + labels=self.targets + self.purpose_enc.onehot_encoding_cols + + self.mode_enc.onehot_encoding_cols, + axis=1) + + # for predicting mode, we want to keep purpose data + self.X_mode = self.Xy_train.dropna(subset=['mode_true']).drop( + labels=self.targets + self.mode_enc.onehot_encoding_cols, axis=1) + + # for predicting replaced-mode, we want to keep purpose and mode data + self.X_replaced = self.Xy_train.dropna(subset=['replaced_true']).drop( + labels=self.targets, axis=1) + + self.y_purpose = self.Xy_train['purpose_true'].dropna() + self.y_mode = self.Xy_train['mode_true'].dropna() + self.y_replaced = self.Xy_train['replaced_true'].dropna() + + # fit classifiers + if len(self.X_purpose) > 0: + self.purpose_predictor.fit(self.X_purpose, self.y_purpose) + if len(self.X_mode) > 0: + self.mode_predictor.fit(self.X_mode, self.y_mode) + if len(self.X_replaced) > 0: + self.replaced_predictor.fit(self.X_replaced, self.y_replaced) + + return self + + def predict_proba(self, test_df): + """ NOTE: these class probabilities do NOT have a + confidence-discounting heuristic applied. 
+ """ + ################ + ### get data ### + ################ + self.X_test_for_purpose = self._get_X_test_for_purpose(test_df) + + ######################## + ### make predictions ### + ######################## + # note that we want to use purpose data to aid our mode predictions, + # and use both purpose and mode data to aid our replaced-mode + # predictions + + # TODO: some of the code across the try and except blocks can be + # consolidated by considering one-hot encoding fully np.nan arrays + try: + purpose_proba_raw = self.purpose_predictor.predict_proba( + self.X_test_for_purpose) + purpose_proba = pd.DataFrame( + purpose_proba_raw, columns=self.purpose_predictor.classes_) + purpose_pred = purpose_proba.idxmax(axis=1) + + # update X_test with one-hot-encoded purpose predictions to aid + # mode predictor + # TODO: converting purpose_pred to a DataFrame feels super + # unnecessary, make this more efficient + onehot_purpose_df = self.purpose_enc.transform( + pd.DataFrame(purpose_pred).set_index( + self.X_test_for_purpose.index)) + self.X_test_for_mode = pd.concat( + [self.X_test_for_purpose, onehot_purpose_df], axis=1) + + mode_proba, replaced_proba = self._try_predict_proba_mode_replaced() + + except NotFittedError as e: + # if we can't predict purpose, we can still try to predict mode and + # replaced-mode without one-hot encoding the purpose + + purpose_pred = np.full((len(self.X_test_for_purpose), ), np.nan) + purpose_proba_raw = np.full((len(self.X_test_for_purpose), 1), 0) + purpose_proba = pd.DataFrame(purpose_proba_raw, columns=[np.nan]) + + self.X_test_for_mode = self.X_test_for_purpose + mode_proba, replaced_proba = self._try_predict_proba_mode_replaced() + + mode_pred = mode_proba.idxmax(axis=1) + replaced_pred = replaced_proba.idxmax(axis=1) + + if (purpose_pred.dtype == np.float64 and mode_pred.dtype == np.float64 + and replaced_pred.dtype == np.float64): + # this indicates that all the predictions are np.nan so none of the + # random forest classifiers were fitted + raise NotFittedError + + # TODO: move this to a Mixin for cluster-based predictors and use the + # 'cluster' column of the proba_df outputs + # if self.drop_unclustered: + # # TODO: actually, we should only drop purpose predictions. we can + # # then impute the missing entries in the purpose feature and still + # # try to predict mode and replaced-mode without it + # self.predictions.loc[ + # self.end_cluster_model.test_df['end_cluster_idx'] == -1, + # ['purpose_pred', 'mode_pred', 'replaced_pred']] = np.nan + + proba_dfs = [] + for label_type, proba in zip( + ['purpose', 'mode', 'replaced'], + [purpose_proba, mode_proba, replaced_proba]): + proba['top_pred'] = proba.idxmax(axis=1) + proba['top_proba'] = proba.max(axis=1, skipna=True) + proba['clusterable'] = self._clusterable( + self.X_test_for_purpose).astype(bool) + proba = pd.concat([proba], keys=[label_type], axis=1) + proba_dfs += [proba] + + self.proba_df = pd.concat(proba_dfs, axis=1) + return self.proba_df + + def _get_X_test_for_purpose(self, test_df): + """ Do the pre-processing to get data that we can then pass into the + ensemble classifiers. 
+ """ + if self.loc_feature == 'cluster': + # get clusters + self.end_cluster_model.predict(test_df) + clusters_to_encode = self.end_cluster_model.test_df[[ + 'end_cluster_idx' + ]].copy() # copy is to avoid SettingWithCopyWarning + + if self.use_start_clusters or self.use_trip_clusters: + self.start_cluster_model.predict(test_df) + + if self.use_start_clusters: + clusters_to_encode = pd.concat([ + clusters_to_encode, + self.start_cluster_model.test_df[['start_cluster_idx']] + ], + axis=1) + if self.use_trip_clusters: + start_end_clusters = pd.concat([ + self.end_cluster_model.test_df[['end_cluster_idx']], + self.start_cluster_model.test_df[['start_cluster_idx']] + ], + axis=1) + trip_cluster_idx = self.trip_grouper.transform( + start_end_clusters) + clusters_to_encode.loc[:, + 'trip_cluster_idx'] = trip_cluster_idx + + # one-hot encode the cluster indices + loc_features_df = self.cluster_enc.transform(clusters_to_encode) + else: # self.loc_feature == 'coordinates' + test_df = self._clean_data(test_df) + loc_features_df = test_df[[ + 'start_lon', 'start_lat', 'end_lon', 'end_lat' + ]] + + # extract the desired data + X_test = pd.concat([ + test_df[self.base_features].reset_index(drop=True), + loc_features_df.reset_index(drop=True) + ], + axis=1) + + return X_test + + def _try_predict_proba_mode_replaced(self): + """ Try to predict mode and replaced-mode. Handles error in case the + ensemble algorithms were not fitted. + + Requires self.X_test_for_mode to have already been set. (These are + the DataFrames containing the test data to be passed into self. + mode_predictor.) + + Returns: mode_proba and replaced_proba, two DataFrames containing + class probabilities for mode and replaced-mode respectively + """ + + try: + # predict mode + mode_proba_raw = self.mode_predictor.predict_proba( + self.X_test_for_mode) + mode_proba = pd.DataFrame(mode_proba_raw, + columns=self.mode_predictor.classes_) + mode_pred = mode_proba.idxmax(axis=1) + + # update X_test with one-hot-encoded mode predictions to aid + # replaced-mode predictor + onehot_mode_df = self.mode_enc.transform( + pd.DataFrame(mode_pred).set_index(self.X_test_for_mode.index)) + self.X_test_for_replaced = pd.concat( + [self.X_test_for_mode, onehot_mode_df], axis=1) + replaced_proba = self._try_predict_proba_replaced() + + except NotFittedError as e: + mode_proba_raw = np.full((len(self.X_test_for_mode), 1), 0) + mode_proba = pd.DataFrame(mode_proba_raw, columns=[np.nan]) + + # if we don't have mode predictions, we *could* still try to + # predict replaced mode (but if the user didn't input mode labels + # then it's unlikely they would input replaced-mode) + self.X_test_for_replaced = self.X_test_for_mode + replaced_proba = self._try_predict_proba_replaced() + + return mode_proba, replaced_proba + + def _try_predict_proba_replaced(self): + """ Try to predict replaced mode. Handles error in case the + replaced_predictor was not fitted. + + Requires self.X_test_for_replaced to have already been set. (This + is the DataFrame containing the test data to be passed into self. + replaced_predictor.) 
+ + Returns: replaced_proba, DataFrame containing class probabilities + for replaced-mode + """ + try: + replaced_proba_raw = self.replaced_predictor.predict_proba( + self.X_test_for_replaced + ) # has shape (len_trips, number of replaced_mode classes) + replaced_proba = pd.DataFrame( + replaced_proba_raw, columns=self.replaced_predictor.classes_) + + except NotFittedError as e: + replaced_proba_raw = np.full((len(self.X_test_for_replaced), 1), 0) + replaced_proba = pd.DataFrame(replaced_proba_raw, columns=[np.nan]) + + return replaced_proba + + def _clusterable(self, test_df): + """ Check if the end points can be clustered (i.e. are within + meters of an end point from the training set) + """ + if self.loc_feature == 'cluster': + return self.end_cluster_model.test_df.end_cluster_idx >= 0 + + n_samples = test_df.shape[0] + clustered = np.ones(shape=n_samples, dtype=int) * False + + train_coordinates = self.train_df[['end_lat', 'end_lon']] + train_radians = np.radians(train_coordinates) + + for idx, row in test_df.reset_index(drop=True).iterrows(): + # calculate the distances between the ith test data and all points, + # then find the minimum distance for each point and check if it's + # within the distance threshold. + # unfortunately, pairwise_distances_argmin() does not support + # haversine distance, so we have to reimplement it ourselves + new_loc_radians = np.radians(row[["end_lat", "end_lon"]].to_list()) + new_loc_radians = np.reshape(new_loc_radians, (1, 2)) + dist_matrix_meters = haversine_distances( + new_loc_radians, train_radians) * EARTH_RADIUS + + shortest_dist = np.min(dist_matrix_meters) + if shortest_dist < self.radius: + clustered[idx] = True + + return clustered + + +class ForestClassifierModel(EnsembleClassifier): + """ Random forest-based trip classifier. + + Args: + loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/ + lon coordinates or cluster indices for the location feature + radius (int): radius for DBSCAN clustering. only if + loc_feature=='cluster' + size_thresh (int): the min number of trips a cluster must have to + be considered for sub-division via SVM. only if + loc_feature=='cluster' + purity_thresh (float): the min purity a cluster must have to be + sub-divided via SVM. only if loc_feature=='cluster' + gamma (float): coefficient for the rbf kernel in SVM. only if + loc_feature=='cluster' + C (float): regularization hyperparameter for SVM. only if + loc_feature=='cluster' + n_estimators (int): number of estimators in the random forest + criterion (str): function to measure the quality of a split in the + random forest + max_depth (int): max depth of a tree in the random forest. + unlimited if None. + min_samples_split (int): min number of samples required to split an + internal node in a decision tree + min_samples_leaf (int): min number of samples required for a leaf + node in a decision tree + max_features (str): number of features to consider when looking for + the best split in a decision tree + bootstrap (bool): whether bootstrap samples are used when building + decision trees + random_state (int): random state for deterministic random forest + construction + use_start_clusters (bool): whether or not to use start clusters as + input features to the ensemble classifier. only if + loc_feature=='cluster' + use_trip_clusters (bool): whether or not to use trip-level clusters + as input features to the ensemble classifier. 
+class ForestClassifierModel(EnsembleClassifier):
+    """ Random forest-based trip classifier.
+
+        Args:
+            loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/
+                lon coordinates or cluster indices for the location feature
+            radius (int): radius for DBSCAN clustering. only if
+                loc_feature=='cluster'
+            size_thresh (int): the min number of trips a cluster must have to
+                be considered for sub-division via SVM. only if
+                loc_feature=='cluster'
+            purity_thresh (float): the min purity a cluster must have to be
+                sub-divided via SVM. only if loc_feature=='cluster'
+            gamma (float): coefficient for the rbf kernel in SVM. only if
+                loc_feature=='cluster'
+            C (float): regularization hyperparameter for SVM. only if
+                loc_feature=='cluster'
+            n_estimators (int): number of estimators in the random forest
+            criterion (str): function to measure the quality of a split in the
+                random forest
+            max_depth (int): max depth of a tree in the random forest.
+                unlimited if None.
+            min_samples_split (int): min number of samples required to split an
+                internal node in a decision tree
+            min_samples_leaf (int): min number of samples required for a leaf
+                node in a decision tree
+            max_features (str): number of features to consider when looking for
+                the best split in a decision tree
+            bootstrap (bool): whether bootstrap samples are used when building
+                decision trees
+            random_state (int): random state for deterministic random forest
+                construction
+            use_start_clusters (bool): whether or not to use start clusters as
+                input features to the ensemble classifier. only if
+                loc_feature=='cluster'
+            use_trip_clusters (bool): whether or not to use trip-level clusters
+                as input features to the ensemble classifier. only if
+                loc_feature=='cluster'
+    """
+
+    def __init__(self, config):
+
+        self.loc_feature = config['loc_feature']
+        self.radius = config['radius']
+        self.size_thresh = config['size_thresh']
+        self.purity_thresh = config['purity_thresh']
+        self.gamma = config['gamma']
+        self.C = config['C']
+        self.n_estimators = config['n_estimators']
+        self.criterion = config['criterion']
+        self.max_depth = config['max_depth'] if config['max_depth'] != 'null' else None
+        self.min_samples_split = config['min_samples_split']
+        self.min_samples_leaf = config['min_samples_leaf']
+        self.max_features = config['max_features']
+        self.bootstrap = config['bootstrap']
+        self.random_state = config['random_state']
+        # self.drop_unclustered = drop_unclustered
+        self.use_start_clusters = config['use_start_clusters']
+        self.use_trip_clusters = config['use_trip_clusters']
+        self.base_features = [
+            'duration',
+            'distance',
+            'start_local_dt_year',
+            'start_local_dt_month',
+            'start_local_dt_day',
+            'start_local_dt_hour',
+            'start_local_dt_weekday',
+            'end_local_dt_year',  # most likely the same as the start year
+            'end_local_dt_month',  # most likely the same as the start month
+            'end_local_dt_day',
+            'end_local_dt_hour',
+            'end_local_dt_weekday',
+        ]
+        self.targets = ['mode_true', 'purpose_true', 'replaced_true']
+
+        if self.loc_feature == 'cluster':
+            # clustering algorithm to generate end clusters
+            self.end_cluster_model = DBSCANSVMCluster(
+                loc_type='end',
+                radius=self.radius,
+                size_thresh=self.size_thresh,
+                purity_thresh=self.purity_thresh,
+                gamma=self.gamma,
+                C=self.C)
+
+            if self.use_start_clusters or self.use_trip_clusters:
+                # clustering algorithm to generate start clusters
+                self.start_cluster_model = DBSCANSVMCluster(
+                    loc_type='start',
+                    radius=self.radius,
+                    size_thresh=self.size_thresh,
+                    purity_thresh=self.purity_thresh,
+                    gamma=self.gamma,
+                    C=self.C)
+
+            if self.use_trip_clusters:
+                # helper class to generate trip-level clusters
+                self.trip_grouper = TripGrouper(
+                    start_cluster_col='start_cluster_idx',
+                    end_cluster_col='end_cluster_idx')
+
+            # wrapper class to generate one-hot encodings for cluster indices
+            self.cluster_enc = OneHotWrapper(sparse=False,
+                                             handle_unknown='ignore')
+
+        # wrapper class to generate one-hot encodings for purposes and modes
+        self.purpose_enc = OneHotWrapper(impute_missing=True,
+                                         sparse=False,
+                                         handle_unknown='error')
+        self.mode_enc = OneHotWrapper(impute_missing=True,
+                                      sparse=False,
+                                      handle_unknown='error')
+
+        # ensemble classifiers for each label category
+        self.purpose_predictor = RandomForestClassifier(
+            n_estimators=self.n_estimators,
+            criterion=self.criterion,
+            max_depth=self.max_depth,
+            min_samples_split=self.min_samples_split,
+            min_samples_leaf=self.min_samples_leaf,
+            max_features=self.max_features,
+            bootstrap=self.bootstrap,
+            random_state=self.random_state)
+        self.mode_predictor = RandomForestClassifier(
+            n_estimators=self.n_estimators,
+            criterion=self.criterion,
+            max_depth=self.max_depth,
+            min_samples_split=self.min_samples_split,
+            min_samples_leaf=self.min_samples_leaf,
+            max_features=self.max_features,
+            bootstrap=self.bootstrap,
+            random_state=self.random_state)
+        self.replaced_predictor = RandomForestClassifier(
+            n_estimators=self.n_estimators,
+            criterion=self.criterion,
+            max_depth=self.max_depth,
+            min_samples_split=self.min_samples_split,
+            min_samples_leaf=self.min_samples_leaf,
+            max_features=self.max_features,
+            bootstrap=self.bootstrap,
+            random_state=self.random_state)
+
+
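Reviewer note (not part of the patch): for orientation, this is the shape of dict the constructor above expects. The keys mirror the config[...] reads in __init__; the values below are illustrative placeholders, not the defaults shipped in the project's forest model config file.

sample_config = {
    'loc_feature': 'coordinates',   # or 'cluster'
    'radius': 500,
    'size_thresh': 1,
    'purity_thresh': 1.0,
    'gamma': 0.05,
    'C': 1,
    'n_estimators': 100,
    'criterion': 'gini',
    'max_depth': 'null',            # mapped to None (unbounded depth) by __init__
    'min_samples_split': 2,
    'min_samples_leaf': 1,
    'max_features': 'log2',
    'bootstrap': True,
    'random_state': 42,
    'use_start_clusters': False,
    'use_trip_clusters': True,
}
model = ForestClassifierModel(config=sample_config)

With 'loc_feature' set to 'coordinates', only raw lat/lon features are used and the DBSCAN/SVM clustering branch above is skipped.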
+class TripGrouper():
+    """ Helper class to get trip clusters from start and end clusters.
+
+        Args:
+            start_cluster_col (str): name of the column containing start
+                cluster indices
+            end_cluster_col (str): name of the column containing end cluster
+                indices
+    """
+
+    def __init__(self,
+                 start_cluster_col='start_cluster_idx',
+                 end_cluster_col='end_cluster_idx'):
+        self.start_cluster_col = start_cluster_col
+        self.end_cluster_col = end_cluster_col
+
+    def fit_transform(self, trip_df):
+        """ Fit and remember possible trip clusters.
+
+            Args:
+                trip_df (DataFrame): DataFrame containing trips. must have the
+                    columns named by start_cluster_col and end_cluster_col
+        """
+        trip_groups = trip_df.groupby(
+            [self.start_cluster_col, self.end_cluster_col])
+
+        # need dict so we can access the trip indices of all the trips in each
+        # group. the key is the group tuple and the value is the list of trip
+        # indices in the group.
+        self.trip_groups_dict = dict(trip_groups.groups)
+
+        # we want to convert trip-group tuples to trip-cluster indices,
+        # hence the pd Series
+        trip_groups_series = pd.Series(list(self.trip_groups_dict.keys()))
+
+        trip_cluster_idx = np.empty(len(trip_df))
+
+        for group_idx in range(len(trip_groups_series)):
+            group_tuple = trip_groups_series[group_idx]
+            trip_idxs_in_group = self.trip_groups_dict[group_tuple]
+            trip_cluster_idx[trip_idxs_in_group] = group_idx
+
+        return trip_cluster_idx
+
+    def transform(self, new_trip_df):
+        """ Get trip clusters for a new set of trips.
+
+            Args:
+                new_trip_df (DataFrame): DataFrame containing trips. must have
+                    the columns named by start_cluster_col and end_cluster_col
+        """
+        prediction_trip_groups = new_trip_df.groupby(
+            [self.start_cluster_col, self.end_cluster_col])
+
+        # need dict so we can access the trip indices of all the trips in each
+        # group. the key is the group tuple and the value is the list of trip
+        # indices in the group.
+        prediction_trip_groups_dict = dict(prediction_trip_groups.groups)
+        trip_groups_series = pd.Series(list(self.trip_groups_dict.keys()))
+        trip_cluster_idx = np.empty(len(new_trip_df))
+
+        for group_tuple in dict(prediction_trip_groups.groups).keys():
+            # check if the trip cluster exists in the training set
+            trip_idxs_in_group = prediction_trip_groups_dict[group_tuple]
+            if group_tuple in self.trip_groups_dict.keys():
+                # look up the group index from the series we created when we
+                # fit the model
+                group_idx = trip_groups_series[trip_groups_series ==
+                                               group_tuple].index[0]
+            else:
+                group_idx = -1
+
+            trip_cluster_idx[trip_idxs_in_group] = group_idx
+
+        return trip_cluster_idx
+
+
+class OneHotWrapper():
+    """ Helper class to streamline one-hot encoding.
+
+        Args:
+            impute_missing (bool): whether or not to impute np.nan values.
+            sparse (bool): whether or not to return a sparse matrix.
+            handle_unknown (str): specifies the way unknown categories are
+                handled during transform.
+    """
+
+    def __init__(
+        self,
+        impute_missing=False,
+        sparse=False,
+        handle_unknown='ignore',
+    ):
+        self.impute_missing = impute_missing
+        if self.impute_missing:
+            self.encoder = make_pipeline(
+                SimpleImputer(missing_values=np.nan,
+                              strategy='constant',
+                              fill_value='missing'),
+                OneHotEncoder(sparse=False, handle_unknown=handle_unknown))
+        else:
+            self.encoder = OneHotEncoder(sparse=sparse,
+                                         handle_unknown=handle_unknown)
+
+    def fit_transform(self, train_df, output_col_prefix=None):
+        """ Establish one-hot encoded variables.
+
+            Args:
+                train_df (DataFrame): DataFrame containing train trips.
+ output_col_prefix (str): only if train_df is a single column + """ + # TODO: handle pd series + + train_df = train_df.copy() # to avoid SettingWithCopyWarning + + # if imputing, the dtype of each column must be string/object and not + # numerical, otherwise the SimpleImputer will fail + if self.impute_missing: + for col in train_df.columns: + train_df[col] = train_df[col].astype(object) + onehot_encoding = self.encoder.fit_transform(train_df) + self.onehot_encoding_cols_all = [] + for col in train_df.columns: + if train_df.shape[1] > 1 or output_col_prefix is None: + output_col_prefix = col + self.onehot_encoding_cols_all += [ + f'{output_col_prefix}_{val}' + for val in np.sort(train_df[col].dropna().unique()) + ] + # we handle np.nan separately because it is of type float, and may + # cause issues with np.sort if the rest of the unique values are + # strings + if any((train_df[col].isna())): + self.onehot_encoding_cols_all += [f'{output_col_prefix}_nan'] + + onehot_encoding_df = pd.DataFrame( + onehot_encoding, + columns=self.onehot_encoding_cols_all).set_index(train_df.index) + + # ignore the encoded columns for missing entries + self.onehot_encoding_cols = copy.deepcopy(self.onehot_encoding_cols_all) + for col in self.onehot_encoding_cols_all: + if col.endswith('_nan'): + onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) + self.onehot_encoding_cols.remove(col) + + return onehot_encoding_df.astype(int) + + def transform(self, test_df): + """ One-hot encoded features in accordance with features seen in the + train set. + + Args: + test_df (DataFrame): DataFrame of trips. + """ + # TODO: rename test_df, this one doesn't necessarily need to be a df + onehot_encoding = self.encoder.transform(test_df) + onehot_encoding_df = pd.DataFrame( + onehot_encoding, + columns=self.onehot_encoding_cols_all).set_index(test_df.index) + + # ignore the encoded columns for missing entries + for col in self.onehot_encoding_cols_all: + if col.endswith('_nan'): + onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) + + return onehot_encoding_df.astype(int) diff --git a/emission/core/wrapper/entry.py b/emission/core/wrapper/entry.py index b4d8520f7..a11eaac8c 100644 --- a/emission/core/wrapper/entry.py +++ b/emission/core/wrapper/entry.py @@ -182,6 +182,9 @@ def create_fake_entry(user_id, key, data, write_ts, create_id=False): result_entry.user_id = user_id result_entry.metadata = ecwm.Metadata.create_metadata_for_fake_result(key, write_ts) result_entry.data = data + #necessary values required by forest model + result_entry['data']['start_local_dt']=result_entry['metadata']['write_local_dt'] + result_entry['data']['end_local_dt']=result_entry['metadata']['write_local_dt'] result_entry._populateDependencies() return result_entry diff --git a/emission/tests/modellingTests/TestForestModel.py b/emission/tests/modellingTests/TestForestModel.py index f477f1ab9..8895cb366 100644 --- a/emission/tests/modellingTests/TestForestModel.py +++ b/emission/tests/modellingTests/TestForestModel.py @@ -119,54 +119,55 @@ def tearDown(self): # logging.debug("Model predictions are consistent with previously stored predictions.") - - def test_regression(self): - """ - Regression test to ensure consistent model results. - """ - # Load the previously stored predictions (if any) - previous_predictions = self.load_previous_predictions() +## TODO : Fix regression Tests + + # def test_regression(self): + # """ + # Regression test to ensure consistent model results. 
+ # """ + # # Load the previously stored predictions (if any) + # previous_predictions = self.load_previous_predictions() - # Run the current model to get predictions - current_predictions = self.run_current_model() - - # If there are no previous predictions, store the current predictions - if previous_predictions is None: - self.store_predictions(current_predictions) - else: - # Compare the current predictions with the previous predictions - self.assertPredictionsMatch(previous_predictions, current_predictions) - - def load_previous_predictions(self): - # Retrieve stored predictions from the database - # Using get_analysis_timeseries_db as an example, replace with the correct method if needed - db = edb.get_analysis_timeseries_db() - predictions = db.find_one({"user_id": self.user_id, "metadata.key": "predictions"}) - return predictions - - def run_current_model(self): - # Placeholder: Run the current model and get predictions - # Replace this with the actual model running code - predictions = None - return predictions - - def store_predictions(self, predictions): - # Store the predictions in the database - # Using get_analysis_timeseries_db as an example, replace with the correct method if needed - db = edb.get_analysis_timeseries_db() - entry = { - "user_id": self.user_id, - "metadata": { - "key": "predictions", - "write_ts": pd.Timestamp.now().timestamp() # Using pandas timestamp as an example - }, - "data": predictions - } - db.insert_one(entry) - - def assertPredictionsMatch(self, prev, curr): - # Placeholder: Check if the predictions match - # This will depend on the format and type of your predictions - # For example, if predictions are lists or arrays, you can use numpy - if not np.array_equal(prev, curr): - self.fail("Current model predictions do not match previously stored predictions!") + # # Run the current model to get predictions + # current_predictions = self.run_current_model() + + # # If there are no previous predictions, store the current predictions + # if previous_predictions is None: + # self.store_predictions(current_predictions) + # else: + # # Compare the current predictions with the previous predictions + # self.assertPredictionsMatch(previous_predictions, current_predictions) + + # def load_previous_predictions(self): + # # Retrieve stored predictions from the database + # # Using get_analysis_timeseries_db as an example, replace with the correct method if needed + # db = edb.get_analysis_timeseries_db() + # predictions = db.find_one({"user_id": self.user_id, "metadata.key": "predictions"}) + # return predictions + + # def run_current_model(self): + # # Placeholder: Run the current model and get predictions + # # Replace this with the actual model running code + # predictions = None + # return predictions + + # def store_predictions(self, predictions): + # # Store the predictions in the database + # # Using get_analysis_timeseries_db as an example, replace with the correct method if needed + # db = edb.get_analysis_timeseries_db() + # entry = { + # "user_id": self.user_id, + # "metadata": { + # "key": "predictions", + # "write_ts": pd.Timestamp.now().timestamp() # Using pandas timestamp as an example + # }, + # "data": predictions + # } + # db.insert_one(entry) + + # def assertPredictionsMatch(self, prev, curr): + # # Placeholder: Check if the predictions match + # # This will depend on the format and type of your predictions + # # For example, if predictions are lists or arrays, you can use numpy + # if not np.array_equal(prev, curr): + # self.fail("Current 
model predictions do not match previously stored predictions!")
diff --git a/emission/tests/modellingTests/TestRunForestModel.py b/emission/tests/modellingTests/TestRunForestModel.py
index 382ef4074..2edcc92de 100644
--- a/emission/tests/modellingTests/TestRunForestModel.py
+++ b/emission/tests/modellingTests/TestRunForestModel.py
@@ -140,8 +140,8 @@ def testTrainForestModelWithZeroTrips(self):
             "pipeline should not have a current timestamp for the test user")
 
 
-    def testPredictForestModelWithZeroTrips(self):
-        """
+    def test1RoundPredictForestModel(self):
+        """
         forest model takes config arguments via the constructor for testing
         purposes but will load from a file in /conf/analysis/ which is tested here
         """
@@ -175,13 +175,9 @@ def testPredictForestModelWithZeroTrips(self):
         )
 
         logging.debug(f'(TEST) testing prediction of stored model')
-        test = etmm.build_mock_trip(
-            user_id=self.user_id,
-            origin=self.origin,
-            destination=self.destination
-        )
+        test = esda.get_entries(key="analysis/confirmed_trip", user_id=self.user_id, time_query=None)
         prediction, n = eamur.predict_labels_with_n(
-            trip = test,
+            trip = test[0],
             model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER,
             model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
             model_config=forest_model_config
diff --git a/emission/tests/modellingTests/modellingTestAssets.py b/emission/tests/modellingTests/modellingTestAssets.py
index 252b2ad34..2bb1a958e 100644
--- a/emission/tests/modellingTests/modellingTestAssets.py
+++ b/emission/tests/modellingTests/modellingTestAssets.py
@@ -166,7 +166,10 @@ def build_mock_trip(
             "type": "Point",
             "coordinates": destination
         },
-        "user_input": labels
+        # necessary values required by the random forest model
+        "user_input": labels,
+        "duration": end_ts - start_ts,
+        "distance": ecc.calDistance(origin, destination)
     }
     return ecwe.Entry.create_fake_entry(user_id, key, data, write_ts=time.time())
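Reviewer note (not part of the patch): the round trip exercised by test1RoundPredictForestModel reduces to the call pattern below. This is a sketch only; the import paths are inferred from the aliases used in the test file (eamur, eamumt, eamums, esda), and user_id / forest_model_config stand in for values the test prepares beforehand.

import emission.analysis.modelling.trip_model.run_model as eamur
import emission.analysis.modelling.trip_model.model_type as eamumt
import emission.analysis.modelling.trip_model.model_storage as eamums
import emission.storage.decorations.analysis_timeseries_queries as esda

# fetch one labeled trip for the test user and ask the stored forest model
# for label predictions on it
trips = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None)
prediction, n = eamur.predict_labels_with_n(
    trip=trips[0],
    model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER,
    model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
    model_config=forest_model_config
)

Switching back to the greedy binned model would only require changing model_type (and the matching config), which is the point the commit message makes about model selection being config-driven.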