Matcher v2 #379

Open · wants to merge 35 commits into master from erickas-oo

Changes from all commits (35 commits)
9fa0e10  Bumped to version 0.2 (nanounanue, May 18, 2018)
a3baeff  Not needed anymore (nanounanue, May 18, 2018)
bc55d7e  Semi-refactored. Needs @ecsalomon wisdom (nanounanue, May 18, 2018)
1e337c3  Removing singleton ids and block_name, now is problem of the Matcher (nanounanue, May 18, 2018)
ef79cec  Blocker class added. Also discovered and killed a bug in the ValueErr… (nanounanue, May 18, 2018)
6973ced  Forgot self. Added (nanounanue, May 18, 2018)
6883bb0  I always forget about self (nanounanue, May 18, 2018)
e920380  I always forget about self (again) (nanounanue, May 18, 2018)
8d7d1b7  Scorer class (nanounanue, May 18, 2018)
df00eca  Using @ecsalomon suggestion regarding to the future (nanounanue, May 18, 2018)
251d9e3  Removed the lines related to blocker (nanounanue, May 18, 2018)
4937b59  Merge branch 'master' into erickas-oo (nanounanue, May 23, 2018)
44dd54f  some work (ecsalomon, Jun 13, 2018)
3eece57  Merge branch 'master' of https://github.com/dssg/csh into erickas-oo (ecsalomon, Aug 11, 2018)
0891c69  add blocker metadata, comments (ecsalomon, Aug 11, 2018)
1d6bda2  use new blocker (ecsalomon, Aug 11, 2018)
8a89248  fix long lines (ecsalomon, Aug 11, 2018)
f2e719b  add metadata to scorer (ecsalomon, Aug 11, 2018)
e95edb7  call scorer.run() (ecsalomon, Aug 11, 2018)
5c322fa  rename matches column to score (ecsalomon, Aug 11, 2018)
be14a28  improve contraster metadata (ecsalomon, Aug 11, 2018)
3f582ac  refactor clusterer, add metadata (ecsalomon, Aug 11, 2018)
73f4876  add block key to ids (ecsalomon, Aug 11, 2018)
728cce3  IT RUNS! (ecsalomon, Aug 12, 2018)
f3b2fe9  clean up metadata (ecsalomon, Aug 12, 2018)
a083607  refactor into Pipeline and Matcher, begin better metadata handling (ecsalomon, Aug 13, 2018)
4c08f87  remove metadata dictionary from cluster (ecsalomon, Aug 13, 2018)
a59f1de  remove metadata dict from contraster (ecsalomon, Aug 13, 2018)
824cf5a  remove metadata dictionary from scorer (ecsalomon, Aug 13, 2018)
c9f1f91  rename func to describe contrasts (ecsalomon, Aug 13, 2018)
3697bb7  be consistent about version (ecsalomon, Aug 13, 2018)
5718a31  add pipeline file (ecsalomon, Aug 13, 2018)
8a155e3  add triage storage file (ecsalomon, Aug 13, 2018)
99f0c71  storage refactoring (ecsalomon, Sep 26, 2018)
3fe37b8  restore match cache (ecsalomon, Oct 7, 2018)
Empty file removed matcher/config/.gitkeep
2 changes: 1 addition & 1 deletion matcher/matcher/__init__.py
@@ -1,6 +1,6 @@
 # coding: utf-8
 
-__version__='0.0.1'
+__version__ = '0.2'
 
 from matcher.tasks import do_match
 
61 changes: 61 additions & 0 deletions matcher/matcher/blocker.py
@@ -0,0 +1,61 @@
+# coding: utf-8
+
+import datetime
+import pandas as pd
+
+from matcher.logger import logger
+
+
+class Blocker():
+
+    def __init__(self, blocking_rules:dict=None):
+        self.blocking_rules = blocking_rules
+        self.initialization_time = datetime.datetime.now()
+        self.run_start_time = None
+        self.num_blocks = None
+        self.block_keys = None
+        self.run_end_time = None
+
+    def run(self, preprocessed_df:pd.DataFrame) -> pd.core.groupby.DataFrameGroupBy:
+        """ Take the preprocessed dataframe and return a pandas GroupBy of
+        dataframes, where the keys are the blocking values and the groups
+        are dataframes containing only the records matching each key.
+        """
+        self.run_start_time = datetime.datetime.now()
+
+        if self.blocking_rules:
+            logger.debug(f"Blocking by {self.blocking_rules}")
+            grouped_df = preprocessed_df.groupby([
+                self._unpack_blocking_rule(preprocessed_df, column_name, position)
+                for column_name, position
+                in self.blocking_rules.items()
+            ])
+
+        else:
+            logger.debug('No blocking rules passed. Matching all record pairs.')
+            block = pd.Series(data=['all']*len(preprocessed_df), index=preprocessed_df.index)
+            grouped_df = preprocessed_df.groupby(block)
+
+        logger.debug(f"Blocking is done: got {len(grouped_df)} blocks.")
+        self.num_blocks = len(grouped_df)
+        self.block_keys = pd.DataFrame(grouped_df.keys).drop_duplicates().T
+        self.run_end_time = datetime.datetime.now()
+
+        return grouped_df
+
+    def _unpack_blocking_rule(
+        self,
+        df:pd.DataFrame,
+        column_name:str,
+        position:int
+    ) -> pd.Series:
+        """ Convert the relevant column to a string and return the blocking key
+        for each row: position > 0 keeps the first `position` characters, < 0 the last.
+        """
+        if position < 0:
+            return df[column_name].astype(str).str[position:]
+        elif position > 0:
+            return df[column_name].astype(str).str[:position]
+        else:
+            raise ValueError(f"Cannot split a string at position {position}; use a non-zero position.")
+
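To make the blocking-rule semantics concrete, here is a minimal sketch of driving the new Blocker by hand. The dataframe and the rules are invented for illustration; only the Blocker API itself comes from the diff above. A positive position keeps the leading characters of a column, a negative position the trailing ones, and the returned GroupBy iterates as (key, block) pairs:

import pandas as pd

from matcher.blocker import Blocker

# Invented records; the index stands in for the matcher's row ids.
df = pd.DataFrame(
    {'last_name': ['salomon', 'salomon', 'smith'],
     'zip_code': ['60637', '60615', '10001']},
    index=[1, 2, 3]
)

# Block on the first 3 characters of last_name and the last 2 of zip_code.
blocker = Blocker(blocking_rules={'last_name': 3, 'zip_code': -2})

for key, block in blocker.run(df):
    print(key, len(block))
# ('sal', '15') 1
# ('sal', '37') 1
# ('smi', '01') 1

With no rules passed, run() falls back to a single 'all' block, so every record is compared against every other.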
162 changes: 69 additions & 93 deletions matcher/matcher/cluster.py
@@ -1,101 +1,77 @@
 # coding: utf-8
 
-import pandas as pd
+import datetime
+import numpy as np
 
 
-from sklearn.cluster import DBSCAN
-
-import matcher.ioutils as ioutils
+import pandas as pd
+import sklearn
 
 from matcher.logger import logger
 
-def cluster(
-    distances:pd.DataFrame,
-    eps:float=0.5,
-    min_samples:int=1,
-    algorithm:str='auto',
-    leaf_size:int=30,
-    n_jobs:int=1
-) -> pd.DataFrame:
-    """ Cluster the scored entities into individuals. Return the cluster ids
-    indexed with the source row_id.
-    """
-
-    logger.info('Beginning clustering.')
-
-    clusterer = DBSCAN(
-        eps=eps,
-        min_samples=min_samples,
-        metric='precomputed',
-        metric_params=None,
-        algorithm=algorithm,
-        leaf_size=leaf_size,
-        p=None,
-        n_jobs=n_jobs
-    )
-
-    clusterer.fit(X=distances)
-    logger.info('Clustering done! Assigning matched ids.')
-
-    return pd.Series(
-        index=distances.index,
-        data=clusterer.labels_
-    )
-
-
-def square_distance_matrix(df:pd.DataFrame) -> pd.DataFrame:
-    # create a copy, swap the indicies
-    tmp_df = df.copy()
-    tmp_df.reset_index(inplace=True)
-    tmp_df.rename(
-        mapper={'matcher_index_left': 'matcher_index_right', 'matcher_index_right': 'matcher_index_left'},
-        axis='columns',
-        inplace=True
-    )
-    tmp_df.set_index(['matcher_index_left', 'matcher_index_right'], inplace=True)
-
-    # concat original & df with swapped indices; square the matrix, filling in 0 distance for self-pairs
-    return pd.concat([df, tmp_df]).matches.unstack(level=-1, fill_value=0)
-
-
-def generate_matched_ids(
-    distances:pd.DataFrame,
-    DF:pd.DataFrame,
-    clustering_params:dict,
-    base_data_directory:str,
-    match_job_id=str,
-    block_name='',
-) -> pd.DataFrame:
-
-    logger.info('Beginning clustering & id generation.')
-    distances = square_distance_matrix(distances)
-    ioutils.write_dataframe(distances.reset_index(), filepath=f'{base_data_directory}/match_cache/square_distances/{match_job_id}/{block_name}')
-
-    ids = cluster(
-        distances, **clustering_params
-    )
-    ioutils.write_dataframe(ids.reset_index(), filepath=f'{base_data_directory}/match_cache/raw_cluster_ids/{match_job_id}/{block_name}')
-    max_cluster_id = ids.max()
-    replacement_ids = pd.Series(range(max_cluster_id + 1, max_cluster_id + len(ids[ids == -1]) + 1), index=ids[ids==-1].index)
-    ids[ids == -1] = replacement_ids
-    logger.debug(f'IDs: {ids}')
-    logger.debug(f'Replaced noisy singleton ids with \n{replacement_ids}')
-
-    logger.debug(f'Adding the block name ({block_name}) to the matched_ids.')
-    ids = block_name + ids.astype(str)
-    logger.debug(f'New IDs: \n{ids}')
-
-    df = DF.copy()
-
-    df['matched_id'] = ids
-    logger.info('Matched ids generated')
-
-    return (df)
-
-
-def generate_singleton_id(df:pd.DataFrame, block_name:str) -> pd.DataFrame:
-    df['matched_id'] = block_name + '0'
-    logger.info(f'Singleton has id {df.matched_id.values[0]}')
-    return df
+class Clusterer():
+
+    def __init__(self, clustering_algorithm=sklearn.cluster.DBSCAN, **kwargs):
+        self.kwargs = kwargs
+        self.clusterer = clustering_algorithm(**kwargs)
+        self.initialized_time = datetime.datetime.now()
+        self.square_distance_matrix = None
+        self.raw_cluster_ids = None
+
+    def run(self, distances: pd.DataFrame) -> pd.Series:
+        """ Cluster the scored entities into individuals. Return the cluster ids
+        indexed with the source row_id.
+        """
+        self.run_start_time = datetime.datetime.now()
+        logger.info('Beginning clustering & id generation.')
+
+        self.square_distance_matrix = self._square_distance_matrix(distances)
+        self.square_distance_matrix_created_time = datetime.datetime.now()
+        self.square_distance_matrix_dimensions = self.square_distance_matrix.shape
+
+        logger.debug('Squared the distances. Beginning clustering.')
+        self.clusterer.fit(X=self.square_distance_matrix)
+        self.clusterer_fit_time = datetime.datetime.now()
+
+        logger.debug('Clustering done! Assigning matched ids.')
+        ids = self._generate_ids(
+            index=self.square_distance_matrix.index.values,
+            labels=self.clusterer.labels_
+        ).astype(str)
+        self.run_end_time = datetime.datetime.now()
+
+        return ids
+
+    def _square_distance_matrix(self, df: pd.DataFrame) -> pd.DataFrame:
+        # create a copy, swap the indices
+        tmp_df = df.copy()
+        tmp_df.reset_index(inplace=True)
+        tmp_df.rename(
+            mapper={'matcher_index_left': 'matcher_index_right', 'matcher_index_right': 'matcher_index_left'},
+            axis='columns',
+            inplace=True
+        )
+        tmp_df.set_index(['matcher_index_left', 'matcher_index_right'], inplace=True)
+
+        # concat original & df with swapped indices;
+        # square (unstack) the matrix, filling in 0 distance for self-pairs
+        return pd.concat([df, tmp_df]).score.unstack(level=-1, fill_value=0)
+
+    def _generate_ids(self, index: pd.Index, labels: np.ndarray) -> pd.Series:
+        logger.debug(f'index {len(index)}')
+        logger.debug(f'labels {len(labels)}')
+        ids = pd.Series(index=index, data=labels, name='matched_id')
+        self.raw_cluster_ids = ids.copy()
+        logger.debug(f'ids {ids}')
+        max_cluster_id = ids.max()
+        # DBSCAN labels clusters 0..k-1, so the cluster count is max label + 1
+        self.num_clusters = int(max_cluster_id) + 1
+        self.num_noisy_clusters = len(ids[ids == -1])
+
+        # DBSCAN marks noise points with -1; give each one its own singleton id,
+        # numbered upward from the largest real cluster id
+        replacement_ids = pd.Series(
+            data=range(max_cluster_id + 1, max_cluster_id + len(ids[ids == -1]) + 1),
+            index=ids[ids == -1].index
+        )
+        ids[ids == -1] = replacement_ids
+        logger.info('Matched ids generated')
+
+        return ids
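The noise handling in _generate_ids is worth a worked example. The labels below are invented (-1 is DBSCAN's noise marker); the replacement arithmetic is copied from the method and turns each noise point into its own singleton cluster:

import pandas as pd

# Invented raw DBSCAN labels: a and b cluster together; c and d are noise.
ids = pd.Series(index=['a', 'b', 'c', 'd'], data=[0, 0, -1, -1])

max_cluster_id = ids.max()  # 0
# Noise points get fresh ids counting up from max_cluster_id + 1.
replacement_ids = pd.Series(
    data=range(max_cluster_id + 1, max_cluster_id + len(ids[ids == -1]) + 1),
    index=ids[ids == -1].index
)
ids[ids == -1] = replacement_ids
print(ids.tolist())  # [0, 0, 1, 2]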

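Finally, a hedged end-to-end sketch of the new Clusterer. The MultiIndex names and the score column match the diff above, but the distances and the DBSCAN parameters passed through **kwargs are invented; the real pipeline supplies its configured clustering_params instead of literals:

import pandas as pd

from matcher.cluster import Clusterer

# Invented pairwise distances: one row per (left, right) pair of record ids,
# with a 'score' column holding distances in [0, 1].
index = pd.MultiIndex.from_tuples(
    [('a', 'b'), ('a', 'c'), ('b', 'c')],
    names=['matcher_index_left', 'matcher_index_right']
)
distances = pd.DataFrame({'score': [0.1, 0.9, 0.8]}, index=index)

clusterer = Clusterer(eps=0.5, min_samples=1, metric='precomputed')
matched_ids = clusterer.run(distances)
print(matched_ids)
# 'a' and 'b' share a matched_id (distance 0.1 < eps); 'c' gets its own.

Note that run() squares the matrix itself, so callers hand it the long-form pair scores rather than a square matrix; metric='precomputed' describes what DBSCAN sees after _square_distance_matrix has run.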