diff --git a/matcher/config/.gitkeep b/matcher/config/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/matcher/matcher/__init__.py b/matcher/matcher/__init__.py index 23eec486..e9dac349 100644 --- a/matcher/matcher/__init__.py +++ b/matcher/matcher/__init__.py @@ -1,6 +1,6 @@ # coding: utf-8 -__version__='0.0.1' +__version__ = '0.2' from matcher.tasks import do_match diff --git a/matcher/matcher/blocker.py b/matcher/matcher/blocker.py new file mode 100644 index 00000000..af186a2a --- /dev/null +++ b/matcher/matcher/blocker.py @@ -0,0 +1,61 @@ +# coding: utf-8 + +import datetime +import pandas as pd + +from matcher.logger import logger + + +class Blocker(): + + def __init__(self, blocking_rules:dict=None): + self.blocking_rules = blocking_rules + self.initialization_time = datetime.datetime.now() + self.run_start_time = None + self.num_blocks = None + self.block_keys = None + self.run_end_time = None + + def run(self, preprocessed_df:pd.DataFrame) -> pd.core.groupby.DataFrameGroupBy: + """ Take the preprocessed dataframe, and return a group dictionary of + dataframes where the keys are the blocking values and the values + are dataframes containing only records matching the key. + """ + self.run_start_time = datetime.datetime.now() + + if self.blocking_rules: + logger.debug(f"Blocking by {self.blocking_rules}") + grouped_df = preprocessed_df.groupby([ + self._unpack_blocking_rule(preprocessed_df, column_name, position) + for column_name, position + in self.blocking_rules.items() + ]) + + else: + logger.debug('No blocking rules passed. Matching all record pairs.') + block = pd.Series(data=['all']*len(preprocessed_df), index=preprocessed_df.index) + grouped_df = preprocessed_df.groupby(block) + + logger.debug(f"Blocking is done: got {len(grouped_df)} blocks.") + self.num_blocks = len(grouped_df) + self.block_keys = pd.DataFrame(grouped_df.keys).drop_duplicates().T + self.run_end_time = datetime.datetime.now() + + return grouped_df + + def _unpack_blocking_rule( + self, + df:pd.DataFrame, + column_name:str, + position:int + ) -> pd.Series: + """ Given a blocking rule and a dataframe, convert the relevant column + to a string and return the value of the key for each row. + """ + if position < 0: + return df[column_name].astype(str).str[position:] + elif position > 0: + return df[column_name].astype(str).str[:position] + else: + raise ValueError(f"I cannot split a string at this position: {position}") + diff --git a/matcher/matcher/cluster.py b/matcher/matcher/cluster.py index 591f1f2d..9399fd11 100644 --- a/matcher/matcher/cluster.py +++ b/matcher/matcher/cluster.py @@ -1,101 +1,77 @@ # coding: utf-8 -import pandas as pd +import datetime import numpy as np - - -from sklearn.cluster import DBSCAN - -import matcher.ioutils as ioutils +import pandas as pd +import sklearn from matcher.logger import logger -def cluster( - distances:pd.DataFrame, - eps:float=0.5, - min_samples:int=1, - algorithm:str='auto', - leaf_size:int=30, - n_jobs:int=1 -) -> pd.DataFrame: - """ Cluster the scored entities into individuals. Return the cluster ids - indexed with the source row_id. - """ - - logger.info('Beginning clustering.') - - clusterer = DBSCAN( - eps=eps, - min_samples=min_samples, - metric='precomputed', - metric_params=None, - algorithm=algorithm, - leaf_size=leaf_size, - p=None, - n_jobs=n_jobs - ) - - clusterer.fit(X=distances) - logger.info('Clustering done! 
Assigning matched ids.') - - return pd.Series( - index=distances.index, - data=clusterer.labels_ - ) - - -def square_distance_matrix(df:pd.DataFrame) -> pd.DataFrame: - # create a copy, swap the indicies - tmp_df = df.copy() - tmp_df.reset_index(inplace=True) - tmp_df.rename( - mapper={'matcher_index_left': 'matcher_index_right', 'matcher_index_right': 'matcher_index_left'}, - axis='columns', - inplace=True - ) - tmp_df.set_index(['matcher_index_left', 'matcher_index_right'], inplace=True) - - # concat original & df with swapped indices; square the matrix, filling in 0 distance for self-pairs - return pd.concat([df, tmp_df]).matches.unstack(level=-1, fill_value=0) - - -def generate_matched_ids( - distances:pd.DataFrame, - DF:pd.DataFrame, - clustering_params:dict, - base_data_directory:str, - match_job_id=str, - block_name='', -) -> pd.DataFrame: - - logger.info('Beginning clustering & id generation.') - distances = square_distance_matrix(distances) - ioutils.write_dataframe(distances.reset_index(), filepath=f'{base_data_directory}/match_cache/square_distances/{match_job_id}/{block_name}') - - ids = cluster( - distances, **clustering_params - ) - ioutils.write_dataframe(ids.reset_index(), filepath=f'{base_data_directory}/match_cache/raw_cluster_ids/{match_job_id}/{block_name}') - max_cluster_id = ids.max() - replacement_ids = pd.Series(range(max_cluster_id + 1, max_cluster_id + len(ids[ids == -1]) + 1), index=ids[ids==-1].index) - ids[ids == -1] = replacement_ids - logger.debug(f'IDs: {ids}') - logger.debug(f'Replaced noisy singleton ids with \n{replacement_ids}') - - logger.debug(f'Adding the block name ({block_name}) to the matched_ids.') - ids = block_name + ids.astype(str) - logger.debug(f'New IDs: \n{ids}') - - df = DF.copy() - - df['matched_id'] = ids - logger.info('Matched ids generated') - - return (df) - -def generate_singleton_id(df:pd.DataFrame, block_name:str) -> pd.DataFrame: - df['matched_id'] = block_name + '0' - logger.info(f'Singleton has id {df.matched_id.values[0]}') - return df +class Clusterer(): + + def __init__(self, clustering_algorithm=sklearn.cluster.DBSCAN, **kwargs): + self.kwargs = kwargs + self.clusterer = clustering_algorithm(**kwargs) + self.initialized_time = datetime.datetime.now() + self.square_distance_matrix = None + self.raw_cluster_ids = None + + def run(self, distances: pd.DataFrame) -> pd.DataFrame: + """ Cluster the scored entities into individuals. Return the cluster ids + indexed with the source row_id. + """ + self.run_start_time = datetime.datetime.now() + logger.info('Beginning clustering & id generation.') + + self.square_distance_matrix = self._square_distance_matrix(distances) + self.square_distance_matrix_created_time = datetime.datetime.now() + self.square_distance_matrix_dimensions = self.square_distance_matrix.shape + + logger.debug('Squared the distances. Beginning clustering.') + self.clusterer.fit(X=self.square_distance_matrix) + self.clusterer_fit_time = datetime.datetime.now() + + logger.debug('Clustering done! 
Assigning matched ids.') + ids = self._generate_ids( + index=self.square_distance_matrix.index.values, + labels=self.clusterer.labels_ + ).astype(str) + self.run_end_time = datetime.datetime.now() + + return ids + + def _square_distance_matrix(self, df: pd.DataFrame) -> pd.DataFrame: + # create a copy, swap the indicies + tmp_df = df.copy() + tmp_df.reset_index(inplace=True) + tmp_df.rename( + mapper={'matcher_index_left': 'matcher_index_right', 'matcher_index_right': 'matcher_index_left'}, + axis='columns', + inplace=True + ) + tmp_df.set_index(['matcher_index_left', 'matcher_index_right'], inplace=True) + + # concat original & df with swapped indices; + # square (unstack) the matrix, filling in 0 distance for self-pairs + return pd.concat([df, tmp_df]).score.unstack(level=-1, fill_value=0) + + def _generate_ids(self, index: pd.Index, labels: np.array) -> pd.Series: + logger.debug(f'index {len(index)}') + logger.debug(f'labels {len(labels)}') + ids = pd.Series(index=index, data=labels, name='matched_id') + self.raw_cluster_ids = ids.copy() + logger.debug(f'ids {ids}') + max_cluster_id = ids.max() + self.num_clusters = int(max_cluster_id) + self.num_noisy_clusters = len(ids[ids == -1]) + + replacement_ids = pd.Series( + data=range(max_cluster_id + 1, max_cluster_id + len(ids[ids == -1]) + 1), + index=ids[ids == -1].index + ) + ids[ids == -1] = replacement_ids + logger.info('Matched ids generated') + + return ids diff --git a/matcher/matcher/contraster.py b/matcher/matcher/contraster.py index 2d01b8f8..ce95038e 100644 --- a/matcher/matcher/contraster.py +++ b/matcher/matcher/contraster.py @@ -10,7 +10,7 @@ import matcher.utils as utils -def truncate_string(s:pd.Series, n:int) -> pd.Series: +def truncate_string(s: pd.Series, n: int) -> pd.Series: if n > 0: s = s.astype(str).apply(lambda x: x[:n]) elif n < 0: @@ -20,19 +20,19 @@ def truncate_string(s:pd.Series, n:int) -> pd.Series: return s -def compare_exact_n_chars(s1:pd.Series, s2:pd.Series, n:int) -> pd.Series: +def compare_exact_n_chars(s1: pd.Series, s2: pd.Series, n: int) -> pd.Series: logger.debug(f'Doing an exact comparison of {n} characters') s1 = truncate_string(s1, n) s2 = truncate_string(s2, n) return (s1 == s2).astype(float) -def lists_share_any_values(l1:pd.Series, l2:pd.Series) -> pd.Series: +def lists_share_any_values(l1: pd.Series, l2: pd.Series) -> pd.Series: df = pd.concat([l1, l2], axis=1, keys=['l1','l2']) return df.apply(lambda row: any(i in row.l2 for i in row.l1), axis=1).astype(float) -def lists_share_all_values(l1:pd.Series, l2:pd.Series) -> pd.Series: +def lists_share_all_values(l1: pd.Series, l2: pd.Series) -> pd.Series: df = pd.concat([l1, l2], axis=1, keys=['l1','l2']) l1_all = df.apply(lambda row: all(i in row.l2 for i in row.l1), axis=1) l2_all = df.apply(lambda row: all(i in row.l1 for i in row.l2), axis=1) @@ -40,50 +40,87 @@ def lists_share_all_values(l1:pd.Series, l2:pd.Series) -> pd.Series: class Contraster: - def __init__(self, config:dict): + def __init__(self, config: dict): self.contraster = rl.Compare() self.config = config - self.metadata = {'contraster_initialization_time': datetime.datetime.now()} + self.initialized_time = datetime.datetime.now() - def compare_exact(self, col_name:str, n_chars=None): + def compare_exact(self, col_name: str, n_chars: int = None): logger.debug(f'Doing an exact comparison on {col_name}') if n_chars is not None: logger.debug(f'Doing an exact comparison of {n_chars} characters of {col_name}') - self.contraster.compare_vectorized(compare_exact_n_chars, col_name, 
col_name, n_chars['n_chars'], label=f"{col_name}_exact_{n_chars['n_chars']}_distance") + self.contraster.compare_vectorized( + comp_func=compare_exact_n_chars, + labels_left=col_name, + labels_right=col_name, + n=n_chars['n_chars'], + label=f"{col_name}_exact_{n_chars['n_chars']}_distance" + ) else: logger.debug(f'Doing an exact comparison of all characters in {col_name}') - self.contraster.exact(col_name, col_name, label=f'{col_name}_exact_distance') + self.contraster.exact( + left_on=col_name, + right_on=col_name, + label=f'{col_name}_exact_distance' + ) - def compare_string_distance(self, col_name:str, args:dict): + def compare_string_distance(self, col_name: str, args: dict): logger.debug(f'Doing a comparison of {col_name} using {args}') - self.contraster.string(col_name, col_name, label=f'{col_name}_{utils.convert_dict_to_str(args)}_distance', **args) - - def compare_swap_month_days(self, col_name:str, args:dict): + self.contraster.string( + left_on=col_name, + right_on=col_name, + label=f'{col_name}_{utils.convert_dict_to_str(args)}_distance', + **args + ) + + def compare_swap_month_days(self, col_name: str, args: dict): logger.debug(f'Checking if the month and day are swapped in {col_name}') - self.contraster.date(col_name, col_name, label=f'{col_name}_swap_month_days_distance', **args) - - def compare_numeric_distance(self, col_name:str, args:dict): + self.contraster.date( + left_on=col_name, + right_on=col_name, + label=f'{col_name}_swap_month_days_distance', + **args + ) + + def compare_numeric_distance(self, col_name: str, args: dict): logger.debug(f'Doing a numeric distance calculation on {col_name}') - self.contraster.date(col_name, col_name, label=f'{col_name}_numeric_{utils.convert_dict_to_str(args)}_distance', **args) - - def compare_list(self, col_name:str, args:dict): + self.contraster.numeric( + left_on=col_name, + right_on=col_name, + label=f'{col_name}_numeric_{utils.convert_dict_to_str(args)}_distance', + **args + ) + + def compare_list(self, col_name: str, args: dict): if args['method'] == 'any': logger.debug(f'Checking if {col_name} shares any value.') - self.contraster.compare_vectorized(lists_share_any_values, col_name, col_name, label=f'{col_name}_any_list_item_distance') + self.contraster.compare_vectorized( + comp_func=lists_share_any_values, + labels_left=col_name, + labels_right=col_name, + label=f'{col_name}_any_list_item_distance' + ) + elif args['method'] == 'all': logger.debug(f'Checking if {col_name} shares all values.') - self.contraster.compare_vectorized(lists_share_all_values, col_name, col_name, label=f'{col_name}_all_list_items_distance') + self.contraster.compare_vectorized( + comp_func=lists_share_all_values, + labels_left=col_name, + labels_right=col_name, + label=f'{col_name}_all_list_items_distance' + ) + else: - raise ValueError(f"I don't know how to compare lists with this method ({method}). Please send me 'all' or 'any'.") + raise ValueError(f"I don't know how to compare lists with method {method}. 
Please send me 'all' or 'any'.") - def make_contrast_metadata(self, contrasts): - contrast_metadata = {} + def describe_contrasts(self, contrasts): + contrast_descriptives = {} for column in contrasts.columns: logger.debug(f'Making you some stats about {column}') - contrast_metadata[column] = utils.summarize_column(contrasts[column]) - self.metadata['contrasts'] = contrast_metadata + contrast_descriptives[column] = utils.summarize_column(contrasts[column]) + self.contrast_descriptives = contrast_descriptives - def run(self, pairs:pd.MultiIndex, df:pd.DataFrame) -> pd.DataFrame: + def run(self, pairs: pd.MultiIndex, df: pd.DataFrame) -> pd.DataFrame: """ Read the config and make the required contrasts. The config dictionary keys are column names. The values define @@ -94,9 +131,9 @@ def run(self, pairs:pd.MultiIndex, df:pd.DataFrame) -> pd.DataFrame: We will loop over the column names and the contrast definitions and call the appropriate method for each. """ - self.metadata['contraster_start_time'] = datetime.datetime.now() + self.run_start_time = datetime.datetime.now() logger.debug(f'Making the following contrasts: \n{self.config}') - + for col_name, contrast_definitions in self.config.items(): logger.debug(f'Found the following contrasts for {col_name}: \n{contrast_definitions}') @@ -111,13 +148,14 @@ def run(self, pairs:pd.MultiIndex, df:pd.DataFrame) -> pd.DataFrame: else: logger.debug(f"Found no arguments for {col_name} {contrast_definition['method']}.") contrast_method(col_name) - + logger.debug('Running all those contrasts!') contrasts = self.contraster.compute(pairs, df) - - self.make_contrast_metadata(contrasts) + logger.debug(f'Contrasts dataframe has dimensions {contrasts.shape}') + self.describe_contrasts(contrasts) - self.metadata['contraster_end_time'] = datetime.datetime.now() + self.contrast_dataframe_dimensions = contrasts.shape + self.run_end_time = datetime.datetime.now() return contrasts diff --git a/matcher/matcher/ioutils.py b/matcher/matcher/ioutils.py deleted file mode 100644 index eeb6b8af..00000000 --- a/matcher/matcher/ioutils.py +++ /dev/null @@ -1,188 +0,0 @@ -# coding: utf-8 - -import os -from os.path import dirname - -import pandas as pd -import s3fs -import yaml -from urllib.parse import urlparse -from contextlib import contextmanager - -from matcher.logger import logger -from matcher import utils - -# load dotenv -from dotenv import load_dotenv -APP_ROOT = os.path.join(os.path.dirname(__file__), '..') -dotenv_path = os.path.join(APP_ROOT, '.env') -load_dotenv(dotenv_path) - - -@contextmanager -def open_sesame(path, *args, **kwargs): - """Opens files either on s3 or a filesystem according to the path's scheme - - Uses s3fs so boto3 is used. 
- This means mock_s3 can be used for tests, instead of the mock_s3_deprecated - """ - path_parsed = urlparse(path) - scheme = path_parsed.scheme # If '' or 'file' then a regular file; if 's3' then 's3' - - if not scheme or scheme == 'file': # Local file - os.makedirs(dirname(path), exist_ok=True) - with open(path, *args, **kwargs) as f: - yield f - elif scheme == 's3': - s3 = s3fs.S3FileSystem() - with s3.open(path, *args, **kwargs) as f: - yield f - - -def load_data_for_matching(base_data_directory:str, event_types:list, keys:list, match_job_id:str) -> list: - # We will frame the record linkage problem as a deduplication problem - logger.debug(f'Loading data for event types: {event_types}') - try: - df = pd.concat([load_one_event_type(base_data_directory, event_type, keys, match_job_id) for event_type in event_types]) - except ValueError as e: - if str(e) != "All objects passed were None": - raise - else: - logger.debug('Found no events data.') - raise ValueError(f'No merged data files found for any event type ({event_types}) in {base_data_directory}.') - logger.debug(f'Number of deduped events: {len(df)}') - - ## and the match_job_id - df['match_job_id'] = match_job_id - - # Which event types did we read successfully? - event_types_read = df.event_type.drop_duplicates().values - - ## TODO: Check the definition of keys - # Drop duplicates, disregarding event type - df = df.drop('event_type', axis=1) - df = df.drop_duplicates(subset=keys) - - logger.debug(f"The loaded dataframe has the following columns: {df.columns}") - logger.debug(f"The dimensions of the loaded dataframe is: {df.shape}") - logger.debug(f"The indices of the loaded dataframe are {df.index}") - logger.debug(f'The loaded has {len(df)} rows and {len(df.index.unique())} unique indices') - logger.debug(f'The loaded dataframe has the following duplicate indices: {df[df.index.duplicated()].index.values}') - - # Cache read data - write_dataframe(df=df.reset_index(), filepath=f'{base_data_directory}/match_cache/loaded_data/{match_job_id}') - - return df, event_types_read - - -def load_one_event_type(base_data_directory:str, event_type:str, keys:list, match_job_id:str) -> pd.DataFrame: - logger.info(f'Loading {event_type} data for matching from {base_data_directory}.') - - try: - df = read_merged_data(base_data_directory, event_type, keys) - - # Dropping columns that we don't need for matching - df = df[keys] - - # Keeping track of the event_type - df['event_type'] = event_type - - logger.info(f'{event_type} data loaded from S3.') - - return df - - except FileNotFoundError as e: - logger.info(f'No merged file found for {event_type} in {base_data_directory}. 
Skipping.') - pass - - -def read_merged_data(base_data_directory:str, event_type:str, keys:list) -> pd.DataFrame: - # Read the data in and select the necessary columns - merged_filepath = f'{base_data_directory}/{event_type}/merged' - logger.info(f"Reading data from {merged_filepath}") - df = pd.read_csv(merged_filepath, sep='|') - - df['person_index'] = utils.concatenate_person_index(df, keys) - df.set_index('person_index', drop=True, inplace=True) - - return df - - -def write_matched_data( - matches:pd.DataFrame, - base_data_directory:str, - person_keys:list, - schema_pk_lookup:dict, - match_job_id:str -) -> dict: - write_dataframe(df=matches.reset_index(), filepath=f'{base_data_directory}/match_cache/matcher_results/{match_job_id}') - matched_results_paths = {} - logger.debug(schema_pk_lookup) - for event_type, primary_keys in schema_pk_lookup.items(): - logger.info(f'Writing matched data for {base_data_directory} {event_type}') - matched_results_paths[event_type] = write_one_event_type( - df=matches, - base_data_directory=base_data_directory, - event_type=event_type, - person_keys=person_keys, - primary_keys=primary_keys, - match_job_id=match_job_id - ) - - return matched_results_paths - - -def write_one_event_type( - df:pd.DataFrame, - base_data_directory:str, - event_type:str, - person_keys:list, - primary_keys:list, - match_job_id:str -) -> str: - # Join the matched ids to the source data - logger.info(f'Joining matches to merged data for {event_type}') - df = join_matched_and_merged_data(df, base_data_directory, event_type, person_keys, primary_keys) - - # Cache the current match to S3 - logger.info(f'Writing data for {base_data_directory} {event_type} to S3.') - write_dataframe(df=df, filepath=f'{base_data_directory}/{event_type}/matches/{match_job_id}') - write_dataframe(df=df, filepath=f'{base_data_directory}/{event_type}/matched') - - return f'{base_data_directory}/{event_type}/matched' - - -def join_matched_and_merged_data( - right_df:pd.DataFrame, - base_data_directory:str, - event_type:str, - person_keys:list, - primary_keys:list -) -> pd.DataFrame: - left_df=read_merged_data(base_data_directory, event_type, person_keys)[primary_keys] - - df = left_df.merge( - right=right_df['matched_id'].to_frame(), - left_index=True, - right_index=True, - copy=False, - validate='many_to_one' - ) - logger.info(f'Joined match ids to merged data for {event_type}') - - return df - - -def write_dataframe(df:pd.DataFrame, filepath:str) -> None: - with open_sesame(filepath, 'wb') as fout: - fout.write(df.to_csv(sep='|', index=False).encode()) - - logger.info(f'Wrote data to {filepath}') - - -def write_dict_to_yaml(dict_to_write:dict, filepath:str): - logger.debug(f'Writing some dictionary data to {filepath}! 
Oooooo!') - with open_sesame(filepath, 'wb') as fout: - fout.write(yaml.dump(dict_to_write).encode()) - logger.info(f'Wrote metadata to {filepath}') - diff --git a/matcher/matcher/matcher.py b/matcher/matcher/matcher.py index c8e158e6..96a13db3 100644 --- a/matcher/matcher/matcher.py +++ b/matcher/matcher/matcher.py @@ -2,14 +2,16 @@ import pandas as pd import datetime +import hashlib +import json import numpy as np +import sklearn from typing import List -import matcher.contraster as contraster -import matcher.rules as rules -import matcher.cluster as cluster -import matcher.ioutils as ioutils +from matcher.contraster import Contraster +from matcher.scorer import Scorer +from matcher.cluster import Clusterer import matcher.utils as utils from matcher.logger import logger @@ -18,83 +20,63 @@ class Matcher: - def __init__(self, base_data_directory:str, match_job_id:str, clustering_rules:dict, contrast_rules, blocking_rules:dict=None): - self.clustering_rules = clustering_rules - self.base_data_directory = base_data_directory - self.match_job_id = match_job_id - self.contrast_rules = contrast_rules - self.blocking_rules = blocking_rules - self.metadata = {'matcher_initialization_time': datetime.datetime.now()} - - def block_and_match(self, df): - ## We will split-apply-combinei - logger.debug(f'df sent to block-and-match has the following columns: {df.dtypes}') - logger.info(f'Blocking by {self.blocking_rules}') - grouped = df.groupby([utils.unpack_blocking_rule(df, column_name, position) for column_name, position in self.blocking_rules.items()]) - logger.info(f'Applying matcher to {len(grouped)} blocks.') - all_block_metadata = {} - - matches = {} - - for key, group in grouped: - logger.debug(f"Matching group {key} of size {len(group)}") - - if len(group) > 1: - matches[key], block_metadata = self.match(group, key) - else: - block_metadata = { - 'size': 1, - 'n_pairs': 0, - 'contrasts': None, - 'scores': None - } - logger.debug(f"Group {key} only has one record, making a singleton id") - matches[key] = cluster.generate_singleton_id(group, str(key)) - - logger.debug('Wrapping up block') - all_block_metadata[key] = block_metadata - - logger.debug('All blocks done! 
Yehaw!') - self.metadata['blocks'] = all_block_metadata - return pd.concat(matches.values()) - - def match(self, df:pd.DataFrame, key='all') -> pd.DataFrame: + def __init__( + self, + contraster:Contraster, + scorer:Scorer, + clusterer:Clusterer + ): + self.contraster = contraster + self.scorer = scorer + self.clusterer = clusterer + self.initialization_time = datetime.datetime.now() + self.run_start_time = None + self.run_end_time = None + self.contrasts = None + + @property + def square_distance_matrix(self): + return self.clusterer.square_distance_matrix + + @property + def raw_cluster_ids(self): + return self.clusterer.square_distance_matrix + + def run(self, df:pd.DataFrame) -> pd.DataFrame: + self.run_start_time = datetime.datetime.now() - metadata = { - 'size': len(df) - } - logger.debug('Indexing the data for matching!') - indexer = rl.FullIndex() - pairs = indexer.index(df) - metadata['n_pairs'] = len(pairs) - logger.debug(f"Number of pairs: {metadata['n_pairs']}") - - logger.debug(f"Initializing contrasting") - contraster_obj = contraster.Contraster(self.contrast_rules) - contrasts = contraster_obj.run(pairs, df) - metadata['contraster_metadata'] = contraster_obj.metadata - logger.debug(f"Contrasts created") - - contrasts.index.rename(['matcher_index_left', 'matcher_index_right'], inplace=True) - contrasts = rules.compactify(contrasts, operation='mean') - logger.debug('Summary distances generated. Making you some stats about them.') - metadata['scores'] = utils.summarize_column(contrasts.matches) - logger.debug('Caching those contrasts and distances for you.') - ioutils.write_dataframe(contrasts.reset_index(), filepath=f'{self.base_data_directory}/match_cache/contrasts/{self.match_job_id}/{key}') - - logger.debug(f"Contrasts dataframe size: {contrasts.shape}") - logger.debug(f"Contrasts data without duplicated indexes: {contrasts[~contrasts.index.duplicated(keep='first')].shape}") - logger.debug("Duplicated keys:") - logger.debug(f"{contrasts[contrasts.index.duplicated(keep=False)]}") - - matches = cluster.generate_matched_ids( - distances=contrasts, - DF=df, - clustering_params=self.clustering_rules, - base_data_directory=self.base_data_directory, # at some point, we may want to consider making the matcher into a class - match_job_id=self.match_job_id, # rather than passing around keys, match_job_ids, base_data_directorys, etc. - block_name=str(key) - ) - - return matches, metadata + # If there's only 1 record, nothing to link; make a matched_id and exit + if len(df) == 1: + logger.debug(f"Dataframe only has one record, making a singleton id") + matches = pd.Series(data=[1], index=df.index) + + # If there's more than one record, start linking records + else: + logger.debug('Indexing the data for matching!') + indexer = rl.FullIndex() + pairs = indexer.index(df) + self.n_pairs = len(pairs) + logger.debug(f"Indexing done. Number of pairs: {self.n_pairs}") + + logger.debug(f"Initializing contrasting!") + contrasts = self.contraster.run(pairs, df) + contrasts.index.rename( + ['matcher_index_left', 'matcher_index_right'], inplace=True + ) + logger.debug(contrasts) + logger.debug("Contrasts created!") + + logger.debug('Scoring the distances between records.') + contrasts = self.scorer.run(contrasts) + self.contrasts = contrasts.copy() + logger.debug('Caching those contrasts and distances for you.') + logger.debug('Scores created.') + + logger.debug('Clustering records!') + matches = self.clusterer.run(distances=self.contrasts) + logger.debug('Clustering done. 
Wrapping up matching.') + + self.run_end_time = datetime.datetime.now() + + return matches diff --git a/matcher/matcher/pipeline.py b/matcher/matcher/pipeline.py new file mode 100644 index 00000000..aa8d6057 --- /dev/null +++ b/matcher/matcher/pipeline.py @@ -0,0 +1,96 @@ +# coding: utf-8 + +import copy +import datetime +import hashlib +import importlib +import json +from typing import List + +import pandas as pd +import numpy as np +import sklearn + +import matcher.blocker as blocker +import matcher.contraster as contraster +import matcher.scorer as scorer +import matcher.cluster as cluster +import matcher.matcher as matcher +import matcher.utils as utils + +from matcher.logger import logger + + +class Pipeline: + def __init__(self, config:dict, cache): + self.config = config + self.cache = cache + self.initialization_time = datetime.datetime.now() + self.run_start_time = None + self.matchers = {} + self.matches = None + self.run_end_time = None + self.initialize_components() + + def initialize_components(self): + if 'blocking_rules' in self.config.keys(): + blocking_rules = self.config['blocking_rules'] + else: + blocking_rules = None + self.blocker = blocker.Blocker(blocking_rules) + + self.contraster = contraster.Contraster(self.config['contrasts']) + + self.scorer = scorer.Scorer(operation=self.config['scorer']['operation']) + + module_name, class_name = self.config['clusterer']['algorithm'].rsplit(".", 1) + module = importlib.import_module(module_name) + cls = getattr(module, class_name) + self.clusterer = cluster.Clusterer( + clustering_algorithm=cls, + **self.config['clusterer']['args'] + ) + + self.base_matcher = matcher.Matcher( + self.contraster, + self.scorer, + self.clusterer + ) + + def cache_dataframe(self, df_type:str) -> None: + logger.debug(f'Trying to cache the {df_type} for you.') + data_to_cache = [] + + for block_name, matcher in self.matchers.items(): + logger.debug(f'Getting the {df_type} for block {block_name}') + df = getattr(matcher, df_type) + if df is not None: + df['block'] = block_name + data_to_cache.append(df) + + self.cache.cache_matcher_data(pd.concat(data_to_cache), df_type) + + def run(self, df:pd.DataFrame): + self.run_start_time = datetime.datetime.now() + logger.info('Matcher run started! Vroom vroom!') + + logger.info('Starting blocking!') + blocks = self.blocker.run(df) + + logger.info(f'Blocking done! Starting matching {len(blocks)} blocks.') + matches = [] + for key, block in blocks: + logger.debug(f"Matching group {key} of size {len(block)}") + matcher = copy.deepcopy(self.base_matcher) + matches.append(''.join(key) + matcher.run(df=block).astype(str)) + self.matchers[''.join(key)] = matcher + logger.debug(self.matchers) + + for df_type in ['contrasts', 'square_distance_matrix', 'raw_cluster_ids']: + self.cache_dataframe(df_type) + + matches = pd.DataFrame({'matched_id': pd.concat(matches)}) + self.matches = matches + + self.run_end_time = datetime.datetime.now() + diff --git a/matcher/matcher/preprocess.py b/matcher/matcher/preprocess.py index add3e542..e49f2556 100644 --- a/matcher/matcher/preprocess.py +++ b/matcher/matcher/preprocess.py @@ -2,14 +2,13 @@ import pandas as pd -import matcher.ioutils as ioutils - from matcher.logger import logger -def preprocess(df:pd.DataFrame, match_job_id:str, base_data_directory:str) -> pd.DataFrame: +def preprocess(df:pd.DataFrame, cache) -> pd.DataFrame: # full_name - # full name is only given if name parts are not. 
maybe we should do some preprocessing on full names to create - # name parts and use only the name parts, especially since it is possible for the jail and HMIS systems to + # full name is only given if name parts are not. maybe we should do some + # preprocessing on full names to create name parts and use only the name + # parts, especially since it is possible for the jail and HMIS systems to # differ on what they use # prefix @@ -46,20 +45,23 @@ def preprocess(df:pd.DataFrame, match_job_id:str, base_data_directory:str) -> pd df['dob'] = pd.to_datetime(df['dob']) # ssn - # THIS SHOULD BE CONVERTED TO STRING. The SSN consists of 3 words, and numerical distances are only - # VAGUELY meaningful (e.g., the first 3 digits increase roughly east to west but not in a rigorous way, - # and the second 2 digits are given out in a fixed but non-monotonic order) - # the first three digits are the "area code" of where the person was registered. - # most people living in an area will have one of a few local area codes; therefore, the distinctiveness - # of the area code may be useful for matching. we may want to preprocess ssn to extract the area code - # to make this comparison. + # THIS SHOULD BE CONVERTED TO STRING. The SSN consists of 3 words, and + # numerical distances are only VAGUELY meaningful (e.g., the first 3 digits + # increase roughly east to west but not in a rigorous way, and the second 2 + # digits are given out in a fixed but non-monotonic order), and the first + # three digits are the "area code" of where the person was registered. Most + # people living in an area will have one of a few local area codes; + # therefore, the distinctiveness of the area code may be useful for + # matching. we may want to preprocess ssn to extract the area code to make + # this comparison. if 'ssn' in df.columns: logger.debug('Converting social security number to str') df['ssn'] = df['ssn'].astype(str) # dmv_number - # THIS SHOULD BE CAST TO STRING. In some jurisdictions, they are strings and in others ints. To ensure - # that we can generalize here, we need to convert to string for all of them. + # THIS SHOULD BE CAST TO STRING. In some jurisdictions, they are strings + # and in others ints. To ensure that we can generalize here, we need to + # convert to string for all of them. if 'dmv_number' in df.columns: logger.debug('Converting dmv number to str') df['dmv_number'] = df['dmv_number'].astype(str) @@ -73,13 +75,14 @@ def preprocess(df:pd.DataFrame, match_job_id:str, base_data_directory:str) -> pd logger.debug(f"Races observed in preprocessed df: {df['race']}") # ethnicity - # ethnicity encodes only Hispanic/Not Hispanic. for some databases, Hispanic is actually included - # in the race categories instead of in a separate field. we may want to do some pre-processing to - # to add H to the race list where the ethnicity field contains 'Hispanic' + # ethnicity encodes only Hispanic/Not Hispanic. for some databases, + # Hispanic is actually included in the race categories instead of in a + # separate field. 
we may want to do some pre-processing to add H to the + # race list where the ethnicity field contains 'Hispanic' logger.info('Preprocessing done!') logger.debug(f"The preprocessed dataframe has the following columns: {df.columns}") logger.debug(f"The preprocessed dimensions of the dataframe is: {df.shape}") - ioutils.write_dataframe(df.reset_index(), filepath=f'{base_data_directory}/match_cache/preprocessed_data/{match_job_id}') + cache.cache_matcher_data(df, 'preprocessed_data') return df diff --git a/matcher/matcher/rules.py b/matcher/matcher/rules.py deleted file mode 100644 index 50751c92..00000000 --- a/matcher/matcher/rules.py +++ /dev/null @@ -1,41 +0,0 @@ -# coding: utf-8 - -import pandas as pd -import numpy as np - -from sklearn.preprocessing import MinMaxScaler - -def exact(df:pd.DataFrame) -> pd.DataFrame: - """ - Checks if the two rows are an exact match. The result will be stored in the column "matches" - """ - - df["matches"] = df.loc[:, df.columns.str.startswith("exact_distance_in")].apply(lambda row: np.all(row), axis=1) - - return df - - -def compactify(df:pd.DataFrame, operation='sum', reverse=True) -> pd.DataFrame: - - if operation == 'sum': - df["matches"] = df.apply(lambda row: np.sum(row), axis=1) - elif operation == 'norm': - df["matches"] = df.apply(lambda row: np.linalg.norm(row), axis=1) - elif operation == 'mean': - df['matches'] = df.apply(lambda row: np.mean(row), axis=1) - else: - print(f"Operation {operation} not supported") - df["matches"] = np.nan - - if reverse: - df['matches'] = 1 - df['matches'] - - return df - - -def scale(df:pd.DataFrame, score_column='matches', min=0, max=1) -> pd.DataFrame: - scaler = MinMaxScaler(feature_range=(min, max)) - df = pd.DataFrame(scaler.fit_transform(df), index = df.index, columns=df.columns) - - return df - diff --git a/matcher/matcher/scorer.py b/matcher/matcher/scorer.py new file mode 100644 index 00000000..a3e39770 --- /dev/null +++ b/matcher/matcher/scorer.py @@ -0,0 +1,44 @@ +# coding: utf-8 + +import datetime +import pandas as pd +import numpy as np + +import matcher.utils as utils + +from matcher.logger import logger + + +class Scorer(): + def __init__(self, operation='mean', reverse=True): + self.operation = operation + self.reverse = reverse + self.initialized_time = datetime.datetime.now() + + def run(self, contrasted_df: pd.DataFrame) -> pd.DataFrame: + self.run_start_time = datetime.datetime.now() + + logger.debug(f'Scoring record pairs with operation {self.operation}') + + scored_df = contrasted_df.copy() + logger.debug('Made a copy of contrasts') + if self.operation == 'sum': + scored_df['score'] = scored_df.apply(lambda row: np.sum(row), axis=1) + elif self.operation == 'norm': + scored_df['score'] = scored_df.apply(lambda row: np.linalg.norm(row), axis=1) + elif self.operation == 'mean': + scored_df['score'] = scored_df.apply(lambda row: np.mean(row), axis=1) + logger.debug('Made scores') + else: + raise ValueError(f'Scoring operation {operation} not supported.') + scored_df['score'] = np.nan + + if self.reverse: + scored_df['score'] = 1 - scored_df['score'] + logger.debug('Reversed scores') + self.scores = scored_df['score'] + self.score_descriptives = utils.summarize_column(scored_df.score) + self.run_end_time = datetime.datetime.now() + + return scored_df + diff --git a/matcher/matcher/storage.py b/matcher/matcher/storage.py new file mode 100644 index 00000000..86fe5a65 --- /dev/null +++ b/matcher/matcher/storage.py @@ -0,0 +1,205 @@ +# coding: utf-8 + +import io +import os +from os.path import 
dirname + +import pandas as pd +import s3fs +import yaml +from urllib.parse import urlparse +from contextlib import contextmanager + +from matcher.logger import logger +from matcher import utils + +# load dotenv +from dotenv import load_dotenv +APP_ROOT = os.path.join(os.path.dirname(__file__), '..') +dotenv_path = os.path.join(APP_ROOT, '.env') +load_dotenv(dotenv_path) + + +class Store(object): + def __init__(self, base_data_directory): + self.base_data_directory = base_data_directory + + def load(self, path:str): + with self._open(path) as f: + return f.read() + + def write_dataframe(self, df:pd.DataFrame, path:str) -> None: + with self._open(path, 'wb') as fout: + fout.write(df.to_csv(sep='|', index=False).encode()) + + logger.info(f'Wrote data to {path}.') + + @contextmanager + def _open(self, path, *args, **kwargs): + """Opens files either on s3 or a filesystem according to the path's + scheme. + """ + full_path = f'{self.base_data_directory}/{path}' + logger.debug(f'Opening {full_path}') + path_parsed = urlparse(full_path) + scheme = path_parsed.scheme # If '' or 'file' then a regular file; if 's3' then 's3' + + if not scheme or scheme == 'file': # Local file + os.makedirs(dirname(full_path), exist_ok=True) + with open(full_path, *args, **kwargs) as f: + yield f + elif scheme == 's3': + s3 = s3fs.S3FileSystem() + with s3.open(full_path, *args, **kwargs) as f: + yield f + + +class Cache(object): + def __init__( + self, + store, + match_job_id:str, + loaded_data:bool=False, + preprocessed_data:bool=False, + contrasts:bool=False, + square_distance_matrix:bool=False, + raw_cluster_ids:bool=False, + matcher_results:bool=False + ): + self.store = store + self.match_job_id = match_job_id + self.loaded_data = loaded_data + self.preprocessed_data = preprocessed_data + self.contrasts = contrasts + self.square_distance_matrix = square_distance_matrix + self.raw_cluster_ids = raw_cluster_ids + self.matcher_results = matcher_results + + def _cache(self, df:pd.DataFrame, path:str) -> None: + self.store.write_dataframe(df=df, path=f'{path}/{self.match_job_id}') + + def cache_matcher_data(self, df, df_type) -> None: + if getattr(self, df_type): + logger.info(f'Caching {df_type}.') + self._cache(df=df.reset_index(), path=f'match_cache/{df_type}') + else: + logger.info(f'Skipping cache of {df_type}; flag not set.') + + def cache_events(self, df, event_type): + self._cache(df=df, path=f'{event_type}/matches') + + +class MatcherServiceStore(object): + def __init__( + self, + store, + cache, + schema_pk_lookup:dict, + matching_keys:list + ): + self.store = store + self.cache = cache + self.schema_pk_lookup = schema_pk_lookup + self.matching_keys = matching_keys + self.event_types_read = [] + + def load_data_for_matching(self) -> list: + logger.debug(f'Loading data for event types: {self.schema_pk_lookup.keys()}') + try: + df = pd.concat([self._try_loading_event_data(event_type) for event_type in self.schema_pk_lookup.keys()]) + except ValueError as e: + if str(e) == 'All objects passed were None': + logger.debug('Found no events data.') + raise ValueError(f'No merged data found for any event type ({self.schema_pk_lookup.keys()}).') + else: + raise + logger.debug(f'Number of events: {len(df)}') + + ## add the match_job_id + df['match_job_id'] = self.cache.match_job_id + + # Drop duplicates, disregarding event type + df = df.drop_duplicates(subset=self.matching_keys) + + logger.debug(f'The loaded dataframe has the following columns: {df.columns}') + logger.debug(f'The dimensions of the loaded and deduped 
dataframe are: {df.shape}') + logger.debug(f'The indices of the loaded dataframe are {df.index}') + + # Cache read data + self.cache.cache_matcher_data(df=df, df_type='loaded_data') + + return df + + def _try_loading_event_data(self, event_type:str) -> pd.DataFrame: + logger.info(f'Loading {event_type} data for matching.') + + try: + df = self._read_merged_data(event_type) + + # Dropping columns that we don't need for matching + df = df[self.matching_keys] + + logger.info(f'{len(df)} events loaded for {event_type}.') + self.event_types_read.append(event_type) + + return df + + except FileNotFoundError as e: + logger.info(f'No merged file found for {event_type}. Skipping.') + pass + + def _read_merged_data(self, event_type:str) -> pd.DataFrame: + merged_filepath = f'{event_type}/merged' + logger.info(f'Reading data from {merged_filepath}') + df = pd.read_csv(io.BytesIO(self.store.load(merged_filepath)), sep='|') + + df['person_index'] = utils.concatenate_person_index(df, self.matching_keys) + df.set_index('person_index', drop=True, inplace=True) + + return df + + def write_matched_data(self, matches:pd.DataFrame) -> dict: + self.cache.cache_matcher_data(df=matches, df_type='matcher_results') + matched_results_paths = {} + for event_type, primary_keys in self.schema_pk_lookup.items(): + if event_type in self.event_types_read: + logger.info(f'Writing matched data for {event_type}') + matched_results_paths[event_type] = self._write_one_event_type( + df=matches, + event_type=event_type, + primary_keys=primary_keys + ) + else: + logger.info(f'No data read for {event_type}. Skipping writing.') + + return matched_results_paths + + def _write_one_event_type(self, df:pd.DataFrame, event_type:str, primary_keys:list) -> str: + # Join the matched ids to the source data + df = self._join_matched_and_merged_data(df, event_type, primary_keys) + + # Cache the current match to S3 + logger.info(f'Writing data for {event_type}.') + self.cache.cache_events(df=df, event_type=event_type) + self.store.write_dataframe(df=df, path=f'{event_type}/matched') + + return f'{self.store.base_data_directory}/{event_type}/matched' + + def _join_matched_and_merged_data( + self, + right_df:pd.DataFrame, + event_type:str, + primary_keys:list + ) -> pd.DataFrame: + left_df=self._read_merged_data(event_type)[primary_keys] + df = left_df.merge( + right=right_df['matched_id'].to_frame(), + left_index=True, + right_index=True, + copy=False, + validate='many_to_one' + ) + logger.info(f'Joined matched_ids to primary keys for {event_type}') + + return df + diff --git a/matcher/matcher/tasks.py b/matcher/matcher/tasks.py index 236becac..c21f0956 100644 --- a/matcher/matcher/tasks.py +++ b/matcher/matcher/tasks.py @@ -9,22 +9,19 @@ import pandas as pd import yaml -import matcher.matcher as matcher +import matcher.pipeline as pipeline import matcher.preprocess as preprocess import matcher.utils as utils -import matcher.ioutils as ioutils +import matcher.storage as storage from matcher.logger import logger - from redis import Redis from rq import Queue redis_connection = Redis(host='redis', port=6379) q = Queue('webapp', connection=redis_connection) - - def do_match( base_data_directory:str, schema_pk_lookup:dict, @@ -32,68 +29,55 @@ def do_match( notify_webapp:bool=True, config_path:str='matcher_config.yaml' ): + # Initializing: let's get started by collecting some job metadata and creating storage objects with open(config_path) as f: config = yaml.load(f) - - # Initializing: let's get started by collecting some job metadata + 
logger.debug(config) metadata = { 'match_job_start_time': datetime.datetime.now(), 'match_job_id': utils.unique_match_job_id(), 'base_data_directory': base_data_directory, 'config': config } - logger.info("Matching process started!") + logger.info('Matching process started!') + + store = storage.Store(base_data_directory) + cache = storage.Cache(store, metadata['match_job_id'], **config['cache']) + matcher_service_store = storage.MatcherServiceStore(store, cache, schema_pk_lookup, config['keys']) try: # Loading: collect matching data (keys) for all available event types & record which event types were found logger.info('Loading data for matching.') - df, event_types_read = ioutils.load_data_for_matching( - base_data_directory, - list(schema_pk_lookup.keys()), - config['keys'], - metadata['match_job_id'] - ) - metadata['event_types_read'] = list(event_types_read) + df = matcher_service_store.load_data_for_matching() + metadata['event_types_read'] = matcher_service_store.event_types_read metadata['loaded_data_columns'] = list(df.columns.values) metadata['loaded_data_shape'] = list(df.shape) metadata['data_loaded_time'] = datetime.datetime.now() # Preprocessing: enforce data types and split/combine columns for feartures logger.info('Doing some preprocessing on the columns') - df = preprocess.preprocess(df, metadata['match_job_id'], base_data_directory) + df = preprocess.preprocess(df, cache) metadata['preprocessed_data_columns'] = list(df.columns.values) metadata['preprocessed_data_shape'] = list(df.shape) metadata['data_preprocessed_time'] = datetime.datetime.now() - # Matching: block the data, generate pairs and features, and cluster entities - logger.info(f"Running matcher") - match_object = matcher.Matcher( - base_data_directory=base_data_directory, - match_job_id=metadata['match_job_id'], - clustering_rules=config['clusterer']['args'], - contrast_rules=config['contrasts'], - blocking_rules=config['blocking_rules'] - ) - matches = match_object.block_and_match(df=df) + # Record Linkage: block the data, generate pairs and features, and cluster entities + logger.info(f'Running matching pipeline.') + matchmaker = pipeline.Pipeline(config=config, cache=cache) + matchmaker.run(df) + matches = matchmaker.matches metadata['data_matched_time'] = datetime.datetime.now() - metadata.update(match_object.metadata) logger.debug('Matching done!') - logger.debug(f"Number of matched pairs: {len(matches)}") + logger.debug(f'Number of matched pairs: {len(matches)}') # Writing: Join the matched ids to the source data for each event & write to S3 and postgres logger.info('Writing matched results!') - matched_results_paths = ioutils.write_matched_data( - matches=matches, - base_data_directory=base_data_directory, - person_keys=config['keys'], - schema_pk_lookup={event_type:schema_pk_lookup[event_type] for event_type in event_types_read}, - match_job_id=metadata['match_job_id'] - ) + matched_results_paths = matcher_service_store.write_matched_data(matches=matches) metadata['data_written_time'] = datetime.datetime.now() - ioutils.write_dict_to_yaml(metadata, f"{base_data_directory}/match_cache/metadata/{metadata['match_job_id']}") + #ioutils.write_dict_to_yaml(metadata, f'{base_data_directory}/match_cache/metadata/{metadata["match_job_id"]}') - logger.info('Finished') + logger.info('Finished successfully!') match_end_time = datetime.datetime.now() match_runtime = match_end_time - metadata['match_job_start_time'] @@ -124,7 +108,6 @@ def do_match( finally: if notify_webapp: - logger.error("Notifying the webapp") 
job = q.enqueue_call( func='webapp.match_finished', args=( diff --git a/matcher/matcher/utils.py b/matcher/matcher/utils.py index 2f292a96..0e539667 100644 --- a/matcher/matcher/utils.py +++ b/matcher/matcher/utils.py @@ -5,8 +5,6 @@ import numpy as np import pandas as pd -from matcher import ioutils - from matcher.logger import logger from uuid import uuid4 diff --git a/matcher/matcher_config.yaml b/matcher/matcher_config.yaml index 27e2fb9d..40557799 100644 --- a/matcher/matcher_config.yaml +++ b/matcher/matcher_config.yaml @@ -1,3 +1,11 @@ +cache: + loaded_data: true + preprocessed_data: true + contrasts: true + square_distance_matrix: true + raw_cluster_ids: true + matcher_results: true + keys: # Which columns to select for matching - first_name @@ -98,8 +106,11 @@ contrasts: args: n_chars: -1 +scorer: + operation: mean + clusterer: - method: sklearn.cluster.DBSCAN + algorithm: sklearn.cluster.DBSCAN args: eps: 0.2 min_samples: 1 diff --git a/matcher/matcher_config_for_reduced_dataset.yaml b/matcher/matcher_config_for_reduced_dataset.yaml index ea05ca5c..63d246b6 100644 --- a/matcher/matcher_config_for_reduced_dataset.yaml +++ b/matcher/matcher_config_for_reduced_dataset.yaml @@ -2,6 +2,14 @@ # only a small subset of personal data is available to the matcher and where # hashed SSNs are used. +cache: + loaded_data: true + preprocessed_data: true + contrasts: true + square_distances: true + raw_cluster_ids: true + matcher_results: true + keys: # Which columns to select for matching - first_name @@ -73,8 +81,11 @@ contrasts: ssn_hash: - method: compare_exact +scorer: + operation: mean + clusterer: - method: sklearn.cluster.DBSCAN + algorithm: sklearn.cluster.DBSCAN args: eps: 0.2 min_samples: 1 diff --git a/matcher/requirements.txt b/matcher/requirements.txt index 23ce7f23..769953f3 100644 --- a/matcher/requirements.txt +++ b/matcher/requirements.txt @@ -3,17 +3,12 @@ pandas==0.21.0 python-dateutil==2.6.1 pytz==2017.3 six==1.11.0 -Flask==0.12.2 -boto3==1.4.7 python-dotenv==0.7.1 -s3fs==0.1.2 -scipy==1.0.0 sklearn==0.0 psycopg2==2.7.3.2 redis rq -flask_script -python-Levenshtein==0.12.0 +s3fs recordlinkage colorlog PyYAML==3.12 diff --git a/matcher/setup.py b/matcher/setup.py index 19c0a452..8710f4bd 100644 --- a/matcher/setup.py +++ b/matcher/setup.py @@ -13,7 +13,7 @@ setup( name='matcher', - version='0.0.1', + version='0.2', description='CSH matcher', url='https://github.com/dssg/matching-tool', packages=find_packages(),
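
Note on usage (editor's addition, not part of the changeset): the diff above splits the old monolithic matcher into composable Blocker, Contraster, Scorer, and Clusterer components, wired together by the new Pipeline and persisted through the new Store/Cache objects in matcher/matcher/storage.py, with behaviour driven by the cache, scorer, and clusterer sections added to matcher_config.yaml. The sketch below mirrors Pipeline.initialize_components from matcher/matcher/pipeline.py; the inline config literal, the /tmp/matcher-example base directory, and the 'example-job' match_job_id are illustrative assumptions rather than values taken from this changeset.

# Sketch only: assembles the components added in this diff.
# The config dict abbreviates matcher_config.yaml; paths and ids are made up.
from matcher.pipeline import Pipeline
from matcher.storage import Store, Cache

config = {
    # Toggles read by Cache.cache_matcher_data for each intermediate dataframe.
    'cache': {
        'loaded_data': True,
        'preprocessed_data': True,
        'contrasts': True,
        'square_distance_matrix': True,
        'raw_cluster_ids': True,
        'matcher_results': True,
    },
    'blocking_rules': {'last_name': 1},   # block on the first character of last_name
    'contrasts': {
        'first_name': [
            {'method': 'compare_string_distance', 'args': {'method': 'jarowinkler'}},
        ],
    },
    'scorer': {'operation': 'mean'},
    'clusterer': {
        # Resolved by dotted path via importlib inside Pipeline.initialize_components.
        'algorithm': 'sklearn.cluster.DBSCAN',
        'args': {'eps': 0.2, 'min_samples': 1, 'metric': 'precomputed'},
    },
}

store = Store(base_data_directory='/tmp/matcher-example')   # local path; an s3:// prefix also works
cache = Cache(store, match_job_id='example-job', **config['cache'])

pipeline = Pipeline(config=config, cache=cache)
# pipeline.run(preprocessed_df) would then block the records, contrast and score
# each pair within a block, and cluster the scores into matched ids, caching the
# contrasts, square distance matrices, and raw cluster ids when their flags are
# set above; the final ids land in pipeline.matches as a 'matched_id' column.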