diff --git a/bin/build_label_model.py b/bin/build_label_model.py
new file mode 100644
index 000000000..7ba3fe066
--- /dev/null
+++ b/bin/build_label_model.py
@@ -0,0 +1,67 @@
+"""
+Script to build and save the labeling model.
+"""
+import logging
+
+import argparse
+import uuid
+import copy
+
+import emission.pipeline.reset as epr
+import emission.core.get_database as edb
+import emission.core.wrapper.user as ecwu
+import emission.storage.timeseries.abstract_timeseries as esta
+import emission.analysis.modelling.tour_model_first_only.build_save_model as eamtb
+
+def _get_user_list(args):
+    if args.all:
+        return _find_all_users()
+    elif args.platform:
+        return _find_platform_users(args.platform)
+    elif args.email_list:
+        return _email_2_user_list(args.email_list)
+    else:
+        assert args.user_list is not None
+        return [uuid.UUID(u) for u in args.user_list]
+
+def _find_platform_users(platform):
+    # Since all new clients register a profile with the server, we don't have
+    # to run a 'distinct' query over the entire contents of the timeseries.
+    # Instead, we can simply query from the profile users, which is
+    # significantly faster
+    # Use the commented out line instead for better performance.
+    # Soon, we can move to the more performant option, because there will be
+    # no users that don't have a profile
+    # return edb.get_timeseries_db().find({'metadata.platform': platform}).distinct(
+    #    'user_id')
+    return edb.get_profile_db().find({"curr_platform": platform}).distinct("user_id")
+
+def _find_all_users():
+    return esta.TimeSeries.get_uuid_list()
+
+def _email_2_user_list(email_list):
+    return [ecwu.User.fromEmail(e).uuid for e in email_list]
+
+if __name__ == '__main__':
+    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
+                        level=logging.DEBUG)
+
+    parser = argparse.ArgumentParser()
+    group = parser.add_mutually_exclusive_group(required=True)
+    group.add_argument("-a", "--all", action="store_true", default=False,
+        help="build the model for all users")
+    group.add_argument("-p", "--platform", choices = ['android', 'ios'],
+        help="build the model for all users on the specified platform")
+    group.add_argument("-u", "--user_list", nargs='+',
+        help="user ids to build the model for")
+    group.add_argument("-e", "--email_list", nargs='+',
+        help="email addresses to build the model for")
+
+    args = parser.parse_args()
+    print(args)
+
+    user_list = _get_user_list(args)
+    logging.info("received list with %d users" % len(user_list))
+    for user_id in user_list:
+        logging.info("building model for user %s" % user_id)
+        eamtb.build_user_model(user_id)
diff --git a/bin/debug/label_stats.py b/bin/debug/label_stats.py
new file mode 100644
index 000000000..efcebea5f
--- /dev/null
+++ b/bin/debug/label_stats.py
@@ -0,0 +1,42 @@
+import emission.core.get_database as edb
+import emission.core.wrapper.user as ecwu
+import uuid
+import argparse
+
+
+parser = argparse.ArgumentParser(prog="label_stats")
+group = parser.add_mutually_exclusive_group(required=True)
+group.add_argument("-e", "--user_email")
+group.add_argument("-u", "--user_uuid")
+
+args = parser.parse_args()
+if args.user_uuid:
+    sel_uuid = uuid.UUID(args.user_uuid)
+else:
+    sel_uuid = ecwu.User.fromEmail(args.user_email).uuid
+
+print("All inferred trips %s" % edb.get_analysis_timeseries_db().count_documents({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid}))
+
+print("Inferred trips with inferences %s" % edb.get_analysis_timeseries_db().count_documents({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid, "data.inferred_labels": {"$ne": []}}))
+
+print("All expected trips %s" % edb.get_analysis_timeseries_db().count_documents({"metadata.key": "analysis/expected_trip", "user_id": sel_uuid})) + +print("Expected trips with inferences %s" % edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/expected_trip", "user_id": sel_uuid, "data.expectation": {"$exists": True}}).count()) + +for t in list(edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid})): + if t["data"]["inferred_labels"] != []: + confirmed_trip = edb.get_analysis_timeseries_db().find_one({"user_id": t["user_id"], + "metadata.key": "analysis/confirmed_trip", + "data.start_ts": t["data"]["start_ts"]}) + if confirmed_trip is None: + print("No matching confirmed trip for %s" % t["data"]["start_fmt_time"]) + continue + + if confirmed_trip["data"]["user_input"] == {}: + print("Found confirmed trip with matching inferred trip, without user labels") + +print("all inferred trips %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/inferred_trip", "user_id": sel_uuid}).count())) +print("all confirmed trips %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/confirmed_trip", "user_id": sel_uuid}).count())) +print("confirmed trips with inferred labels %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/confirmed_trip", "user_id": sel_uuid, "data.inferred_labels": {"$ne": []}}).count())) +print("confirmed trips without inferred labels %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/confirmed_trip", "user_id": sel_uuid, "data.inferred_labels": []}).count())) +print("confirmed trips with expectation %s" % (edb.get_analysis_timeseries_db().find({"metadata.key": "analysis/confirmed_trip", "user_id": sel_uuid, "data.expectation": {"$exists": True}}).count())) diff --git a/bin/debug/reset_partial_label_testing.py b/bin/debug/reset_partial_label_testing.py new file mode 100644 index 000000000..1735b86b3 --- /dev/null +++ b/bin/debug/reset_partial_label_testing.py @@ -0,0 +1,29 @@ +import json +import logging +import argparse +import numpy as np +import uuid + +import emission.core.get_database as edb +import emission.storage.decorations.analysis_timeseries_queries as esda + + +parser = argparse.ArgumentParser(prog="reset_partial_label_testing") +group = parser.add_mutually_exclusive_group(required=True) +group.add_argument("-i", "--inferred", action='store_true') +group.add_argument("-c", "--confirmed", action='store_true') + +args = parser.parse_args() + +if args.inferred: + print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": esda.INFERRED_TRIP_KEY}).raw_result) + print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": esda.EXPECTED_TRIP_KEY}).raw_result) + print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": "inference/labels"}).raw_result) + print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": "analysis/inferred_labels"}).raw_result) + print(edb.get_pipeline_state_db().delete_many({"pipeline_stage": {"$in": [14,15]}}).raw_result) + +if args.confirmed: + print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": esda.EXPECTED_TRIP_KEY}).raw_result) + print(edb.get_analysis_timeseries_db().delete_many({"metadata.key": esda.CONFIRMED_TRIP_KEY}).raw_result) + print(edb.get_pipeline_state_db().delete_many({"pipeline_stage": {"$in": [13]}}).raw_result) + diff --git a/emission/analysis/classification/inference/labels/ensembles.py 
b/emission/analysis/classification/inference/labels/ensembles.py new file mode 100644 index 000000000..3da7b0cbd --- /dev/null +++ b/emission/analysis/classification/inference/labels/ensembles.py @@ -0,0 +1,30 @@ +# This file encapsulates the various ensemble algorithms that take a trip and a list of primary predictions and return a label data structure + +import copy +import logging + +import emission.core.wrapper.labelprediction as ecwl + +# This placeholder ensemble simply returns the first prediction run +def ensemble_first_prediction(trip, predictions): + # Since this is not a real ensemble yet, we will not mark it as such + # algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE + algorithm_id = ecwl.AlgorithmTypes(predictions[0]["algorithm_id"]); + prediction = copy.copy(predictions[0]["prediction"]) + return algorithm_id, prediction + +# If we get a real prediction, use it, otherwise fallback to the placeholder +def ensemble_real_and_placeholder(trip, predictions): + if predictions[0]["prediction"] != []: + sel_prediction = predictions[0] + logging.debug(f"Found real prediction {sel_prediction}, using that preferentially") + # assert sel_prediction.algorithm_id == ecwl.AlgorithmTypes.TWO_STAGE_BIN_CLUSTER + else: + sel_prediction = predictions[1] + logging.debug(f"No real prediction found, using placeholder prediction {sel_prediction}") + # Use a not equal assert since we may want to change the placeholder + assert sel_prediction.algorithm_id != ecwl.AlgorithmTypes.TWO_STAGE_BIN_CLUSTER + + algorithm_id = ecwl.AlgorithmTypes(sel_prediction["algorithm_id"]) + prediction = copy.copy(sel_prediction["prediction"]) + return algorithm_id, prediction \ No newline at end of file diff --git a/emission/analysis/classification/inference/labels/inferrers.py b/emission/analysis/classification/inference/labels/inferrers.py new file mode 100644 index 000000000..6ce4c7702 --- /dev/null +++ b/emission/analysis/classification/inference/labels/inferrers.py @@ -0,0 +1,153 @@ +# This file encapsulates the various prediction algorithms that take a trip and return a label data structure +# Named "inferrers.py" instead of "predictors.py" to avoid a name collection in our abbreviated import convention + +import logging +import random +import copy + +import emission.analysis.modelling.tour_model_first_only.load_predict as lp + +# A set of placeholder predictors to allow pipeline development without a real inference algorithm. +# For the moment, the system is configured to work with two labels, "mode_confirm" and +# "purpose_confirm", so I'll do that. + +# The first placeholder scenario represents a case where it is hard to distinguish between +# biking and walking (e.g., because the user is a very slow biker) and hard to distinguish +# between work and shopping at the grocery store (e.g., because the user works at the +# grocery store), but whenever the user bikes to the location it is to work and whenever +# the user walks to the location it is to shop (e.g., because they don't have a basket on +# their bike), and the user bikes to the location four times more than they walk there. +# Obviously, it is a simplification. +def placeholder_predictor_0(trip): + return [ + {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}, + {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2} + ] + + +# The next placeholder scenario provides that same set of labels in 75% of cases and no +# labels in the rest. 
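(Not part of the diff: a quick standalone sketch of the prediction data structure that every predictor in this file returns and that the ensembles consume — a list of `{"labels": ..., "p": ...}` dicts whose probabilities sum to at most 1. The values below are taken from `placeholder_predictor_0` above.)

```python
# Standalone illustration of the label-prediction data structure used in this PR.
predictions = [
    {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
    {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2},
]
# The probabilities over the predicted label tuples sum to at most 1
assert sum(p["p"] for p in predictions) <= 1.0
# The most likely label tuple is simply the entry with the highest p
best = max(predictions, key=lambda p: p["p"])
print(best["labels"])  # {'mode_confirm': 'bike', 'purpose_confirm': 'work'}
```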
+
+
+# The next placeholder scenario provides that same set of labels in 75% of cases and no
+# labels in the rest.
+def placeholder_predictor_1(trip):
+    return [
+        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+    ] if random.random() > 0.25 else []
+
+
+# This third scenario provides labels designed to test the soundness and resilience of
+# the client-side inference processing algorithms.
+def placeholder_predictor_2(trip):
+    # Timestamp2index gives us a deterministic way to match test trips with labels
+    # Hardcoded to match "test_july_22" -- clearly, this is just for testing
+    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
+    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
+    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
+    return [
+        [
+
+        ],
+        [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+        ],
+        [
+            {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
+        ],
+        [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+        ],
+        [
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+        ],
+        [
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+        ]
+    ][index]
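(Also standalone: the `timestamp2index` trick above keys the hardcoded label lists off the trip's start time expressed as minutes past midnight; e.g. 494 = 8*60 + 14, so a test trip starting at 08:14 deterministically gets index 5.)

```python
# Standalone illustration of the deterministic trip-to-index mapping above.
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
start_local_dt = {"hour": 8, "minute": 14}  # same shape as trip["data"]["start_local_dt"]
timestamp = start_local_dt["hour"] * 60 + start_local_dt["minute"]
# Unknown start times fall back to index 0, matching the predictors' behavior
index = timestamp2index.get(timestamp, 0)
assert index == 5
```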
+
+
+# This fourth scenario provides labels designed to test the expectation and notification system.
+def placeholder_predictor_3(trip):
+    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
+    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
+    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
+    return [
+        [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
+        ],
+        [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
+        ],
+        [
+            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
+        ],
+        [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
+        ],
+        [
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+        ],
+        [
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
+            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
+            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
+        ]
+    ][index]
+
+# Placeholder that is suitable for a demo.
+# Finds all unique label combinations for this user and picks one randomly
+def placeholder_predictor_demo(trip):
+    import random
+
+    import emission.core.get_database as edb
+    user = trip["user_id"]
+    unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user}).distinct("data.user_input")
+    if len(unique_user_inputs) == 0:
+        return []
+    random_user_input = random.choice(unique_user_inputs) if random.randrange(0,10) > 0 else []
+
+    logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user}, returning value {random_user_input}")
+    return [{"labels": random_user_input, "p": random.random()}]
+
+# Non-placeholder implementation. First bins the trips, and then clusters every bin
+# See emission.analysis.modelling.tour_model for more details
+# Assumes that pre-built models are stored in the working directory
+# Models are built using evaluation_pipeline.py and build_save_model.py
+# This algorithm is now DEPRECATED in favor of predict_cluster_confidence_discounting (see https://github.com/e-mission/e-mission-docs/issues/663)
+def predict_two_stage_bin_cluster(trip):
+    return lp.predict_labels(trip)
+
+# Reduce the confidence of the clustering prediction when the number of trips in the cluster is small
+# See https://github.com/e-mission/e-mission-docs/issues/663
+def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confidence_multiplier=None):
+    if max_confidence is None: max_confidence = 0.99  # Confidence coefficient for n approaching infinity -- in the GitHub issue, this is 1-A
+    if first_confidence is None: first_confidence = 0.80  # Confidence coefficient for n = 1 -- in the issue, this is B
+    if confidence_multiplier is None: confidence_multiplier = 0.30  # How much of the remaining removable confidence to remove between n = k and n = k+1 -- in the issue, this is C
+    return max_confidence-(max_confidence-first_confidence)*(1-confidence_multiplier)**(n-1)  # This is the u = ... formula in the issue
+
+# predict_two_stage_bin_cluster but with the above reduction in confidence
+def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
+    labels, n = lp.predict_labels_with_n(trip)
+    if n <= 0:  # No model data or trip didn't match a cluster
+        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
+        return labels
+
+    confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
+    logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
+
+    labels = copy.deepcopy(labels)
+    for l in labels: l["p"] *= confidence_coeff
+    return labels
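(For intuition about the discounting above, here is a standalone restatement of the formula with its default parameters: the coefficient starts at 0.80 for a single-trip cluster and approaches, but never reaches, 0.99 as the cluster grows.)

```python
# Standalone check of the discounting curve with the defaults from inferrers.py
# (max_confidence=0.99, first_confidence=0.80, confidence_multiplier=0.30).
def coeff(n, max_c=0.99, first_c=0.80, mult=0.30):
    return max_c - (max_c - first_c) * (1 - mult) ** (n - 1)

for n in (1, 2, 5, 20):
    print(n, round(coeff(n), 4))
# 1 0.8
# 2 0.857
# 5 0.9444
# 20 0.9898
```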
diff --git a/emission/analysis/classification/inference/labels/pipeline.py b/emission/analysis/classification/inference/labels/pipeline.py
index b8295059a..e7bda7b2c 100644
--- a/emission/analysis/classification/inference/labels/pipeline.py
+++ b/emission/analysis/classification/inference/labels/pipeline.py
@@ -9,6 +9,21 @@ import emission.storage.decorations.analysis_timeseries_queries as esda
 import emission.core.wrapper.labelprediction as ecwl
 import emission.core.wrapper.entry as ecwe
+import emission.analysis.classification.inference.labels.inferrers as eacili
+import emission.analysis.classification.inference.labels.ensembles as eacile
+
+
+# For each algorithm in ecwl.AlgorithmTypes that runs on a trip (i.e., not the ensemble, which
+# runs on the results of other algorithms), primary_algorithms specifies a corresponding
+# function in eacili to run. This makes it easy to plug in additional algorithms later.
+primary_algorithms = {
+    ecwl.AlgorithmTypes.CONFIDENCE_DISCOUNTED_CLUSTER: eacili.predict_cluster_confidence_discounting
+}
+
+# ensemble specifies which algorithm in eacile to run.
+# This makes it easy to test various ways of combining various algorithms.
+ensemble = eacile.ensemble_first_prediction
+
 # Does all the work necessary for a given user
 def infer_labels(user_id):
@@ -24,114 +39,6 @@ def infer_labels(user_id):
         logging.exception("Error while inferring labels, timestamp is unchanged")
         epq.mark_label_inference_failed(user_id)
 
-# A set of placeholder predictors to allow pipeline development without a real inference algorithm.
-# For the moment, the system is configured to work with two labels, "mode_confirm" and
-# "purpose_confirm", so I'll do that.
-
-# The first placeholder scenario represents a case where it is hard to distinguish between
-# biking and walking (e.g., because the user is a very slow biker) and hard to distinguish
-# between work and shopping at the grocery store (e.g., because the user works at the
-# grocery store), but whenever the user bikes to the location it is to work and whenever
-# the user walks to the location it is to shop (e.g., because they don't have a basket on
-# their bike), and the user bikes to the location four times more than they walk there.
-# Obviously, it is a simplification.
-def placeholder_predictor_0(trip):
-    return [
-        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-    ]
-
-
-# The next placeholder scenario provides that same set of labels in 75% of cases and no
-# labels in the rest.
-def placeholder_predictor_1(trip):
-    return [
-        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-    ] if random.random() > 0.25 else []
-
-
-# This third scenario provides labels designed to test the soundness and resilience of
-# the client-side inference processing algorithms.
-def placeholder_predictor_2(trip):
-    # Timestamp2index gives us a deterministic way to match test trips with labels
-    # Hardcoded to match "test_july_22" -- clearly, this is just for testing
-    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
-    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
-    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
-    return [
-        [
-
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-        ],
-        [
-            {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ]
-    ][index]
-
-
-# This fourth scenario provides labels designed to test the expectation and notification system.
-def placeholder_predictor_3(trip):
-    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
-    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
-    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
-    return [
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
-        ],
-        [
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
-        ]
-    ][index]
-
-
-# For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which
-# runs on the results of other algorithms), primary_algorithms specifies a corresponding
-# function to run. This makes it easy to plug in additional algorithms later.
-primary_algorithms = {
-    ecwl.AlgorithmTypes.PLACEHOLDER_2: placeholder_predictor_2
-}
-
 # Code structure based on emission.analysis.classification.inference.mode.pipeline
 # and emission.analysis.classification.inference.mode.rule_engine
 class LabelInferencePipeline:
@@ -188,13 +95,8 @@ def compute_and_save_algorithms(self, trip):
     def compute_and_save_ensemble(self, trip, predictions):
         il = ecwl.Labelprediction()
         il.trip_id = trip.get_id()
-        # Since this is not a real ensemble yet, we will not mark it as such
-        # il.algorithm_id = ecwl.AlgorithmTypes.ENSEMBLE
-        il.algorithm_id = ecwl.AlgorithmTypes(predictions[0]["algorithm_id"])
         il.start_ts = trip["data"]["start_ts"]
         il.end_ts = trip["data"]["end_ts"]
-
-        il.prediction = copy.copy(predictions[0]["prediction"])
-
+        (il.algorithm_id, il.prediction) = ensemble(trip, predictions)
         self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
         return il
diff --git a/emission/analysis/modelling/tour_model/build_save_model.py b/emission/analysis/modelling/tour_model/build_save_model.py
new file mode 100644
index 000000000..ac1fba2e9
--- /dev/null
+++ b/emission/analysis/modelling/tour_model/build_save_model.py
@@ -0,0 +1,155 @@
+# Standard imports
+import copy
+import pandas as pd
+import logging
+import jsonpickle as jpickle
+import sklearn.cluster as sc
+
+# Our imports
+import emission.storage.timeseries.abstract_timeseries as esta
+
+import emission.analysis.modelling.tour_model.get_scores as gs
+import emission.analysis.modelling.tour_model.get_users as gu
+import emission.analysis.modelling.tour_model.label_processing as lp
+import emission.analysis.modelling.tour_model.evaluation_pipeline as ep
+import emission.analysis.modelling.tour_model.load_predict as load
+import emission.analysis.modelling.tour_model.data_preprocessing as preprocess
+
+
+def find_best_split_and_parameters(user,test_data):
+    # find the best score
+    filename = "user_"+str(user)+".csv"
+    df = pd.read_csv(filename, index_col='split')
+    scores = df['scores'].tolist()
+    best_split_idx = scores.index(max(scores))
+    # use the position of best_score to find best_split
+    best_split = test_data[best_split_idx]
+    # use best_split_idx to find the best parameters
+    low = df.loc[best_split_idx, 'lower boundary']
+    dist_pct = df.loc[best_split_idx, 'distance percentage']
+    return best_split,best_split_idx,low,dist_pct
+
+
+# def find_best_parameters(user,best_split_idx):
+#     tradeoff_filename = 'tradeoff_' + str(user)
+#     tradeoff_1user = load.loadModelStage(tradeoff_filename)
+#     best_parameters = tradeoff_1user[best_split_idx]
+#     return best_parameters
+
+
+def save_models(obj_name,obj,user):
+    obj_capsule = jpickle.dumps(obj)
+    filename = obj_name + '_' + str(user)
+    with open(filename, "w") as fd:
+        fd.write(obj_capsule)
+
+
+def main():
+    all_users = esta.TimeSeries.get_uuid_list()
+    radius = 100
+    for a in range(len(all_users)):
+        user = all_users[a]
+        trips = preprocess.read_data(user)
+        filter_trips = preprocess.filter_data(trips, radius)
+        # filter out users that don't have enough valid labeled trips
+        if not gu.valid_user(filter_trips, trips):
+            logging.debug("This user doesn't have enough valid trips for further analysis.")
+            continue
+        tune_idx, test_idx = preprocess.split_data(filter_trips)
+        test_data = preprocess.get_subdata(filter_trips, tune_idx)
+
+        # find the best split and parameters, and use them to build the model
+        best_split, best_split_idx, low, dist_pct = find_best_split_and_parameters(user,test_data)
+
+        # run the first round of clustering
+        sim, bins, bin_trips, filter_trips = ep.first_round(best_split, radius)
+        # It is possible that the user doesn't have common trips. Here we only build models for users that have common trips.
+        if len(bins) != 0:
+            gs.compare_trip_orders(bins, bin_trips, filter_trips)
+            first_labels = ep.get_first_label(bins)
+            first_label_set = list(set(first_labels))
+
+            # second round of clustering
+            model_coll = {}
+            bin_loc_feat = {}
+            first_round_labels = {}
+            for fl in first_label_set:
+                # store second round trips data
+                second_round_trips = []
+                for index, first_label in enumerate(first_labels):
+                    if first_label == fl:
+                        second_round_trips.append(bin_trips[index])
+                x = preprocess.extract_features(second_round_trips)
+                # collect location features of the bin from the first round of clustering
+                # feat[0:4] are start/end coordinates
+                bin_loc_feat[str(fl)] = [feat[0:4] for feat in x]
+                # here we pass in features(x) from selected second round trips to build the model
+                method = 'single'
+                clusters = lp.get_second_labels(x, method, low, dist_pct)
+                n_clusters = len(set(clusters))
+                # build the model
+                kmeans = sc.KMeans(n_clusters=n_clusters, random_state=0).fit(x)
+                # collect all models, the key is the label from the 1st round
+                # e.g. {'0': KMeans(n_clusters=2, random_state=0)}
+                model_coll[str(fl)] = kmeans
+                # get labels from the 2nd round of clustering
+                second_labels = kmeans.labels_
+
+                first_label_obj = []
+
+                # save user labels for every cluster
+                second_label_set = list(set(second_labels))
+                sec_round_labels = {}
+                for sl in second_label_set:
+                    sec_sel_trips = []
+                    sec_label_obj = []
+                    for idx, second_label in enumerate(second_labels):
+                        if second_label == sl:
+                            sec_sel_trips.append(second_round_trips[idx])
+                    user_label_df = pd.DataFrame([trip['data']['user_input'] for trip in sec_sel_trips])
+                    user_label_df = lp.map_labels(user_label_df)
+                    # compute the total number of trips in this cluster
+                    sum_trips = len(user_label_df)
+                    # compute unique label sets and their probabilities in one cluster
+                    # 'p' refers to probability
+                    unique_labels = user_label_df.groupby(user_label_df.columns.tolist()).size().reset_index(name='uniqcount')
+                    unique_labels['p'] = unique_labels.uniqcount / sum_trips
+                    labels_columns = user_label_df.columns.to_list()
+                    for i in range(len(unique_labels)):
+                        one_set_labels = {}
+                        # e.g. labels_only={'mode_confirm': 'pilot_ebike', 'purpose_confirm': 'work', 'replaced_mode': 'walk'}
+                        labels_only = {column: unique_labels.iloc[i][column] for column in labels_columns}
+                        one_set_labels["labels"] = labels_only
+                        one_set_labels['p'] = unique_labels.iloc[i]['p']
+                        # e.g. one_set_labels = {'labels': {'mode_confirm': 'walk', 'replaced_mode': 'walk', 'purpose_confirm': 'exercise'}, 'p': 1.0}
+                        # in case the append() method changes the dict, we use deepcopy here
+                        labels_set = copy.deepcopy(one_set_labels)
+                        sec_label_obj.append(labels_set)
+
+                    # put user labels from the 2nd round into a dict, the key is the label from the 2nd round of clustering
+                    # e.g. {'0': [{'labels': {'mode_confirm': 'bus', 'replaced_mode': 'bus', 'purpose_confirm': 'home'}, 'p': 1.0}]}
+                    sec_round_labels[str(sl)] = sec_label_obj
+                    sec_round_collect = copy.deepcopy(sec_round_labels)
+                    # collect all user labels from the 2nd round, the key is the label from the 1st round
+                    # e.g. first_round_labels = {'0': [{'0': [{'labels': {'mode_confirm': 'drove_alone', 'purpose_confirm': 'work', 'replaced_mode': 'drove_alone'}, 'p': 1.0}]}]}
+                    first_label_obj.append(sec_round_collect)
+                first_round_labels[str(fl)] = first_label_obj
+            # wrap up all labels
+            # e.g. all_labels = [{'first_label': [{'second_label': [{'labels': {'mode_confirm': 'shared_ride',
+            # 'purpose_confirm': 'home', 'replaced_mode': 'drove_alone'}, 'p': 1.0}]}]}]
+            all_labels = [first_round_labels]
+
+            # save all user labels
+            save_models('user_labels',all_labels,user)
+
+            # save models from the 2nd round of clustering
+            save_models('models',[model_coll],user)
+
+            # save location features of all bins
+            save_models('locations',[bin_loc_feat],user)
+
+
+if __name__ == '__main__':
+    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
+                        level=logging.DEBUG)
+    main()
diff --git a/emission/analysis/modelling/tour_model/data_preprocessing.py b/emission/analysis/modelling/tour_model/data_preprocessing.py
new file mode 100644
index 000000000..23100c544
--- /dev/null
+++ b/emission/analysis/modelling/tour_model/data_preprocessing.py
@@ -0,0 +1,58 @@
+import emission.storage.decorations.analysis_timeseries_queries as esda
+import emission.analysis.modelling.tour_model.cluster_pipeline as pipeline
+import emission.analysis.modelling.tour_model.similarity as similarity
+import pandas as pd
+from sklearn.model_selection import KFold
+
+
+# read data that have user labels
+def read_data(user):
+    trips = pipeline.read_data(uuid=user, key=esda.CONFIRMED_TRIP_KEY)
+    return trips
+
+
+# - trips: all trips read from the database
+# - filter_trips: valid trips that have user labels and are not points
+def filter_data(trips,radius):
+    non_empty_trips = [t for t in trips if t["data"]["user_input"] != {}]
+    non_empty_trips_df = pd.DataFrame(t["data"]["user_input"] for t in non_empty_trips)
+    valid_trips_df = non_empty_trips_df.dropna(axis=0, how='any', thresh=None, subset=None, inplace=False)
+    valid_trips_idx_ls = valid_trips_df.index.tolist()
+    valid_trips = [non_empty_trips[i] for i in valid_trips_idx_ls]
+
+    # similarity codes can filter out trips that are points in valid_trips
+    filter_trips = similarity.filter_too_short(valid_trips, radius)
+    return filter_trips
+
+
+# extract features and use KFold (n_splits=5) to split the data into 5 folds (5 training sets, 5 test sets)
+def extract_features(filter_trips):
+    X = []
+    for trip in filter_trips:
+        start = trip.data.start_loc["coordinates"]
+        end = trip.data.end_loc["coordinates"]
+        distance = trip.data.distance
+        duration = trip.data.duration
+        X.append([start[0], start[1], end[0], end[1], distance, duration])
+    return X
+
+def split_data(filter_trips):
+    X = extract_features(filter_trips)
+    kf = KFold(n_splits=5, shuffle=True, random_state=3)
+    train_idx = []
+    test_idx = []
+    for train_index, test_index in kf.split(X):
+        train_idx.append(train_index)
+        test_idx.append(test_index)
+    return train_idx, test_idx
+
+
+# collect a set of data (training/test set) after splitting
+def get_subdata(filter_trips,train_test_set):
+    collect_sub_data = []
+    for train_test_subset in train_test_set:
+        sub_data = []
+        for idx in train_test_subset:
+            sub_data.append(filter_trips[idx])
+        collect_sub_data.append(sub_data)
+    return collect_sub_data
diff --git a/emission/analysis/modelling/tour_model/evaluation_pipeline.py b/emission/analysis/modelling/tour_model/evaluation_pipeline.py
new file mode 100644
index 000000000..2dff88aba
--- /dev/null
+++ b/emission/analysis/modelling/tour_model/evaluation_pipeline.py
@@ -0,0 +1,203 @@
+# Standard imports
+import numpy as np
+import pandas as pd
+import jsonpickle as jpickle
+import logging
+
+# Our imports
+import emission.storage.timeseries.abstract_timeseries as esta
+
+import emission.analysis.modelling.tour_model.similarity as similarity
+import emission.analysis.modelling.tour_model.get_request_percentage as grp
+import emission.analysis.modelling.tour_model.get_scores as gs
+import emission.analysis.modelling.tour_model.label_processing as lp
+import emission.analysis.modelling.tour_model.data_preprocessing as preprocess
+import emission.analysis.modelling.tour_model.second_round_of_clustering as sr
+import emission.analysis.modelling.tour_model.get_users as gu
+
+def second_round(bin_trips,filter_trips,first_labels,track,low,dist_pct,sim,kmeans):
+    sec = sr.SecondRoundOfClustering(bin_trips,first_labels)
+    first_label_set = list(set(first_labels))
+    for l in first_label_set:
+        sec.get_sel_features_and_trips(first_labels,l)
+        sec.hierarcial_clustering(low, dist_pct)
+        if kmeans:
+            sec.kmeans_clustering()
+    new_labels = sec.get_new_labels(first_labels)
+    track = sec.get_new_track(track)
+    # get request percentage for the subset for the second round
+    percentage_second = grp.get_req_pct(new_labels, track, filter_trips, sim)
+    # get homogeneity score for the second round
+    homo_second = gs.score(bin_trips, new_labels)
+    return percentage_second,homo_second
+
+
+# we use functions in similarity to build the first round of clustering
+def first_round(data,radius):
+    sim = similarity.similarity(data, radius)
+    filter_trips = sim.data
+    sim.bin_data()
+    sim.delete_bins()
+    bins = sim.bins
+    bin_trips = sim.newdata
+    return sim, bins, bin_trips, filter_trips
+
+
+def get_first_label(bins):
+    # get first round labels
+    # the labels from the first round are the indices of the bins
+    # e.g. for bin 0 [trip1, trip2, trip3], the labels of this bin are [0,0,0]
+    first_labels = []
+    for b in range(len(bins)):
+        for trip in bins[b]:
+            first_labels.append(b)
+    return first_labels
+
+
+def get_track(bins, first_labels):
+    # create a list idx_labels_track to store indices and labels
+    # the indices of the items will be the same in the new label list after the second round of clustering
+    # item[0] is the original index of the trip in filter_trips
+    # item[1] is the label from the first round of clustering
+    idx_labels_track = []
+    for bin in bins:
+        for ori_idx in bin:
+            idx_labels_track.append([ori_idx])
+    # store first round labels in idx_labels_track list
+    for i in range(len(first_labels)):
+        idx_labels_track[i].append(first_labels[i])
+
+    return idx_labels_track
+
+
+def get_first_label_and_track(bins,bin_trips,filter_trips):
+    gs.compare_trip_orders(bins, bin_trips, filter_trips)
+    first_labels = get_first_label(bins)
+    track = get_track(bins, first_labels)
+    return first_labels,track
+
+
+def tune(data,radius,kmeans):
+    sim, bins, bin_trips, filter_trips = first_round(data, radius)
+    # it is possible that we don't have common trips for tuning or testing
+    # bins contain common trips indices
+    if len(bins) != 0:
+        first_labels, track = get_first_label_and_track(bins,bin_trips,filter_trips)
+        # collect tuning scores and parameters
+        tune_score = {}
+        for dist_pct in np.arange(0.15, 0.6, 0.02):
+            for low in range(250, 600):
+
+                percentage_second, homo_second = second_round(bin_trips,filter_trips,first_labels,track,low,dist_pct,
+                                                              sim,kmeans)
+
+                curr_score = gs.get_score(homo_second, percentage_second)
+                if curr_score not in tune_score:
+                    tune_score[curr_score] = (low, dist_pct)
+
+        best_score = max(tune_score)
+        sel_tradeoffs = tune_score[best_score]
+        low = sel_tradeoffs[0]
+        dist_pct = sel_tradeoffs[1]
+    else:
+        low = 0
+        dist_pct = 0
+
+    return low,dist_pct
+
+
+def test(data,radius,low,dist_pct,kmeans):
+    sim, bins, bin_trips, filter_trips = first_round(data, radius)
+    # it is possible that we don't have common trips for tuning or testing
+    # bins contain common trips indices
+    if len(bins) != 0:
+        first_labels, track = get_first_label_and_track(bins,bin_trips,filter_trips)
+        # new_labels temporarily stores the labels from the first round, but later the labels in new_labels will be
+        # updated with the labels after two rounds of clustering.
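+        # e.g. if the first round produces labels [1,1,1,2,2,2] and the second round
+        # produces [1,2,1,2,3,3], new_labels will end up as [11,12,11,22,23,23]
+        # (the same worked example appears in get_request_percentage.py below)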
+        new_labels = first_labels.copy()
+        # get request percentage for the subset for the first round
+        percentage_first = grp.get_req_pct(new_labels, track, filter_trips, sim)
+        # get homogeneity score for the subset for the first round
+        homo_first = gs.score(bin_trips, first_labels)
+        percentage_second, homo_second = second_round(bin_trips, filter_trips, first_labels, track, low, dist_pct,
+                                                      sim, kmeans)
+    else:
+        percentage_first = 1
+        homo_first = 1
+        percentage_second = 1
+        homo_second = 1
+    scores = gs.get_score(homo_second, percentage_second)
+    return homo_first,percentage_first,homo_second,percentage_second,scores
+
+
+def main(all_users):
+    radius = 100
+    all_filename = []
+    for a, user in enumerate(all_users):
+        logging.info(f"Starting evaluation for {user}")
+        df = pd.DataFrame(columns=['user','user_id','percentage of 1st round','homogeneity score of 1st round',
+                                   'percentage of 2nd round','homogeneity score of 2nd round','scores','lower boundary',
+                                   'distance percentage'])
+        logging.info(f"At stage: Reading data")
+        trips = preprocess.read_data(user)
+        logging.info(f"At stage: Filtering data")
+        filter_trips = preprocess.filter_data(trips, radius)
+        # filter out users that don't have enough valid labeled trips
+        if not gu.valid_user(filter_trips, trips):
+            logging.warning(f"User {user} is invalid, early return")
+            continue
+        logging.info(f"At stage: Splitting data")
+        tune_idx, test_idx = preprocess.split_data(filter_trips)
+        # choose tuning/test set to run the model
+        # this step will use KFold (5 splits) to split the data into different subsets
+        # - tune: tuning set
+        # - test: test set
+        # Here we use a bigger part of the data for testing and a smaller part for tuning
+        tune_data = preprocess.get_subdata(filter_trips, test_idx)
+        test_data = preprocess.get_subdata(filter_trips, tune_idx)
+
+        # tune data
+        for i, curr_tune in enumerate(tune_data):
+            logging.info(f"At stage: starting tuning for stage {i}")
+            # for tuning, we don't add kmeans for re-clustering. We just need to get tuning parameters
+            # - low: the lower boundary of the dendrogram. If the final distance of the dendrogram is lower than "low",
+            #   this bin does not need to be re-clustered.
+            # - dist_pct: the higher boundary of the dendrogram. If the final distance is higher than "low",
+            #   the cutoff of the dendrogram is (the final distance of the dendrogram * dist_pct)
+            low, dist_pct = tune(curr_tune, radius, kmeans=False)
+            df.loc[i,'lower boundary']=low
+            df.loc[i,'distance percentage']=dist_pct
+
+        # testing
+        for i, curr_test in enumerate(test_data):
+            logging.info(f"At stage: starting testing for stage {i}")
+            low = df.loc[i,'lower boundary']
+            dist_pct = df.loc[i,'distance percentage']
+
+            # for testing, we add kmeans to re-build the model
+            homo_first, percentage_first, homo_second, percentage_second, scores = test(curr_test,radius,low,
+                                                                                        dist_pct,kmeans=True)
+            df.loc[i, 'percentage of 1st round'] = percentage_first
+            df.loc[i, 'homogeneity score of 1st round'] = homo_first
+            df.loc[i, 'percentage of 2nd round'] = percentage_second
+            df.loc[i, 'homogeneity score of 2nd round'] = homo_second
+            df.loc[i, 'scores'] = scores
+        df['user_id'] = user
+        df['user']='user'+str(a+1)
+
+        logging.info(f"At stage: parameter selection outputs complete")
+        filename = "user_" + str(user) + ".csv"
+        all_filename.append(filename)
+        df.to_csv(filename, index=True, index_label='split')
+
+    # collect filenames in a file, use it to plot the scatter
+    collect_filename = jpickle.dumps(all_filename)
+    with open("collect_filename", "w") as fd:
+        fd.write(collect_filename)
+
+
+if __name__ == '__main__':
+    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
+                        level=logging.DEBUG)
+    all_users = esta.TimeSeries.get_uuid_list()
+    main(all_users)
diff --git a/emission/analysis/modelling/tour_model/get_plot.py b/emission/analysis/modelling/tour_model/get_plot.py
new file mode 100644
index 000000000..30039ab3b
--- /dev/null
+++ b/emission/analysis/modelling/tour_model/get_plot.py
@@ -0,0 +1,76 @@
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+from matplotlib import cm
+import folium
+import branca.colormap as clm
+import load_predict as predict
+
+pd.set_option('display.max_columns', None)
+pd.set_option('display.max_rows', None)
+
+def get_scatter(valid_users,all_filename,first_round,second_round):
+    sc = []
+    cmp = cm.get_cmap('Dark2', len(valid_users))
+    for i in range(len(valid_users)):
+        df = pd.read_csv(all_filename[i], index_col='split')
+        color = cmp.colors[i]
+        plot_scatter(df, sc, color, first_round, second_round)
+    plt.legend(sc, valid_users, markerscale=0.8, scatterpoints=1, bbox_to_anchor=(1.23, 1))
+    plt.xlabel('user input request percentage',fontsize=16)
+    plt.ylabel('homogeneity score',fontsize=16)
+    plt.xticks(np.arange(0.4,1.1,step=0.1),fontsize=14)
+    plt.yticks(np.arange(0.2,1.1,step=0.1),fontsize=14)
+    plt.show()
+
+
+def plot_scatter(result_df,sc,color,first_round,second_round):
+    if first_round:
+        x = result_df['percentage of 1st round']
+        y = result_df['homogeneity score of 1st round']
+    elif second_round:
+        x = result_df['percentage of 2nd round']
+        y = result_df['homogeneity score of 2nd round']
+    point = plt.scatter(x, y, color=color, s=70, alpha=0.7)
+    sc.append(point)
+
+
+def same_cluster_map(cluster,filter_trips,bins):
+    color_map = clm.linear.Set1_07.to_step(len(bins), index=[i for i in range(len(bins)+1)])
+    first_trip = filter_trips[cluster[0]]
+    map = folium.Map(location=[first_trip.data.start_loc["coordinates"][1], first_trip.data.start_loc["coordinates"][0]],
+                     zoom_start=12, max_zoom=30, control_scale=True)
+
+    zoom_points = []
+    for curr_trip_index in cluster:
+        for i in range(len(bins)):
+            curr_trip = filter_trips[curr_trip_index]
+            if curr_trip_index in bins[i]:
+                # We need polyline to plot the trip according to end_loc
+                # We are more interested in the end points of particular trips (e.g. home) than the start points
+                # Flip indices because points are in geojson (i.e. lon, lat); folium takes [lat, lon]
+                end_points = folium.CircleMarker([curr_trip.data.end_loc["coordinates"][1], curr_trip.data.end_loc["coordinates"][0]],
+                                                 radius=3,color='red',fill=True,fill_color='red',fill_opacity=1)
+                end_points.add_to(map)
+                zoom_points.append([curr_trip.data.start_loc["coordinates"][1],
+                                    curr_trip.data.start_loc["coordinates"][0]])
+                zoom_points.append([curr_trip.data.end_loc["coordinates"][1],
+                                    curr_trip.data.end_loc["coordinates"][0]])
+    df = pd.DataFrame(zoom_points, columns=['Lat', 'Long'])
+    sw = df[['Lat', 'Long']].min().values.tolist()
+    ne = df[['Lat', 'Long']].max().values.tolist()
+    map.fit_bounds([sw, ne])
+
+    return map
+
+if __name__ == '__main__':
+    # modify the valid_users list for the user results you have saved
+    valid_users=['user1', 'user3', 'user4', 'user6', 'user8', 'user9', 'user10', 'user13']
+    # - collect_filename: filenames of all results from 2 rounds of clustering
+    collect_filename = predict.loadModelStage("collect_filename")
+    # scatter plot from the result of the 1st round
+    plt.figure()
+    get_scatter(valid_users, collect_filename, first_round=True, second_round=False)
+    # scatter plot from the result of the 2nd round
+    plt.figure()
+    get_scatter(valid_users, collect_filename, first_round=False, second_round=True)
diff --git a/emission/analysis/modelling/tour_model/get_request_percentage.py b/emission/analysis/modelling/tour_model/get_request_percentage.py
new file mode 100644
index 000000000..7e5f8639b
--- /dev/null
+++ b/emission/analysis/modelling/tour_model/get_request_percentage.py
@@ -0,0 +1,135 @@
+import label_processing as label_pro
+import copy
+import itertools
+
+
+# This function compares a trip with a group of trips to see if they happened on the same day
+def match_day(trip,bin,filter_trips):
+    if bin:
+        t = filter_trips[bin[0]]
+        if trip['data']['start_local_dt']['year']==t['data']['start_local_dt']['year']\
+            and trip['data']['start_local_dt']['month']==t['data']['start_local_dt']['month']\
+            and trip['data']['start_local_dt']['day']==t['data']['start_local_dt']['day']:
+            return True
+    return False
+
+
+# This function compares a trip with a group of trips to see if they happened in the same month
+def match_month(trip,bin,filter_trips):
+    if bin:
+        t = filter_trips[bin[0]]
+        if trip['data']['start_local_dt']['year']==t['data']['start_local_dt']['year']\
+            and trip['data']['start_local_dt']['month']==t['data']['start_local_dt']['month']:
+            return True
+    return False
+
+
+# This function bins trips according to ['start_local_dt']
+def bin_date(trip_ls,filter_trips,day=None,month=None):
+    bin_date = []
+    for trip_index in trip_ls:
+        added = False
+        trip = filter_trips[trip_index]
+
+        for bin in bin_date:
+            if day:
+                if match_day(trip,bin,filter_trips):
+                    bin.append(trip_index)
+                    added = True
+                    break
+            if month:
+                if match_month(trip,bin,filter_trips):
+                    bin.append(trip_index)
+                    added = True
+                    break
+
+        if not added:
+            bin_date.append([trip_index])
+
+    return bin_date
+
+
+def find_first_trip(filter_trips,bin):
+    trip_ts = [filter_trips[trip_idx]['data']["start_ts"] for trip_idx in bin]
+    # - early_idx_in_bin: the earliest trip position in the bin
+    #   e.g. ts = [20,10,40,5,100]
+    #   early_idx_in_bin = 3
+    #   early trip_index = 5
+    early_idx_in_bin = trip_ts.index(min(trip_ts))
+    # - early_trip_index: the original index of the earliest trip
+    early_trip_index = bin[early_idx_in_bin]
+    return early_trip_index
+
+
+# collect indices of requested trips and common trips (no need to request) above cutoff
+def requested_trips_ab_cutoff(new_bins, filter_trips):
+    # collect requested trip indices above cutoff
+    ab_trip_ls = []
+    # collect common trip indices above cutoff
+    no_req_trip_ls = []
+    for bin in new_bins:
+        early_trip_index = find_first_trip(filter_trips, bin)
+        ab_trip_ls.append(early_trip_index)
+
+        # The following loop collects the original indices of the rest of the trips in the bin. Since they are not the
+        # earliest one, we don't need to request user labels for them
+        # >>> x = [100,200,300]
+        # >>> x.remove(100); x
+        # [200, 300]
+        no_req_trip_subls = copy.copy(bin)
+        no_req_trip_subls.remove(early_trip_index)
+        # >>> x = [1,2,3]
+        # >>> x.extend([4,5,6]); x
+        # [1, 2, 3, 4, 5, 6]
+        no_req_trip_ls.extend(no_req_trip_subls)
+    return ab_trip_ls, no_req_trip_ls
+
+
+# collect indices of requested trips below cutoff
+def requested_trips_bl_cutoff(sim):
+    # bins below cutoff
+    bl_bins = sim.below_cutoff
+
+    # collect requested trips indices below cutoff
+    # effectively, bl_trip_ls = flatten(bl_bins)
+    # >>> bl_bins = [[1,2],[3,4],[5,6]]
+    # >>> bl_trip_ls = [item for sublist in bl_bins for item in sublist]
+    # >>> bl_trip_ls
+    # [1, 2, 3, 4, 5, 6]
+    # the reason for flattening: we need to have a whole flattened list of requested trips, then compute the percentage
+    bl_trip_ls = [item for sublist in bl_bins for item in sublist]
+    return bl_trip_ls
+
+
+# a list of all requested trips indices
+# - filter_trips: we need to use timestamps in filter_trips here,
+#   in requested_trips_ab_cutoff, we need to get the first trip of the bin,
+#   and we need to collect original trip indices from filter_trips
+# - sim: we need to use code in similarity to find trips below cutoff
+#   Since the indices from similarity code are original (trips below cutoff),
+#   we need to have original indices of all requested trips,
+#   so we use filter_trips for finding the requested common trips
+# new_bins: bins that have original indices of similar trips. They only represent common trips
+def get_requested_trips(new_bins,filter_trips,sim):
+    ab_trip_ls,no_req_trip_ls = requested_trips_ab_cutoff(new_bins,filter_trips)
+    bl_trip_ls = requested_trips_bl_cutoff(sim)
+    req_trips_ls = ab_trip_ls+bl_trip_ls
+    return req_trips_ls
+
+
+# get request percentage based on the number of requested trips and the total number of trips
+def get_req_pct(new_labels,track,filter_trips,sim):
+    # - new_bins: bins with original indices of similar trips from common trips
+    # - new_label: For the first round, new_label is the copy of the first round labels, e.g. [1,1,1,2,2,2].
+    #   For the second round, new_label is the first round label concatenated with the second round label.
+    #   e.g. if the label from the second round is [1,2,1,2,3,3], new_label will turn into [11,12,11,22,23,23]
+    # - track: at this point, each item in the track contains the original index of a trip,
+    #   and the latest label of it, e.g. [ori_idx, latest_label]
+    #   concretely, please look at the "group_similar_trips" function in label_processing.py
+    #   If new_label is [11,12,11,22,23,23] and the original indices of the trips are [1,2,3,4,5,6],
+    #   new_bins will be [[1,3],[2],[4],[5,6]]
+    new_bins = label_pro.group_similar_trips(new_labels,track)
+    req_trips = get_requested_trips(new_bins,filter_trips,sim)
+    pct = len(req_trips)/len(filter_trips)
+    pct = float('%.3f' % pct)
+    return pct
diff --git a/emission/analysis/modelling/tour_model/get_scores.py b/emission/analysis/modelling/tour_model/get_scores.py
new file mode 100644
index 000000000..30e969392
--- /dev/null
+++ b/emission/analysis/modelling/tour_model/get_scores.py
@@ -0,0 +1,84 @@
+import logging
+import pandas as pd
+import pandas.testing as pdt
+import label_processing as label_pro
+import sklearn.metrics as skm
+import itertools
+
+
+# compare the trip orders in bin_trips with those in filter_trips above cutoff
+def compare_trip_orders(bins,bin_trips,filter_trips):
+    bin_trips_ts = pd.DataFrame(data=[trip["data"]["start_ts"] for trip in bin_trips])
+    bin_ls = list(itertools.chain(*bins))
+    bins_ts = pd.DataFrame(data=[filter_trips[i]["data"]["start_ts"] for i in bin_ls])
+    # compare the two data frames; the program will continue to score calculation if the two data frames are the same
+    pdt.assert_frame_equal(bins_ts, bin_trips_ts)
+
+
+# This function gets the homogeneity score after the first/second round of clustering
+# It is based on bin_trips, which are common trips. bin_trips are collected according to the indices of the trips
+# in bins above cutoff
+# More info about bin_trips is in similarity.py (delete_bins)
+# The homogeneity score reflects the degree to which a cluster consists only of trips with similar ground truthed labels.
+# In the following examples, "A","B","C" are user labels.
+# The labels can be drawn from different sets as long as the mapping is unique (e.g. ["A", "A", "C"] matches perfectly
+# with [0,0,1]).
+# Ideally, there would be a 1:1 mapping between labels and clusters - e.g. ["A", "A", "A"] maps to [1,1,1]
+# This can break in two ways:
+# 1. user label A maps to different clusters - e.g. ["A", "A", "A"] maps to [1,2,3].
+# In this case, the homogeneity score will still be 1.0, since each cluster only has label "A".
+# For our problem, this would typically map to the use case where trips with the same user labels are actually to different
+# destinations. For `medical` or `personal` locations, for example, users could actually go to multiple medical
+# facilities or friends' houses. In this case, the trips will be in different clusters, but since the destinations are in
+# fact different, this would actually be the correct behavior.
+# The trips could also be to the same location, but be clustered differently due to minor variations in duration or
+# distance (maybe due to traffic conditions). This could result in multiple clusters for what is essentially the same
+# trip. We capture this difference through the request percentage metric, which will result in three queries for
+# [1,2,3] and only one for [1,1,1]
+# 2. two different labels map to the same cluster - e.g. ["A", "A", "B"] maps to [1,1,1]. This is the case captured by the
+# homogeneity score, which will be less than 1.0 (0 represents inhomogeneous, 1.0 represents homogeneous).
+# This maps well to our use case because in this case, assigning the same label to all trips in the cluster would
+# be incorrect. In particular, if we did not have the ground truth, the third trip would be labeled "A",
+# which would lower the accuracy.
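(A standalone sanity check of the two failure modes described in the comment above, calling sklearn's homogeneity_score directly; the user labels "A"/"B" are encoded as 0/1.)

```python
import sklearn.metrics as skm

# Case 1: one user label split across three clusters -- still perfectly homogeneous
print(skm.homogeneity_score([0, 0, 0], [1, 2, 3]))  # 1.0
# Case 2: two different user labels merged into one cluster -- homogeneity collapses
print(skm.homogeneity_score([0, 0, 1], [1, 1, 1]))  # 0.0
```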
+# At this point, we didn't make user_input have the same labels for labels_true and labels_pred.
+# For example, in the second round, if the user labels are [("home", "ebike", "bus"),("home", "walk", "bus"),
+# ("home", "ebike", "bus")], labels_pred can be [0,1,0], or [1,0,1], or represented by other numeric labels.
+def score(bin_trips, labels_pred):
+    logging.debug(f"Calculating score for {len(bin_trips)} and {len(labels_pred)}")
+    bin_trips_user_input_df = pd.DataFrame(data=[trip["data"]["user_input"] for trip in bin_trips])
+    bin_trips_user_input_df = label_pro.map_labels(bin_trips_user_input_df)
+
+    # turn all user_input into a list without binning
+    bin_trips_user_input_ls = bin_trips_user_input_df.values.tolist()
+    # drop duplicate user_input
+    no_dup_df = bin_trips_user_input_df.drop_duplicates()
+    # turn non-duplicate user_input into a list
+    no_dup_list = no_dup_df.values.tolist()
+
+    # collect labels_true based on user_input
+    # To compute labels_true, we need to find the non-duplicate user labels, and use the index of each unique user label
+    # to label the whole set of trips
+    # If user labels are [(purpose, confirmed_mode, replaced_mode)]
+    # e.g., [("home","ebike","bus"),("work","walk","bike"),("home","ebike","bus"),("home","ebike","bus"),
+    # ("work","walk","bike"),("exercise","ebike","walk")],
+    # the unique label list is [0,1,2], labels_true will be [0,1,0,0,1,2]
+    # labels_pred is the flattened list of labels of all common trips, e.g. [1,1,11,12,13,22,23]
+    labels_true = []
+    for userinput_dict in bin_trips_user_input_ls:
+        if userinput_dict in no_dup_list:
+            labels_true.append(no_dup_list.index(userinput_dict))
+
+    homo_score = skm.homogeneity_score(labels_true, labels_pred)
+    homo_score = float('%.3f' % homo_score)
+    return homo_score
+
+
+# This function computes a score for every model.
+# It is used for tuning and finding the best model after two rounds of clustering
+# - homo_second: the homogeneity score after the second round of clustering
+# - percentage_second: the user labels request percentage
+def get_score(homo_second,percentage_second):
+    curr_score = 0.5 * homo_second + 0.5 * (1 - percentage_second)
+    curr_score = float('%.3f' % curr_score)
+    return curr_score
+
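(Standalone check of the trade-off formula above: a perfectly homogeneous clustering that still requires a label for every trip scores the same as a totally inhomogeneous one that requires none.)

```python
# score = 0.5 * homogeneity + 0.5 * (1 - request_percentage), rounded to 3 places
def get_score(homo_second, percentage_second):
    return float('%.3f' % (0.5 * homo_second + 0.5 * (1 - percentage_second)))

print(get_score(1.0, 1.0), get_score(0.0, 0.0))  # 0.5 0.5
print(get_score(0.9, 0.4))  # 0.75
```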
diff --git a/emission/analysis/modelling/tour_model/get_users.py b/emission/analysis/modelling/tour_model/get_users.py new file mode 100644 index 000000000..fc540b4aa --- /dev/null +++ b/emission/analysis/modelling/tour_model/get_users.py @@ -0,0 +1,31 @@ +import emission.analysis.modelling.tour_model.data_preprocessing as preprocess + + +# to determine if the user is valid: +# a valid user should have >= 10 labeled trips for further analysis, and the proportion of filter_trips should be >= 50% +def valid_user(filter_trips,trips): + valid = False + if len(filter_trips) >= 10 and len(filter_trips) / len(trips) >= 0.5: + valid = True + return valid + + +# - user_ls: a list of strings representing short user names, such as [user1, user2, user3...] +# - valid_user_ls: a subset of `user_ls` for valid users, so also string representations of user names +# - all_users: a collection of all user ids, in terms of user id objects +def get_user_ls(all_users,radius): + user_ls = [] + valid_user_ls = [] + for i in range(len(all_users)): + curr_user = 'user' + str(i + 1) + user = all_users[i] + trips = preprocess.read_data(user) + filter_trips = preprocess.filter_data(trips,radius) + # every user gets a short name; only valid users are also added to valid_user_ls + user_ls.append(curr_user) + if valid_user(filter_trips,trips): + valid_user_ls.append(curr_user) + return user_ls,valid_user_ls + diff --git a/emission/analysis/modelling/tour_model/label_processing.py b/emission/analysis/modelling/tour_model/label_processing.py new file mode 100644 index 000000000..e69707305 --- /dev/null +++ b/emission/analysis/modelling/tour_model/label_processing.py @@ -0,0 +1,148 @@ +import logging +import scipy.cluster.hierarchy as sch +import sklearn.cluster as sc + +# to map the user labels +# - user_input_df: pass in the original user input dataframe, return the changed user input dataframe +# - sp2en: change Spanish to English +def map_labels_sp2en(user_input_df): + # Spanish words to English + span_eng_dict = {'revisado_bike': 'test ride with bike', 'placas_de carro': 'car plates', 'aseguranza': 'insurance', + 'iglesia': 'church', 'curso': 'course', + 'mi_hija recién aliviada': 'my daughter just had a new baby', + 'servicio_comunitario': 'community service', 'pago_de aseguranza': 'insurance payment', + 'grupo_comunitario': 'community group', 'caminata_comunitaria': 'community walk'} + + # change language + user_input_df = user_input_df.replace(span_eng_dict) + return user_input_df + + +# to map purposes and replaced modes in user inputs +def map_labels_purpose(user_input_df): + # Convert purpose + map_pur_dict = {'course': 'school', 'work_- lunch break': 'lunch_break', 'on_the way home': 'home', + 'insurance_payment': 'insurance'} + + # convert purpose + user_input_df = user_input_df.replace(map_pur_dict) + return user_input_df + + +def map_labels_mode(user_input_df): + # convert mode + if "replaced_mode" in user_input_df.columns: + same_mode_df = user_input_df[user_input_df.replaced_mode == "same_mode"] + logging.debug("The following rows will be changed %s" % + same_mode_df.index) + for a in range(len(user_input_df)): + if user_input_df.iloc[a]["replaced_mode"] == "same_mode": + # to see which row will be converted + # logging.debug("The following rows will be changed: %s", user_input_df.iloc[a]) + # assign through .loc with the real index; chained .iloc[a][...] assignment + # writes to a copy and is not guaranteed to modify the dataframe + user_input_df.loc[user_input_df.index[a], "replaced_mode"] = user_input_df.iloc[a]['mode_confirm'] + logging.debug("Finished changing all rows") + else: + logging.info("map_labels_mode: no replaced mode column found, early return") + return user_input_df + + +# this function will change Spanish to English, convert purposes, and convert modes +def map_labels(user_input_df): + # Note that the spanish -> english conversion MUST currently happen before the other + # mode and purpose mappings + user_input_df = map_labels_sp2en(user_input_df) + user_input_df = map_labels_purpose(user_input_df) + user_input_df = map_labels_mode(user_input_df) + return user_input_df + +# use hierarchical clustering to get labels for the second round +# - sch.linkage: perform hierarchical (agglomerative) clustering +# In this function, we set a lower bound and a higher bound (cutoff) on distance in the dendrogram +# - last_d: the distance of the last cluster in the dendrogram +# - low: the lower bound of
distance +# e.g., if low = 300 and last_d = 250, we will assign 0s as labels for the points, irrespective of the first round labels, +# and the list of second round labels will be like [0,0,0,0,0]. +# It means the points are already similar to each other after the first round of clustering; they don't need to +# go through the second round. +# - max_d: the cutoff distance +# - dist_pct: the percentage of the last distance in the dendrogram +# - sch.fcluster: form clusters from the hierarchical clustering defined by the given linkage matrix +# e.g., if last_d = 10000 and dist_pct = 0.4, then max_d = 4000, and clusters will be assigned at a distance of 4000 +# by default, using scipy hierarchical clustering +def get_second_labels(x,method,low,dist_pct): + z = sch.linkage(x, method=method, metric='euclidean') + last_d = z[-1][2] + clusters = [] + if last_d < low: + for i in range(len(x)): + clusters.append(0) + else: + max_d = last_d * dist_pct + clusters = sch.fcluster(z, max_d, criterion='distance') + return clusters + +# using kmeans to build the model +def kmeans_clusters(clusters,x): + n_clusters = len(set(clusters)) + kmeans = sc.KMeans(n_clusters=n_clusters, random_state=0).fit(x) + k_clusters = kmeans.labels_ + return k_clusters + + +# this function includes hierarchical clustering and changing labels from the first round to get appropriate labels for +# the second round of clustering +# an appropriate label is the label from the first round concatenated with the label from the second round +# (e.g. if the label from the first round is 1 and the label from the second round is 2, the new label will be 12) +# - second_round_idx_labels: a list to store the indices and labels from the first round. +# - second_labels: labels from the second round of clustering +def get_new_labels(second_labels,second_round_idx_labels,new_labels): + for i in range(len(second_labels)): + first_index = second_round_idx_labels[i][0] + new_label = second_round_idx_labels[i][1] + # concatenate the labels from the two rounds and write the result back at the original index + new_labels[first_index] = int(str(new_label) + str(second_labels[i])) + return new_labels +
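For illustration, a minimal sketch (not part of this patch) of the label concatenation performed by get_new_labels, using hypothetical toy values:

# trips 0-2 were all in first-round bin 1; the second round split them into 1/2/3
second_round_idx_labels = [[0, 1], [1, 1], [2, 1]]   # [original index, first round label]
second_labels = [1, 2, 3]                            # labels from the second round
new_labels = [1, 1, 1]                               # seeded with the first round labels

for i in range(len(second_labels)):
    first_index = second_round_idx_labels[i][0]
    new_labels[first_index] = int(str(second_round_idx_labels[i][1]) + str(second_labels[i]))

print(new_labels)   # [11, 12, 13]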
# group similar trips according to new_labels, store the original indices of the trips +def group_similar_trips(new_labels,track): + bin_sim_trips_idx = [] + + # find the unique set of labels and store the indices for each label into `bin_sim_trips_idx` + label_set = set(new_labels) + # convert the set of unique labels into their indices + # concretely, if the input labels are ['a','a','a','b','b','b'] + # the unique labels are ['a', 'b'] + for sel_label in label_set: + # for the first iteration, bin = [0,1,2] + # for the second iteration, bin = [3,4,5] + bin = [index for (index, label) in enumerate(new_labels) if label == sel_label] + bin_sim_trips_idx.append(bin) + # At the end, bin_sim_trips_idx = [[0,1,2],[3,4,5]] + + # using track to replace the current indices with the original indices + for bin in bin_sim_trips_idx: + # in the first iteration, bin = [0,1,2] + # in the first iteration of that, we map the trip index of the + # common trip (e.g. 0) to the original index for that trip from the track (e.g. 42) + for i in range(len(bin)): + bin[i] = track[bin[i]][0] + # At this point, bin_sim_trips_idx will have the original indices for the trips + return bin_sim_trips_idx + + + +# replace the first round labels with new labels +# - track: a list to store the indices and labels from the first round of clustering +# for item in track, item[0] is the original index of the trip in filter_trips +# item[1] is the label after the first round of clustering +# we replace the labels from the first round with the new labels from the second round here +def change_track_labels(track,new_labels): + for i in range(len(new_labels)): + track[i][1] = new_labels[i] + return track diff --git a/emission/analysis/modelling/tour_model/load_predict.py b/emission/analysis/modelling/tour_model/load_predict.py new file mode 100644 index 000000000..862dba0a1 --- /dev/null +++ b/emission/analysis/modelling/tour_model/load_predict.py @@ -0,0 +1,162 @@ +# Standard imports +import jsonpickle as jpickle +import logging + +# Our imports +import emission.storage.timeseries.abstract_timeseries as esta +import emission.analysis.modelling.tour_model.similarity as similarity +import emission.analysis.modelling.tour_model.data_preprocessing as preprocess + + +def loadModelStage(filename): + import jsonpickle.ext.numpy as jsonpickle_numpy + jsonpickle_numpy.register_handlers() + model = loadModel(filename) + return model + + +def loadModel(filename): + # use a context manager so the file is closed even if loading fails + with open(filename, "r") as fd: + all_model = jpickle.loads(fd.read()) + return all_model + + +def in_bin(bin_location_features,new_trip_location_feat,radius): + start_b_lon = new_trip_location_feat[0] + start_b_lat = new_trip_location_feat[1] + end_b_lon = new_trip_location_feat[2] + end_b_lat = new_trip_location_feat[3] + for feat in bin_location_features: + start_a_lon = feat[0] + start_a_lat = feat[1] + end_a_lon = feat[2] + end_a_lat = feat[3] + start = similarity.within_radius(start_a_lat, start_a_lon, start_b_lat, start_b_lon,radius) + end = similarity.within_radius(end_a_lat, end_a_lon, end_b_lat, end_b_lon, radius) + if start and end: + continue + else: + return False + return True + + +def predict_labels(trip): + radius = 100 + user = trip['user_id'] + logging.debug(f"At stage: extracting features") + trip_feat = preprocess.extract_features([trip])[0] + trip_loc_feat = trip_feat[0:4] + logging.debug(f"At stage: loading model") + try: + # load locations of bins (1st round of clustering) + # e.g. {'0': [[start lon1, start lat1, end lon1, end lat1],[start lon, start lat, end lon, end lat]]} + # another explanation: - '0': label from the 1st round + # - the value of key '0': all trips in this bin + # - for every trip: the coordinates of the start/end locations + bin_locations = loadModelStage('locations_' + str(user))[0] + + # load models from the 2nd round of clustering + # we use Kmeans to build the model in the previous model building step + # assume that we have 2 clusters from the 1st round (that is, 2 bins), + # the following is an example of the saved models. + # e.g.
{'0': KMeans(n_clusters=2, random_state=0), '1': KMeans(n_clusters=5, random_state=0)} + models = loadModelStage('models_' + str(user))[0] + + # load user labels in all clusters + # assume that we have 1 cluster (bin) from the 1st round of clustering, which has label '0', + # and we have 1 cluster from the 2nd round, which has label '1' + # the value of key '0' contains all 2nd round clusters + # the value of key '1' contains all user labels and probabilities in this cluster + # e.g. {'0': [{'1': [{'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'home', 'replaced_mode': 'drove_alone'}}]}]} + user_labels = loadModelStage('user_labels_' + str(user))[0] + + except IOError as e: + logging.exception(e) + return [] + + logging.debug(f"At stage: first round labeling") + first_round_label_set = list(bin_locations.keys()) + sel_fl = None + for fl in first_round_label_set: + # extract the location features of the selected bin + sel_loc_feat = bin_locations[fl] + # Check if the start/end locations of the new trip and every start/end location in this bin are within the + # radius. If so, the new trip falls in this bin. Then predict the second round label of the new trip + # using this bin's model + if in_bin(sel_loc_feat, trip_loc_feat, radius): + sel_fl = fl + break + if sel_fl is None: + logging.debug(f"sel_fl = {sel_fl}, early return") + return [] + logging.debug(f"At stage: second round labeling") + # choose the selected model + sel_model = models[sel_fl] + # predict the 2nd label for the new trip + # the value of sel_model.predict([trip_feat]) is a numpy.ndarray by default, e.g. [1] + # so we need to turn it into '1' so that it can be the same type as the dict keys in the user_labels dicts + sel_sl = str(sel_model.predict([trip_feat])[0]) + + # - second_round_result: the value for the selected 1st round label + # e.g. {'0': [{'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'home', 'replaced_mode': 'drove_alone'}, + # 'p': 1.0}], + # '1': [{'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'church', 'replaced_mode': 'drove_alone'}, + # 'p': 0.9333333333333333},{'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'entertainment', + # 'replaced_mode': 'drove_alone'},'p': 0.06666666666666667}]} + # more explanation: '0' and '1' are 2nd round clusters from a bin (1st round cluster) + # cluster '0' contains all user label combinations and probabilities in this cluster + second_round_result = user_labels[sel_fl][0] + second_label_ls = list(second_round_result.keys()) + if sel_sl not in second_label_ls: + return [] + # the value for the selected 2nd round label, wrapped in a list + sel_2nd_round_val = second_round_result[sel_sl] + logging.debug(f"Found prediction {sel_2nd_round_val}") + + return sel_2nd_round_val +
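For illustration, a minimal sketch (not part of this patch) of how a caller might consume the prediction; new_trip and the 0.5 threshold are hypothetical:

# predict_labels returns a (possibly empty) list of label combinations with probabilities
predictions = predict_labels(new_trip)
for pred in predictions:
    print(pred["labels"], pred["p"])
# a caller could keep only sufficiently confident predictions
high_conf = [pred for pred in predictions if pred["p"] >= 0.5]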
+if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + all_users = esta.TimeSeries.get_uuid_list() + + # case 1: the new trip matches a bin from the 1st round and a cluster from the 2nd round + user = all_users[0] + radius = 100 + trips = preprocess.read_data(user) + filter_trips = preprocess.filter_data(trips, radius) + new_trip = filter_trips[4] + # result is [{'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'church', 'replaced_mode': 'drove_alone'}, + # 'p': 0.9333333333333333}, {'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'entertainment', + # 'replaced_mode': 'drove_alone'}, 'p': 0.06666666666666667}] + pl = predict_labels(new_trip) + assert len(pl) > 0, f"Invalid prediction {pl}" + + # case 2: no existing files for the user who has the new trip: + # 1. the user is invalid (< 10 fully labeled trips, or < 50% of trips fully labeled) + # 2. the user doesn't have common trips + user = all_users[1] + trips = preprocess.read_data(user) + new_trip = trips[0] + # result is [] + pl = predict_labels(new_trip) + assert len(pl) == 0, f"Invalid prediction {pl}" + + # case 3: the new trip is a novel trip (doesn't fall into any 1st round bins) + user = all_users[0] + radius = 100 + trips = preprocess.read_data(user) + filter_trips = preprocess.filter_data(trips, radius) + new_trip = filter_trips[0] + # result is [] + pl = predict_labels(new_trip) + assert len(pl) == 0, f"Invalid prediction {pl}" + + # case 4: the new trip falls in a 1st round bin, but is predicted to be a new cluster in the 2nd round + # result is [] + # no example for now diff --git a/emission/analysis/modelling/tour_model/second_round_of_clustering.py b/emission/analysis/modelling/tour_model/second_round_of_clustering.py new file mode 100644 index 000000000..d68dda9fa --- /dev/null +++ b/emission/analysis/modelling/tour_model/second_round_of_clustering.py @@ -0,0 +1,76 @@ +import emission.analysis.modelling.tour_model.label_processing as lp +import emission.analysis.modelling.tour_model.data_preprocessing as preprocess +import copy + +""" +This class is used in evaluation_pipeline. It takes the result from the first round for further analysis. +The logic for the second round of clustering: +Each bin from the first round of clustering goes through hierarchical (agglomerative) clustering (the second round) +and is assigned a label. After that, the number of clusters in each bin (n_cluster) will be passed to the Kmeans algorithm. +Then, the data in the bin will be assigned a new label. + +Hierarchical clustering and Kmeans do a similar thing here. +Since we use Kmeans for model building later (hierarchical clustering doesn't have "fit" and "predict" separated), +for better evaluation, we use the same clustering algorithm (Kmeans) here. +Also, Kmeans takes n_cluster as a parameter, so we need to run hierarchical clustering first to get the number of clusters. +The tuning parameters passed to the second round are used to tune the hierarchical clustering. + +The main goal of the second round of clustering is to get the homogeneity scores and the percentages of user label requests. +""" + + + +class SecondRoundOfClustering(object): + def __init__(self, data, first_labels): + # normalize falsy input (e.g. None) to an empty list instead of silently overwriting it + self.data = data if data else [] + self.new_labels = copy.copy(first_labels) + + + def get_sel_features_and_trips(self,first_labels,l): + # store second round trips data + second_round_trips = [] + # create a track to store indices and labels for the second round + second_round_idx_labels = [] + for index, first_label in enumerate(first_labels): + if first_label == l: + second_round_trips.append(self.data[index]) + second_round_idx_labels.append([index, first_label]) + x = preprocess.extract_features(second_round_trips) + self.x = x + self.second_round_trips = second_round_trips + self.second_round_idx_labels = second_round_idx_labels + + # We choose single-linkage clustering. + # See examples and explanations at https://en.wikipedia.org/wiki/Single-linkage_clustering + # It is based on grouping clusters in bottom-up fashion (agglomerative clustering), + # at each step combining two clusters that contain the closest pair of elements not yet belonging + # to the same cluster as each other.
+ def hierarcial_clustering(self,low,dist_pct): + method = 'single' + # get the second labels from the second round of clustering using hierarchical clustering + second_labels = lp.get_second_labels(self.x, method, low, dist_pct) + self.second_labels = second_labels + + # for the test set, we use kmeans to re-run the clustering, then evaluate it later + def kmeans_clustering(self): + second_labels = lp.kmeans_clusters(self.second_labels, self.x) + self.second_labels = second_labels + + # concatenate the first label (label from the first round) and the second label (label + # from the second round) (e.g. first label [1,1,1], second label [1,2,3], new_labels is [11,12,13]) + def get_new_labels(self,first_labels): + # new_labels temporarily stores the labels from the first round, but later the labels in new_labels will be + # updated with the labels after two rounds of clustering. + new_labels = lp.get_new_labels(self.second_labels, self.second_round_idx_labels, self.new_labels) + self.new_labels = new_labels + return self.new_labels + + # change the labels in track with new_labels + def get_new_track(self,track): + track = lp.change_track_labels(track, self.new_labels) + self.track = track + return self.track + + diff --git a/emission/analysis/modelling/tour_model/similarity.py b/emission/analysis/modelling/tour_model/similarity.py index 8914ea463..58e1fecb4 100644 --- a/emission/analysis/modelling/tour_model/similarity.py +++ b/emission/analysis/modelling/tour_model/similarity.py @@ -12,6 +12,7 @@ import logging import math import numpy +import pandas as pd import copy from sklearn import metrics from numpy.linalg import norm @@ -46,16 +47,12 @@ def filter_too_short(all_trips, radius): # But let's go from working to working valid_trips = copy.copy(all_trips) for t in all_trips: - logging.debug("Considering trip %s" % t) + logging.debug(f"Considering trip {t['_id']}: {t.data.start_fmt_time} -> {t.data.end_fmt_time}, {t.data.start_loc} -> {t.data.end_loc}") try: - start_place = esda.get_entry(esda.CLEANED_PLACE_KEY, - t.data.start_place) - end_place = esda.get_entry(esda.CLEANED_PLACE_KEY, - t.data.end_place) - start_lon = start_place.data.location["coordinates"][0] - start_lat = start_place.data.location["coordinates"][1] - end_lon = end_place.data.location["coordinates"][0] - end_lat = end_place.data.location["coordinates"][1] + start_lon = t.data.start_loc["coordinates"][0] + start_lat = t.data.start_loc["coordinates"][1] + end_lon = t.data.end_loc["coordinates"][0] + end_lat = t.data.end_loc["coordinates"][1] logging.debug("endpoints are = (%s, %s) and (%s, %s)" % (start_lon, start_lat, end_lon, end_lat)) if within_radius(start_lat, start_lon, end_lat, end_lon, radius): @@ -68,15 +65,55 @@ return valid_trips class similarity(object): - def __init__(self, data, radius): - inData = data + def __init__(self, data, radius, shouldFilter=True, cutoff=True): if not data: - inData = [] + data = [] + # In order to retrofit multiple invocation options without undertaking + # a full restructuring, we will use the following structure for the + # data + # self.data will always represent the current state of the trips + # self.bins will always represent the current state of the bins + # In addition, self.all_data will represent all input trips + # In addition, self.filtered_data will represent the output of "filter_too_short" + # In addition, self.data_above_cutoff will represent the output of "delete_bins" + # so even after finishing all three steps, we will have
access to the + original input data + # since we are only copying the lists (copy), not the objects in the + # lists (deepcopy), this should not involve an undue user burden + # I really don't see why we need to work with indices here, but in the absence of unit tests, + # I am not comfortable with restructuring further + self.all_data = data + self.set_state(self.all_data) self.bins = [] self.radius = float(radius) - self.data = filter_too_short(inData, self.radius) + self.shouldFilter = shouldFilter + self.cutoff = cutoff + + def set_state(self, in_data): + """ + Encapsulates all the state related to this object + so that we don't forget to update everything + """ + self.data = copy.copy(in_data) self.size = len(self.data) + def fit(self): + if self.shouldFilter: + self.filter_trips() + self.bin_data() + if self.cutoff: + self.delete_bins() + self.labels_ = self.get_result_labels() + + + # Pull out the trip filtration code so that we can invoke the code in + # multiple ways (with and without filtration) depending on whether we want + # to focus on common trips or auto-labeling + def filter_trips(self): + self.filtered_data = filter_too_short(self.all_data, self.radius) + self.set_state(self.filtered_data) + + #create bins def bin_data(self): for a in range(self.size): @@ -88,6 +125,7 @@ def bin_data(self): added = True break except: + print(f"Got exception while matching trip {a}, creating new bin") added = False if not added: self.bins.append([a]) @@ -95,13 +133,20 @@ def calc_cutoff_bins(self): if len(self.bins) <= 1: + print(f"{len(self.bins)}, no cutoff") self.newdata = self.data + self.data_above_cutoff = self.newdata + self.set_state(self.newdata) + self.num = len(self.bins) return num = self.elbow_distance() + logging.debug("bins = %s, elbow distance = %s" % (self.bins, num)) sum = 0 for i in range(len(self.bins)): sum += len(self.bins[i]) if len(self.bins[i]) <= len(self.bins[num]): + logging.debug("found weird condition, self.bins[i] = %s, self.bins[num] = %s" % + (self.bins[i], self.bins[num])) sum -= len(self.bins[i]) num = i break @@ -120,11 +165,83 @@ def delete_bins(self): for b in bin: d = self.data[b] newdata.append(self.data[b]) - self.newdata = newdata if len(newdata) > 1 else self.data + self.newdata = newdata if len(newdata) > 1 else copy.copy(self.data) + self.data_above_cutoff = self.newdata + self.set_state(self.newdata) self.below_cutoff = below_cutoff self.below_cutoff.sort(key=lambda bin: len(bin), reverse=True) + def get_result_labels(self): + """ + Return "labels" for the trips, to be consistent with sklearn + implementations. This is not otherwise compatible with sklearn, but + would be great to submit it as an example of an absolute radius, even + if it is computationally expensive. + + It would certainly help with: + https://stackoverflow.com/questions/48217127/distance-based-classification + and + https://stackoverflow.com/questions/35971441/how-to-adjust-this-dbscan-algorithm-python + + This function sets labels based on the various states for the trips. + Pulling this out + as a separate function to write unit tests. This would normally be + trivial - we would just index the all_trip_df on the trips in each bin and + set a unique number. However, if we have filtered before binning, then the + trip indices in the bin are the filtered trip indices, which are different + from all_trips indices. So we need to remap from one set of indices to + another before we can assign the labels.
Uses (from self): all_data (all input trips); filtered_data (the trips that survived the + "too short" filter, present only if filtering ran); bins (either all, or above cutoff only) + + Returns: pandas Series with labels: + >=0 for trips in bins; the label is a unique bin id + =-1 for long trips not in bins + =-2 for trips that were too short + """ + # This is a bit tricky wrt indexing, since the indices of the trips in the bin are after filtering, + # so they don't match up 1:1 with the indices in the trip dataframe + # since we create a new dataframe for the filtered trips, they should match up with the filtered dataframe + # but the index of the filtered dataframe is a new RangeIndex, so it doesn't work for indexing into the result series + # so we need to follow a two-step process as below + + all_trip_df = pd.DataFrame(self.all_data) + if hasattr(self, "filtered_data"): + filtered_trip_df = pd.DataFrame([e for e in self.filtered_data]) + # print(filtered_trip_df) + else: + filtered_trip_df = None + + # logging.debug(f"lengths: {len(all_trip_df)}, {len(filtered_trip_df) if filtered_trip_df is not None else None}") + + # assume that everything is noise to start with + result_labels = pd.Series([-1] * len(all_trip_df), dtype="int") + + # self.bins contains the bins in the current state (either before or + # after cutoff). Loop will not run if binning is not complete, so all + # trips will be noise + for i, curr_bin in enumerate(self.bins): + if filtered_trip_df is not None and len(filtered_trip_df) > 0: + # get the trip ids of matching filtered trips for the current bin + matching_filtered_trip_ids = filtered_trip_df.loc[curr_bin]._id + # then, match by trip id to find the corresponding entries in the all_trips dataframe + matching_all_trip_ids = all_trip_df[all_trip_df._id.isin(matching_filtered_trip_ids)].index + # then set all those indices to the bin index + result_labels.loc[matching_all_trip_ids] = i + else: + # No filtration, trip indices in the bins are from all trips + # so we can index directly + result_labels.loc[curr_bin] = i + # For now, we also mark the "too short" labels with -2 to help with + # our understanding. Consider removing this code later.
This will override + # noisy labels + if filtered_trip_df is not None and len(filtered_trip_df) > 0: + removed_trips = all_trip_df[~all_trip_df._id.isin(filtered_trip_df._id)] + logging.debug("Detected removed trips %s" % removed_trips.index) + result_labels.loc[removed_trips.index] = -2 + return result_labels #calculate the cut-off point in the histogram #This is motivated by the need to calculate the cut-off point @@ -180,14 +297,10 @@ def evaluate_bins(self): for bin in self.bins: for b in bin: tb = self.data[b] - start_place = esda.get_entry(esda.CLEANED_PLACE_KEY, - tb.data.start_place) - end_place = esda.get_entry(esda.CLEANED_PLACE_KEY, - tb.data.end_place) - start_lon = start_place.data.location["coordinates"][0] - start_lat = start_place.data.location["coordinates"][1] - end_lon = end_place.data.location["coordinates"][0] - end_lat = end_place.data.location["coordinates"][1] + start_lon = tb.data.start_loc["coordinates"][0] + start_lat = tb.data.start_loc["coordinates"][1] + end_lon = tb.data.end_loc["coordinates"][0] + end_lat = tb.data.end_loc["coordinates"][1] + path = [start_lat, start_lon, end_lat, end_lon] + points.append(path) logging.debug("number of labels are %d, number of points are = %d" % diff --git a/emission/analysis/modelling/tour_model_first_only/__init__.py b/emission/analysis/modelling/tour_model_first_only/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/emission/analysis/modelling/tour_model_first_only/build_save_model.py b/emission/analysis/modelling/tour_model_first_only/build_save_model.py new file mode 100644 index 000000000..1277159ed --- /dev/null +++ b/emission/analysis/modelling/tour_model_first_only/build_save_model.py @@ -0,0 +1,78 @@ +# Standard imports +import copy +import pandas as pd +import logging +import jsonpickle as jpickle +import sklearn.cluster as sc + +# Our imports +import emission.storage.timeseries.abstract_timeseries as esta + +import emission.analysis.modelling.tour_model_first_only.get_users as gu +import emission.analysis.modelling.tour_model.label_processing as lp +import emission.analysis.modelling.tour_model_first_only.evaluation_pipeline as ep +import emission.analysis.modelling.tour_model.load_predict as load +import emission.analysis.modelling.tour_model_first_only.data_preprocessing as preprocess + +RADIUS=500 + +def save_models(obj_name,obj,user): + obj_capsule = jpickle.dumps(obj) + filename = obj_name + '_first_round_' + str(user) + with open(filename, "w") as fd: + fd.write(obj_capsule) + +def create_location_map(trip_list, bins): + bin_loc_feat = {} + for i, curr_bin in enumerate(bins): + # print(f"Considering {curr_bin} for trip list of length {len(trip_list)}") + bin_trips = [trip_list[j] for j in curr_bin] + # print(f"Considering {bin_trips} for bin {curr_bin}") + x = preprocess.extract_features(bin_trips) + bin_loc_feat[str(i)] = [feat[0:4] for feat in x] + return bin_loc_feat +
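For illustration, a minimal sketch (not part of this patch) of the per-bin probability computation that create_user_input_map (below) performs, with hypothetical labels:

import pandas as pd

user_label_df = pd.DataFrame([
    {"mode_confirm": "bike", "purpose_confirm": "work", "replaced_mode": "walk"},
    {"mode_confirm": "bike", "purpose_confirm": "work", "replaced_mode": "walk"},
    {"mode_confirm": "walk", "purpose_confirm": "work", "replaced_mode": "walk"},
])
unique_labels = user_label_df.groupby(user_label_df.columns.tolist()).size().reset_index(name="uniqcount")
unique_labels["p"] = unique_labels.uniqcount / len(user_label_df)
print(unique_labels)
# two unique combinations: (bike, work, walk) with p = 2/3 and (walk, work, walk) with p = 1/3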
{"0": [{'labels': {'mode_confirm': 'drove_alone', 'purpose_confirm': 'work', 'replaced_mode': 'drove_alone'}, 'p': 1.0}]} + user_input_map = {} + for b, curr_bin in enumerate(bins): + bin_trips = [trip_list[j] for j in curr_bin] + user_label_df = pd.DataFrame([trip['data']['user_input'] for trip in bin_trips]) + user_label_df = lp.map_labels(user_label_df).dropna() + # compute the sum of trips in this cluster + sum_trips = len(user_label_df) + # compute unique label sets and their probabilities in one cluster + # 'p' refers to probability + unique_labels = user_label_df.groupby(user_label_df.columns.tolist()).size().reset_index(name='uniqcount') + unique_labels['p'] = unique_labels.uniqcount / sum_trips + labels_columns = user_label_df.columns.to_list() + bin_label_combo_list = [] + for i in range(len(unique_labels)): + one_set_labels = {} + # e.g. labels_only={'mode_confirm': 'pilot_ebike', 'purpose_confirm': 'work', 'replaced_mode': 'walk'} + labels_only = {column: unique_labels.iloc[i][column] for column in labels_columns} + one_set_labels["labels"] = labels_only + one_set_labels['p'] = unique_labels.iloc[i]['p'] + # e.g. one_set_labels = {'labels': {'mode_confirm': 'walk', 'replaced_mode': 'walk', 'purpose_confirm': 'exercise'}, 'p': 1.0} + bin_label_combo_list.append(one_set_labels) + user_input_map[str(b)] = bin_label_combo_list + return user_input_map + +def build_user_model(user): + trips = preprocess.read_data(user) + filter_trips = preprocess.filter_data(trips, RADIUS) + # filter out users that don't have enough valid labeled trips + if not gu.valid_user(filter_trips, trips): + logging.debug(f"Total: {len(trips)}, labeled: {len(filter_trips)}, user {user} doesn't have enough valid trips for further analysis.") + return + # run the first round of clustering + sim, bins, bin_trips, filter_trips = ep.first_round(filter_trips, RADIUS) + + # save all user labels + save_models('user_labels',create_user_input_map(filter_trips, bins),user) + + # save location features of all bins + save_models('locations',create_location_map(filter_trips, bins),user) + diff --git a/emission/analysis/modelling/tour_model_first_only/data_preprocessing.py b/emission/analysis/modelling/tour_model_first_only/data_preprocessing.py new file mode 100644 index 000000000..79c210df9 --- /dev/null +++ b/emission/analysis/modelling/tour_model_first_only/data_preprocessing.py @@ -0,0 +1,54 @@ +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.analysis.modelling.tour_model.cluster_pipeline as pipeline +import emission.analysis.modelling.tour_model.similarity as similarity +import pandas as pd +from sklearn.model_selection import KFold + + +# read data that have user labels +def read_data(user): + trips = pipeline.read_data(uuid=user, key=esda.CONFIRMED_TRIP_KEY) + return trips + + +# - trips: all trips read from database +# - filter_trips: valid trips that have user labels and are not points +def filter_data(trips,radius): + non_empty_trips = [t for t in trips if t["data"]["user_input"] != {}] + non_empty_trips_df = pd.DataFrame(t["data"]["user_input"] for t in non_empty_trips) + valid_trips_idx_ls = non_empty_trips_df.index.tolist() + valid_trips = [non_empty_trips[i]for i in valid_trips_idx_ls] + return valid_trips + + +# use KFold (n_splits=5) to split the data into 5 models (5 training sets, 5 test sets) +def extract_features(filter_trips): + X = [] + for trip in filter_trips: + start = trip.data.start_loc["coordinates"] + end = trip.data.end_loc["coordinates"] + distance = 
trip.data.distance + duration = trip.data.duration + X.append([start[0], start[1], end[0], end[1], distance, duration]) + return X + +# use KFold (n_splits=5) to split the data into 5 folds (5 training sets, 5 test sets) +def split_data(filter_trips): + X = extract_features(filter_trips) + kf = KFold(n_splits=5, shuffle=True, random_state=3) + train_idx = [] + test_idx = [] + for train_index, test_index in kf.split(X): + train_idx.append(train_index) + test_idx.append(test_index) + return train_idx, test_idx + + +# collect a set of data (training/test set) after splitting +def get_subdata(filter_trips,train_test_set): + collect_sub_data = [] + for train_test_subset in train_test_set: + sub_data = [] + for idx in train_test_subset: + sub_data.append(filter_trips[idx]) + collect_sub_data.append(sub_data) + return collect_sub_data diff --git a/emission/analysis/modelling/tour_model_first_only/evaluation_pipeline.py b/emission/analysis/modelling/tour_model_first_only/evaluation_pipeline.py new file mode 100644 index 000000000..dddcd7d95 --- /dev/null +++ b/emission/analysis/modelling/tour_model_first_only/evaluation_pipeline.py @@ -0,0 +1,200 @@ +# Standard imports +import numpy as np +import pandas as pd +import jsonpickle as jpickle +import logging + +# Our imports +import emission.storage.timeseries.abstract_timeseries as esta + +import emission.analysis.modelling.tour_model.similarity as similarity +import emission.analysis.modelling.tour_model.label_processing as lp +import emission.analysis.modelling.tour_model.data_preprocessing as preprocess +import emission.analysis.modelling.tour_model.second_round_of_clustering as sr +import emission.analysis.modelling.tour_model.get_users as gu +import emission.analysis.modelling.tour_model.get_request_percentage as grp +import emission.analysis.modelling.tour_model.get_scores as gs + +def second_round(bin_trips,filter_trips,first_labels,track,low,dist_pct,sim,kmeans): + sec = sr.SecondRoundOfClustering(bin_trips,first_labels) + first_label_set = list(set(first_labels)) + for l in first_label_set: + sec.get_sel_features_and_trips(first_labels,l) + sec.hierarcial_clustering(low, dist_pct) + if kmeans: + sec.kmeans_clustering() + new_labels = sec.get_new_labels(first_labels) + track = sec.get_new_track(track) + # get the request percentage for the subset for the second round + percentage_second = grp.get_req_pct(new_labels, track, filter_trips, sim) + # get the homogeneity score for the second round + homo_second = gs.score(bin_trips, new_labels) + return percentage_second,homo_second + + +# we use functions in similarity to build the first round of clustering +def first_round(data,radius): + sim = similarity.similarity(data, radius, shouldFilter=False, cutoff=False) + filter_trips = sim.data + sim.fit() + bins = sim.bins + bin_trips = sim.data + return sim, bins, bin_trips, filter_trips +
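For illustration, a minimal sketch (not part of this patch) of what first_round produces; labeled_trips is a hypothetical list of labeled confirmed trips:

sim, bins, bin_trips, filter_trips = first_round(labeled_trips, 100)
# bins is a list of lists of trip indices, e.g. [[0, 4, 7], [1, 2], [3]];
# since shouldFilter=False and cutoff=False, no trips are dropped, so
# bin_trips is just the input trips viewed through the binning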
+def get_first_label(bins): + # get the first round labels + # the labels from the first round are the indices of the bins + # e.g. in bin 0 [trip1, trip2, trip3], the labels of this bin are [0,0,0] + first_labels = [] + for b in range(len(bins)): + for trip in bins[b]: + first_labels.append(b) + return first_labels + + +def get_track(bins, first_labels): + # create a list idx_labels_track to store indices and labels + # the indices of the items will be the same in the new label list after the second round of clustering + # item[0] is the original index of the trip in filter_trips + # item[1] is the label from the first round of clustering + idx_labels_track = [] + for bin in bins: + for ori_idx in bin: + idx_labels_track.append([ori_idx]) + # store the first round labels in the idx_labels_track list + for i in range(len(first_labels)): + idx_labels_track[i].append(first_labels[i]) + + return idx_labels_track + + +def get_first_label_and_track(bins,bin_trips,filter_trips): + gs.compare_trip_orders(bins, bin_trips, filter_trips) + first_labels = get_first_label(bins) + track = get_track(bins, first_labels) + return first_labels,track + + +def tune(data,radius,kmeans): + sim, bins, bin_trips, filter_trips = first_round(data, radius) + # it is possible that we don't have common trips for tuning or testing + # bins contain common trip indices + if len(bins) != 0: + first_labels, track = get_first_label_and_track(bins,bin_trips,filter_trips) + # collect tuning scores and parameters + tune_score = {} + for dist_pct in np.arange(0.15, 0.6, 0.02): + for low in range(250, 600): + + percentage_second, homo_second = second_round(bin_trips,filter_trips,first_labels,track,low,dist_pct, + sim,kmeans) + + curr_score = gs.get_score(homo_second, percentage_second) + if curr_score not in tune_score: + tune_score[curr_score] = (low, dist_pct) + + best_score = max(tune_score) + sel_tradeoffs = tune_score[best_score] + low = sel_tradeoffs[0] + dist_pct = sel_tradeoffs[1] + else: + low = 0 + dist_pct = 0 + + return low,dist_pct + + +def test(data,radius,low,dist_pct,kmeans): + sim, bins, bin_trips, filter_trips = first_round(data, radius) + # it is possible that we don't have common trips for tuning or testing + # bins contain common trip indices + if len(bins) != 0: + first_labels, track = get_first_label_and_track(bins,bin_trips,filter_trips) + # new_labels temporarily stores the labels from the first round, but later the labels in new_labels will be + # updated with the labels after two rounds of clustering.
+ new_labels = first_labels.copy() + # get the request percentage for the subset for the first round + percentage_first = grp.get_req_pct(new_labels, track, filter_trips, sim) + # get the homogeneity score for the subset for the first round + homo_first = gs.score(bin_trips, first_labels) + percentage_second, homo_second = second_round(bin_trips, filter_trips, first_labels, track, low, dist_pct, + sim, kmeans) + else: + percentage_first = 1 + homo_first = 1 + percentage_second = 1 + homo_second = 1 + scores = gs.get_score(homo_second, percentage_second) + return homo_first,percentage_first,homo_second,percentage_second,scores + + +def main(all_users): + radius = 100 + all_filename = [] + for a, user in enumerate(all_users): + logging.info(f"Starting evaluation for {user}") + df = pd.DataFrame(columns=['user','user_id','percentage of 1st round','homogeneity score of 1st round', + 'percentage of 2nd round','homogeneity score of 2nd round','scores','lower boundary', + 'distance percentage']) + logging.info(f"At stage: Reading data") + trips = preprocess.read_data(user) + logging.info(f"At stage: Filtering data") + filter_trips = preprocess.filter_data(trips, radius) + # filter out users that don't have enough valid labeled trips + if not gu.valid_user(filter_trips, trips): + logging.warning(f"User {user} is invalid, early return") + continue + logging.info(f"At stage: Splitting data") + tune_idx, test_idx = preprocess.split_data(filter_trips) + # choose tuning/test sets to run the model + # this step will use KFold (5 splits) to split the data into different subsets + # - tune: tuning set + # - test: test set + # Here we use a bigger part of the data for testing and a smaller part for tuning + tune_data = preprocess.get_subdata(filter_trips, test_idx) + test_data = preprocess.get_subdata(filter_trips, tune_idx) + + # tune data + for i, curr_tune in enumerate(tune_data): + logging.info(f"At stage: starting tuning for stage {i}") + # for tuning, we don't add kmeans for re-clustering. We just need to get the tuning parameters + # - low: the lower boundary of the dendrogram. If the final distance of the dendrogram is lower than "low", + # this bin does not need to be re-clustered. + # - dist_pct: the higher boundary of the dendrogram.
If the final distance is higher than "low", + # the cutoff of the dendrogram is (the final distance of the dendrogram * dist_pct) + low, dist_pct = tune(curr_tune, radius, kmeans=False) + df.loc[i,'lower boundary']=low + df.loc[i,'distance percentage']=dist_pct + + # testing + for i, curr_test in enumerate(test_data): + logging.info(f"At stage: starting testing for stage {i}") + low = df.loc[i,'lower boundary'] + dist_pct = df.loc[i,'distance percentage'] + + # for testing, we add kmeans to re-build the model + homo_first, percentage_first, homo_second, percentage_second, scores = test(curr_test,radius,low, + dist_pct,kmeans=True) + df.loc[i, 'percentage of 1st round'] = percentage_first + df.loc[i, 'homogeneity score of 1st round'] = homo_first + df.loc[i, 'percentage of 2nd round'] = percentage_second + df.loc[i, 'homogeneity score of 2nd round'] = homo_second + df.loc[i, 'scores'] = scores + df['user_id'] = user + df['user']='user'+str(a+1) + + logging.info(f"At stage: parameter selection outputs complete") + filename = "user_" + str(user) + ".csv" + all_filename.append(filename) + df.to_csv(filename, index=True, index_label='split') + + # collect the filenames in a file; use it to plot the scatter + collect_filename = jpickle.dumps(all_filename) + with open("collect_filename", "w") as fd: + fd.write(collect_filename) + + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + all_users = esta.TimeSeries.get_uuid_list() + main(all_users) diff --git a/emission/analysis/modelling/tour_model_first_only/get_users.py b/emission/analysis/modelling/tour_model_first_only/get_users.py new file mode 100644 index 000000000..4e726122b --- /dev/null +++ b/emission/analysis/modelling/tour_model_first_only/get_users.py @@ -0,0 +1,31 @@ +import emission.analysis.modelling.tour_model.data_preprocessing as preprocess + + +# to determine if the user is valid: +# a valid user should have >= 14 labeled trips for further analysis; unlike the tour_model version, +# this variant does not also require >= 50% of trips to be labeled +def valid_user(filter_trips,trips): + valid = False + if len(filter_trips) >= 14: + valid = True + return valid + + +# - user_ls: a list of strings representing short user names, such as [user1, user2, user3...]
+# - valid_user_ls: a subset of `user_ls` for valid users, so also string representations of user names +# - all_users: a collection of all user ids, in terms of user id objects +def get_user_ls(all_users,radius): + user_ls = [] + valid_user_ls = [] + for i in range(len(all_users)): + curr_user = 'user' + str(i + 1) + user = all_users[i] + trips = preprocess.read_data(user) + filter_trips = preprocess.filter_data(trips,radius) + # every user gets a short name; only valid users are also added to valid_user_ls + user_ls.append(curr_user) + if valid_user(filter_trips,trips): + valid_user_ls.append(curr_user) + return user_ls,valid_user_ls + diff --git a/emission/analysis/modelling/tour_model_first_only/load_predict.py b/emission/analysis/modelling/tour_model_first_only/load_predict.py new file mode 100644 index 000000000..f3039f04c --- /dev/null +++ b/emission/analysis/modelling/tour_model_first_only/load_predict.py @@ -0,0 +1,151 @@ +# Standard imports +import jsonpickle as jpickle +import logging + +# Our imports +import emission.storage.timeseries.abstract_timeseries as esta +import emission.analysis.modelling.tour_model.similarity as similarity +import emission.analysis.modelling.tour_model.data_preprocessing as preprocess + +RADIUS=500 + +def loadModelStage(filename): + import jsonpickle.ext.numpy as jsonpickle_numpy + jsonpickle_numpy.register_handlers() + model = loadModel(filename) + return model + + +def loadModel(filename): + # use a context manager so the file is closed even if loading fails + with open(filename, "r") as fd: + all_model = jpickle.loads(fd.read()) + return all_model + + +def in_bin(bin_location_features,new_trip_location_feat,radius): + start_b_lon = new_trip_location_feat[0] + start_b_lat = new_trip_location_feat[1] + end_b_lon = new_trip_location_feat[2] + end_b_lat = new_trip_location_feat[3] + for feat in bin_location_features: + start_a_lon = feat[0] + start_a_lat = feat[1] + end_a_lon = feat[2] + end_a_lat = feat[3] + start = similarity.within_radius(start_a_lat, start_a_lon, start_b_lat, start_b_lon,radius) + end = similarity.within_radius(end_a_lat, end_a_lon, end_b_lat, end_b_lon, radius) + if start and end: + continue + else: + return False + return True +
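For illustration, a minimal sketch (not part of this patch) of the matching rule, with hypothetical coordinates: a new trip matches a bin only if both its start and its end are within the radius of the start/end of every trip already in the bin.

bin_feats = [[-122.08, 37.39, -122.03, 37.32],   # [start lon, start lat, end lon, end lat]
             [-122.08, 37.39, -122.03, 37.32]]
same_trip = [-122.08, 37.39, -122.03, 37.32]     # identical endpoints -> matches
print(in_bin(bin_feats, same_trip, 500))         # True
far_trip = [-121.00, 37.39, -122.03, 37.32]      # start is ~95 km away -> no match
print(in_bin(bin_feats, far_trip, 500))          # False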
+def find_bin(trip, bin_locations, radius): + trip_feat = preprocess.extract_features([trip])[0] + trip_loc_feat = trip_feat[0:4] + first_round_label_set = list(bin_locations.keys()) + sel_fl = None + for fl in first_round_label_set: + # extract the location features of the selected bin + sel_loc_feat = bin_locations[fl] + # Check if the start/end locations of the new trip and every start/end location in this bin are within the + # radius. If so, the new trip falls in this bin. Then predict the second round label of the new trip + # using this bin's model + if in_bin(sel_loc_feat, trip_loc_feat, radius): + sel_fl = fl + break + if sel_fl is None: + logging.debug(f"sel_fl = {sel_fl}, early return") + return -1 + return sel_fl + +# Predict labels and also return the number of trips in the matched cluster +def predict_labels_with_n(trip): + user = trip['user_id'] + logging.debug(f"At stage: loading model") + try: + # load locations of bins (1st round of clustering) + # e.g. {'0': [[start lon1, start lat1, end lon1, end lat1],[start lon, start lat, end lon, end lat]]} + # another explanation: - '0': label from the 1st round + # - the value of key '0': all trips in this bin + # - for every trip: the coordinates of the start/end locations + bin_locations = loadModelStage('locations_first_round_' + str(user)) + + # load user labels in all clusters + # assume that we have 1 cluster (bin) from the 1st round of clustering, which has label '0', + # and we have 1 cluster from the 2nd round, which has label '1' + # the value of key '0' contains all 2nd round clusters + # the value of key '1' contains all user labels and probabilities in this cluster + # e.g. {'0': [{'1': [{'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'home', 'replaced_mode': 'drove_alone'}}]}]} + user_labels = loadModelStage('user_labels_first_round_' + str(user)) + + # Get the number of trips in each cluster from the number of locations in each bin + # This is a bit hacky; in the future, we might want the model stage to save a metadata file with this and potentially other information + cluster_sizes = {k: len(bin_locations[k]) for k in bin_locations} + + except IOError as e: + logging.info(f"No models found for {user}, no prediction") + return [], -1 + + + logging.debug(f"At stage: first round prediction") + pred_bin = find_bin(trip, bin_locations, RADIUS) + logging.debug(f"At stage: matched with bin {pred_bin}") + + if pred_bin == -1: + logging.info(f"No match found for {trip['data']['start_fmt_time']}, early return") + return [], 0 + + user_input_pred_list = user_labels[pred_bin] + this_cluster_size = cluster_sizes[pred_bin] + logging.debug(f"At stage: looked up user input {user_input_pred_list}") + return user_input_pred_list, this_cluster_size + +# For backwards compatibility +def predict_labels(trip): + return predict_labels_with_n(trip)[0] + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + all_users = esta.TimeSeries.get_uuid_list() + + # case 1: the new trip matches a bin from the 1st round and a cluster from the 2nd round + user = all_users[0] + trips = preprocess.read_data(user) + filter_trips = preprocess.filter_data(trips, RADIUS) + new_trip = filter_trips[4] + # result is [{'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'church', 'replaced_mode': 'drove_alone'}, + # 'p': 0.9333333333333333}, {'labels': {'mode_confirm': 'shared_ride', 'purpose_confirm': 'entertainment', + # 'replaced_mode': 'drove_alone'}, 'p': 0.06666666666666667}] + pl = predict_labels(new_trip) + assert len(pl) > 0, f"Invalid prediction {pl}" + + # case 2: no existing files for the user who has the new trip: + # 1. the user is invalid (has fewer than 14 fully labeled trips) + # 2.
the user doesn't have common trips + user = all_users[1] + trips = preprocess.read_data(user) + new_trip = trips[0] + # result is [] + pl = predict_labels(new_trip) + assert len(pl) == 0, f"Invalid prediction {pl}" + + # case 3: the new trip is a novel trip (doesn't fall into any 1st round bins) + user = all_users[0] + trips = preprocess.read_data(user) + filter_trips = preprocess.filter_data(trips, RADIUS) + new_trip = filter_trips[0] + # result is [] + pl = predict_labels(new_trip) + assert len(pl) == 0, f"Invalid prediction {pl}" + + # case 4: the new trip falls in a 1st round bin, but is predicted to be a new cluster in the 2nd round + # result is [] + # no example for now diff --git a/emission/core/wrapper/labelprediction.py b/emission/core/wrapper/labelprediction.py index a0baf89ff..c74609e5a 100644 --- a/emission/core/wrapper/labelprediction.py +++ b/emission/core/wrapper/labelprediction.py @@ -15,7 +15,10 @@ class AlgorithmTypes(enum.Enum): PLACEHOLDER_0 = 1 PLACEHOLDER_1 = 2 PLACEHOLDER_2 = 3 - PLACEHOLDER_3 = 3 + PLACEHOLDER_3 = 4 + TWO_STAGE_BIN_CLUSTER = 5 + PLACEHOLDER_PREDICTOR_DEMO = 6 + CONFIDENCE_DISCOUNTED_CLUSTER = 7 class Labelprediction(ecwb.WrapperBase): diff --git a/emission/storage/decorations/trip_queries.py b/emission/storage/decorations/trip_queries.py index 9487b604a..bf635bb66 100644 --- a/emission/storage/decorations/trip_queries.py +++ b/emission/storage/decorations/trip_queries.py @@ -8,6 +8,7 @@ import logging import pymongo import arrow +import pandas as pd import emission.storage.timeseries.timequery as estt @@ -191,3 +192,43 @@ def get_trip_for_user_input_obj(ts, ui_obj): ui_obj.data.start_ts + ONE_DAY) potential_candidates = ts.find_entries(["analysis/confirmed_trip"], tq) return final_candidate(valid_trip(ts, ui_obj), potential_candidates) + +def filter_labeled_trips(mixed_trip_df): + """ + mixed_trip_df: a dataframe with mixed labeled and unlabeled entries + Returns only the labeled entries + """ + if len(mixed_trip_df) == 0: + return mixed_trip_df + labeled_ct = mixed_trip_df[mixed_trip_df.user_input != {}] + logging.debug("After filtering, found %s labeled trips" % len(labeled_ct)) + logging.debug(labeled_ct.head()) + return labeled_ct + +def expand_userinputs(labeled_ct): + """ + labeled_ct: a dataframe that contains only labeled trips + Returns a dataframe with the labels expanded into the main dataframe + If the labels are simple, single-level kv pairs (e.g. {mode_confirm: + bike}), the expanded columns can be indexed very simply, like the other + columns in the dataframe. + + TODO: Replace by pandas.io.json.json_normalize?
TODO: Currently duplicated from + https://github.com/e-mission/em-public-dashboard/blob/main/viz_scripts/scaffolding.py + + Should remove it from there + """ + if len(labeled_ct) == 0: + return labeled_ct + label_only = pd.DataFrame(labeled_ct.user_input.to_list(), index=labeled_ct.index) + logging.debug(label_only.head()) + expanded_ct = pd.concat([labeled_ct, label_only], axis=1) + assert len(expanded_ct) == len(labeled_ct), \ + ("Mismatch after expanding labels, expanded_ct rows = %s != labeled_ct rows = %s" % + (len(expanded_ct), len(labeled_ct))) + logging.debug("After expanding, columns went from %s -> %s" % + (len(labeled_ct.columns), len(expanded_ct.columns))) + logging.debug(expanded_ct.head()) + return expanded_ct +
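For illustration, a minimal sketch (not part of this patch) of the expansion, with hypothetical inputs:

import pandas as pd

labeled_ct = pd.DataFrame([{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}}])
expanded = expand_userinputs(labeled_ct)
# the labels are now ordinary columns alongside the original user_input column
print(expanded.mode_confirm.iloc[0])   # 'bike'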
+ """ + + from pymongo import MongoClient + import emission.core.get_database as edb + + edb.url = new_url + print("Connecting to new URL "+edb.url+" resetting _current_db link") + edb._current_db = MongoClient(edb.url).Stage_database + print("After changing URL, connection is %s" % edb._current_db) + + import emission.storage.timeseries.builtin_timeseries as bits + bits.ts_enum_map = { + EntryType.DATA_TYPE: edb.get_timeseries_db(), + EntryType.ANALYSIS_TYPE: edb.get_analysis_timeseries_db() + } + print("After resetting the timeseries connections, map is %s" % bits.ts_enum_map) + + def find_entries(self, key_list=None, time_query=None, geo_query=None, extra_query_list=None): """ diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index d03415204..287654943 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -93,7 +93,7 @@ def __init__(self, user_id): @staticmethod def get_uuid_list(): - return edb.get_timeseries_db().distinct("user_id") + return edb.get_uuid_db().distinct("uuid") def get_timeseries_db(self, key): """ diff --git a/emission/tests/common.py b/emission/tests/common.py index 91f50039b..b902b096a 100644 --- a/emission/tests/common.py +++ b/emission/tests/common.py @@ -106,16 +106,26 @@ def getRealExampleEmail(testObj): def fillExistingUUID(testObj): userObj = ecwu.User.fromEmail(getRealExampleEmail(testObj)) print("Setting testUUID to %s" % userObj.uuid) - testObj.testUUID = userObj.uuid + testObj.testUUID = userObj.uuir -def createAndFillUUID(testObj): +def getRegEmailIfPresent(testObj): if hasattr(testObj, "evaluation") and testObj.evaluation: + logging.info("evaluation, returning email = %s" % reg_email) reg_email = getRealExampleEmail(testObj) - logging.info("registering email = %s" % reg_email) - user = ecwu.User.register(reg_email) + return reg_email + elif hasattr(testObj, "testEmail"): + return testObj.testEmail + else: + return None + +def createAndFillUUID(testObj): + regEmail = getRegEmailIfPresent(testObj) + if regEmail is not None: + logging.info("registering email = %s" % regEmail) + user = ecwu.User.register(regEmail) testObj.testUUID = user.uuid else: - logging.info("No evaluation flag found, not registering email") + logging.info("No reg email found, not registering email") testObj.testUUID = uuid.uuid4() def setupRealExample(testObj, dump_file): diff --git a/emission/tests/modellingTests/TestSimilarityAux.py b/emission/tests/modellingTests/TestSimilarityAux.py new file mode 100644 index 000000000..3cb78c876 --- /dev/null +++ b/emission/tests/modellingTests/TestSimilarityAux.py @@ -0,0 +1,107 @@ +import unittest +import logging +import pymongo +import json +import bson.json_util as bju +import pandas as pd +import numpy as np +import bson.objectid as boi +from uuid import UUID + +import emission.tests.common as etc + +import emission.analysis.modelling.tour_model.similarity as eamts + +# Class to test auxiliary functions (such as returning the result labels) +# in similarity. 
+# TODO: This should be merged into the full similarity tests once they are complete + +class TestSimilarityAux(unittest.TestCase): + def setUp(self): + # The _id objects don't need to be object ids + # but having them be object ids does not hurt, + # makes it easier to create dataframes, and is + # a little bit more true to life, so we can catch + # regressions in the pandas code wrt matching by objects, for example + N_OBJECTS = 20 + self.all_trips = [{"_id": boi.ObjectId()} for i in range(N_OBJECTS)] + # logging.debug(self.all_trips) + self.too_short_indices = list(range(0,N_OBJECTS,5)) + logging.debug("too_short_indices %s" % self.too_short_indices) + + self.filtered_trips = [t for i, t in enumerate(self.all_trips) if i not in self.too_short_indices] + + self.all_bins = np.array(range(0,N_OBJECTS)).reshape(4,5) + logging.debug("bins are %s" % self.all_bins) + + self.after_filter_bins = np.array(range(0,16)).reshape(4,4) + self.after_filter_bins_all_trips_index = [[t for t in ob if t not in self.too_short_indices] for ob in self.all_bins] + logging.debug(f"before cutoff {self.after_filter_bins}") + + IGNORED = 0 + self.curr_sim = eamts.similarity(self.all_trips, IGNORED) + + + def tearDown(self): + pass + + def testEmptyDataFrame(self): + self.curr_sim = eamts.similarity([], 0) + self.assertEqual(self.curr_sim.get_result_labels().to_list(), []) + + def testEmptyTooShortDataFrame(self): + self.curr_sim = eamts.similarity([{"_id": boi.ObjectId()}], 0) + self.curr_sim.filtered_data = [] + self.assertEqual(self.curr_sim.get_result_labels().to_list(), [-1]) + + def testBeforeFiltering(self): + # before filtering, everything should be noise + result = self.curr_sim.get_result_labels() + # logging.debug(result) + self.assertEqual(result.to_list(), [-1] * len(self.all_trips)) + + def testBeforeFilteringAfterBinning(self): + # before filtering, but after binning + self.curr_sim.bins = self.all_bins + exp_result = [0] * 5 + [1] * 5 + [2] * 5 + [3] * 5 + self.assertEqual(self.curr_sim.get_result_labels().to_list(), exp_result) + + def testAfterFilteringBeforeBinning(self): + # after filtering, but before binning + self.curr_sim.filtered_data = self.filtered_trips + exp_result = pd.Series([-1] * 20) + exp_result.loc[self.too_short_indices] = -2 + # logging.debug(exp_result) + self.assertEqual(self.curr_sim.get_result_labels().to_list(), exp_result.to_list()) + + def testAfterFilteringAfterBinningBeforeCutoff(self): + # after filtering, after binning + self.curr_sim.filtered_data = self.filtered_trips + self.curr_sim.bins = self.after_filter_bins + exp_result = pd.Series([-1] * 20) + for i, b in enumerate(self.after_filter_bins_all_trips_index): + exp_result.loc[b] = i + exp_result.loc[self.too_short_indices] = -2 + self.assertEqual(self.curr_sim.get_result_labels().to_list(), exp_result.to_list()) + + def testBeforeFilteringAfterBinningAfterCutoff(self): + # drop the last two bins to simulate a cutoff + self.curr_sim.bins = self.all_bins[:-2] + exp_result = pd.Series([-1] * 20) + for i, b in enumerate([list(range(5)), list(range(5,10))]): + exp_result.loc[b] = i + self.assertEqual(self.curr_sim.get_result_labels().to_list(), exp_result.to_list()) + + def testAfterFilteringAfterBinningAfterCutoff(self): + # drop the last two bins to simulate a cutoff + self.curr_sim.filtered_data = self.filtered_trips + self.curr_sim.bins = self.after_filter_bins[:-2] + exp_result = pd.Series([-1] * 20) + for i, b in enumerate(self.after_filter_bins_all_trips_index[:-2]): + exp_result.loc[b] = i + 
diff --git a/emission/tests/pipelineTests/TestLabelInferencePipeline.py b/emission/tests/pipelineTests/TestLabelInferencePipeline.py
index 04236f9ad..97ad6982b 100644
--- a/emission/tests/pipelineTests/TestLabelInferencePipeline.py
+++ b/emission/tests/pipelineTests/TestLabelInferencePipeline.py
@@ -5,6 +5,7 @@ import time
 
 import emission.analysis.classification.inference.labels.pipeline as eacilp
+import emission.analysis.classification.inference.labels.inferrers as eacili
 import emission.core.wrapper.labelprediction as ecwl
 import emission.storage.decorations.analysis_timeseries_queries as esda
 import emission.storage.timeseries.timequery as estt
@@ -15,8 +16,8 @@ class TestLabelInferencePipeline(unittest.TestCase):
     # It is important that these functions be deterministic
     test_algorithms = {
-        ecwl.AlgorithmTypes.PLACEHOLDER_0: eacilp.placeholder_predictor_0,
-        ecwl.AlgorithmTypes.PLACEHOLDER_2: eacilp.placeholder_predictor_2
+        ecwl.AlgorithmTypes.PLACEHOLDER_0: eacili.placeholder_predictor_0,
+        ecwl.AlgorithmTypes.PLACEHOLDER_2: eacili.placeholder_predictor_2
     }
 
     def setUp(self):
diff --git a/emission/tests/storageTests/TestTimeSeries.py b/emission/tests/storageTests/TestTimeSeries.py
index 37dedaf83..519e4ce25 100644
--- a/emission/tests/storageTests/TestTimeSeries.py
+++ b/emission/tests/storageTests/TestTimeSeries.py
@@ -26,9 +26,12 @@ class TestTimeSeries(unittest.TestCase):
 
     def setUp(self):
+        self.testEmail = "user1"
         etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-21")
         self.testUUID1 = self.testUUID
         self.entries1 = self.entries
+
+        self.testEmail = "user2"
         etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27")
 
     def tearDown(self):
diff --git a/emission/tests/storageTests/TestTripQueries.py b/emission/tests/storageTests/TestTripQueries.py
index a905e1a07..310cb0a39 100644
--- a/emission/tests/storageTests/TestTripQueries.py
+++ b/emission/tests/storageTests/TestTripQueries.py
@@ -284,6 +284,90 @@ def testUserInputRealDataPostArrival(self):
         self.assertEqual(mc_trip_start_fmt_time_list, mode_fmt_times)
         self.assertEqual(pc_trip_start_fmt_time_list, purpose_fmt_times)
 
+    def testFilterLabelInputs(self):
+        import pandas as pd
+
+        # Test invalid inputs
+        pd.testing.assert_frame_equal(esdt.filter_labeled_trips(pd.DataFrame()),
+                                      pd.DataFrame())
+        with self.assertRaises(TypeError):
+            esdt.filter_labeled_trips(None)
+
+        # Test valid inputs
+
+        # no labeled
+        test_unlabeled_df = pd.DataFrame([{"user_input": {}}] * 3)
+        self.assertTrue(esdt.filter_labeled_trips(test_unlabeled_df).empty)
+
+        # all labeled
+        test_labeled_df = pd.DataFrame([{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}}] * 3)
+        pd.testing.assert_frame_equal(esdt.filter_labeled_trips(test_labeled_df),
+                                      test_labeled_df)
+
+        # mixed labeled
+        test_mixed_df = pd.DataFrame([{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}}] * 3 + [{"user_input": {}}] * 3)
+        result_df = pd.DataFrame([{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}}] * 3)
+        pd.testing.assert_frame_equal(esdt.filter_labeled_trips(test_mixed_df),
+                                      result_df)
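
testFilterLabelInputs fixes the contract of esdt.filter_labeled_trips: an empty frame is returned unchanged, None raises a TypeError, and only rows with a non-empty user_input dict survive. A sketch of an implementation consistent with that contract (illustrative only; the shipped helper in emission.storage.decorations.trip_queries may differ):

    import pandas as pd

    def filter_labeled_trips_sketch(mixed_trip_df):
        # len(None) raises TypeError, which matches the invalid-input case
        if len(mixed_trip_df) == 0:
            # an empty frame has no user_input column, so return it as-is
            return mixed_trip_df
        # keep only trips whose user_input dict is non-empty
        return mixed_trip_df[mixed_trip_df.user_input.apply(lambda ui: ui != {})]

The early return matters: an empty pd.DataFrame() has no user_input column, so filtering it directly would raise. The companion testExpandUserInputs continues below.
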
+    def testExpandUserInputs(self):
+        import pandas as pd
+        import numpy as np
+
+        # Test invalid inputs
+        pd.testing.assert_frame_equal(esdt.expand_userinputs(pd.DataFrame()),
+                                      pd.DataFrame())
+        with self.assertRaises(TypeError):
+            esdt.expand_userinputs(None)
+
+        # Test valid inputs
+
+        # no labeled trips; no additional columns added
+        logging.debug("About to test unlabeled")
+        test_unlabeled_df = pd.DataFrame([{"user_input": {}}] * 3)
+        pd.testing.assert_frame_equal(esdt.expand_userinputs(test_unlabeled_df),
+                                      test_unlabeled_df)
+
+        # all labeled; additional columns added with unstructured data
+        test_labeled_df = pd.DataFrame([{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}}] * 3)
+        test_exp_result = pd.DataFrame([{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}, "mode_confirm": "bike", "purpose_confirm": "shopping"}] * 3)
+        logging.debug(test_exp_result)
+        pd.testing.assert_frame_equal(esdt.expand_userinputs(test_labeled_df),
+                                      test_exp_result)
+
+        # mixed labeled; additional columns added but with some N/A
+        test_mixed_df = pd.DataFrame([{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}}] * 3 + [{"user_input": {}}] * 3)
+        result_df = pd.DataFrame(
+            # Three expanded entries
+            [{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}, "mode_confirm": "bike", "purpose_confirm": "shopping"}] * 3 +
+            # Three unlabeled entries
+            [{"user_input": {}}] * 3)
+        actual_result = esdt.expand_userinputs(test_mixed_df)
+        pd.testing.assert_frame_equal(actual_result, result_df)
+        # The last three entries are N/A
+        self.assertTrue(pd.isna(actual_result.loc[3, "mode_confirm"]))
+
+        # mixed labeled with different columns; additional columns added but with some N/A
+        test_mixed_df = pd.DataFrame(
+            [{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}}] * 3 +
+            [{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping", "replaced_mode": "running"}}] * 3 +
+            [{"user_input": {}}] * 3)
+        result_df = pd.DataFrame(
+            # Three partially expanded entries
+            [{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping"}, "mode_confirm": "bike", "purpose_confirm": "shopping"}] * 3 +
+            # Three fully expanded entries
+            [{"user_input": {"mode_confirm": "bike", "purpose_confirm": "shopping", "replaced_mode": "running"}, "mode_confirm": "bike", "purpose_confirm": "shopping", "replaced_mode": "running"}] * 3 +
+            # Three unlabeled entries
+            [{"user_input": {}}] * 3)
+        actual_result = esdt.expand_userinputs(test_mixed_df)
+        pd.testing.assert_frame_equal(actual_result, result_df)
+        # The first three entries have N/A replaced mode
+        logging.debug(pd.isna(actual_result.loc[:2, "replaced_mode"]))
+        self.assertTrue(pd.isna(actual_result.loc[:2, "replaced_mode"]).all())
+        # The last three entries have N/A for all expanded values
+        logging.debug(pd.isna(actual_result.loc[6:, ["mode_confirm", "purpose_confirm", "replaced_mode"]]))
+        self.assertTrue(pd.isna(actual_result.loc[6:, ["mode_confirm", "purpose_confirm", "replaced_mode"]].to_numpy().flatten()).all())
+
 if __name__ == '__main__':
     import emission.tests.common as etc
     etc.configLogging()
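
testExpandUserInputs similarly pins down expand_userinputs: label dicts are flattened into top-level columns, rows missing a key get N/A in that column, and a frame with no labels at all comes back untouched. A sketch consistent with those expectations (again illustrative rather than the shipped implementation):

    import pandas as pd

    def expand_userinputs_sketch(labeled_ct):
        # len(None) raises TypeError, matching the invalid-input case
        if len(labeled_ct) == 0:
            return labeled_ct
        # one column per label key, aligned on the original index;
        # rows whose user_input lacks a key get NaN in that column
        label_only = pd.DataFrame(labeled_ct.user_input.to_list(),
                                  index=labeled_ct.index)
        if label_only.empty:
            # nothing was labeled, so there are no columns to add
            return labeled_ct
        return pd.concat([labeled_ct, label_only], axis=1)

On the mixed inputs above, this produces mode_confirm, purpose_confirm, and (in the second case) replaced_mode columns whose unlabeled rows are NaN, which is exactly what the pd.isna assertions verify.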