#%% [markdown]
# Stage 3 - Join Data
# ## Common
# So this is where we do all of the common setup needed to ingest the files. The key is the recognition that there are common patterns we can exploit across the files.
# NOTE - still to figure out how to do this from a single file and import it successfully (see the sketch below the helper functions).
#%%
# Import all of the libraries we need to use...
import pandas as pd
import azureml.dataprep as dprep
import seaborn as sns
import os
import re
import collections
from azureml.dataprep import value
from azureml.dataprep import col
from azureml.dataprep import Package
# Let's also set up global variables and common functions...
# Path to the source data
dataPath = "./data"
# Path to the location where the dataprep packages are saved
packagePath = "./packages"
# Name of package file
packageFileSuffix = "_package.dprep"
# A helper function to create full package path
def createFullPackagePath(packageName, stage, qualityFlag):
    return packagePath + '/' + packageName + '_' + stage + '_' + qualityFlag + packageFileSuffix
# A save package helper function
def savePackage(dataFlowToPackage, packageName, stage, qualityFlag):
    dataFlowToPackage = dataFlowToPackage.set_name(packageName)
    packageToSave = dprep.Package(dataFlowToPackage)
    fullPackagePath = createFullPackagePath(packageName, stage, qualityFlag)
    packageToSave = packageToSave.save(fullPackagePath)
    return fullPackagePath
# An open package helper function
def openPackage(packageName, stage, qualityFlag):
    fullPackagePath = createFullPackagePath(packageName, stage, qualityFlag)
    packageToOpen = Package.open(fullPackagePath)
    dataFlow = packageToOpen[packageName]
    return dataFlow
#%% [markdown]
# ## Open PEOPLE and MEMBERS data flows from stage 2
# Simply pick up the data flows from stage 2...
#%%
peopleDataFlow = openPackage('PEOPLE', '2', 'A')
membersDataFlow = openPackage('MEMBERS', '2', 'A')
#%% [markdown]
# ## Join the PEOPLE and MEMBERS data flows
# Crunch time! Let's see if we can get these cleaned-up data sets to join.
#%%
join_builder = peopleDataFlow.builders.join(right_dataflow=membersDataFlow, left_column_prefix='l', right_column_prefix='r')
join_builder.detect_column_info()
join_builder
#%%
join_builder.generate_suggested_join()
join_builder.list_join_suggestions()
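#%% [markdown]
# If list_join_suggestions() had returned anything, the usual pattern in the
# dataprep SDK samples is to apply a suggestion by index and convert the
# builder into a dataflow. Sketched as comments here (assumed API), since the
# suggestion list comes back empty for this data:
#%%
# join_builder.apply_suggestion(0)
# suggestedJoinDataFlow = join_builder.to_dataflow()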
#%% [markdown]
# Weird, it doesn't come up with a suggestion despite having two MEMNO integer columns to work with!
#%%
joinedDataFlow = dprep.Dataflow.join(left_dataflow=peopleDataFlow,
right_dataflow=membersDataFlow,
join_key_pairs=[('ID', 'PEOPLEID')],
left_column_prefix='PEOPLE_',
right_column_prefix='MEMBERS_')
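#%% [markdown]
# One caveat on the join above: the default join type for Dataflow.join, as the
# dataprep SDK docs describe it, only keeps matching rows, so PEOPLE rows with
# no MEMBERS record would never reach the joined output. A left-outer sketch
# that would keep them, assuming dprep.JoinType is available as documented:
#%%
# Left-outer variant - keeps unmatched PEOPLE rows so the orphan check below
# has something to find (join_type/JoinType assumed from the SDK docs):
leftOuterJoinedDataFlow = dprep.Dataflow.join(left_dataflow=peopleDataFlow,
                                              right_dataflow=membersDataFlow,
                                              join_key_pairs=[('ID', 'PEOPLEID')],
                                              join_type=dprep.JoinType.LEFTOUTER,
                                              left_column_prefix='PEOPLE_',
                                              right_column_prefix='MEMBERS_')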
#%%
joinedDataFlow.head(5)
#%%
joinedDataFlow.get_profile()
#%% [markdown]
# Just running a couple of checks now to see how well the join has worked:
#%%
print('PEOPLE row count = {0}'.format(peopleDataFlow.row_count))
print('MEMBERS row count = {0}'.format(membersDataFlow.row_count))
print('JOINED row count = {0}'.format(joinedDataFlow.row_count))
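#%% [markdown]
# A quick derived figure: with a match-style join, and assuming at most one
# MEMBERS row per person, the gap between the PEOPLE and JOINED counts is the
# number of PEOPLE rows that found no MEMBERS match:
#%%
unmatchedPeopleCount = peopleDataFlow.row_count - joinedDataFlow.row_count
print('PEOPLE rows without a MEMBERS match = {0}'.format(unmatchedPeopleCount))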
#%%
orphanedPeopleDataFlow = joinedDataFlow.filter(joinedDataFlow['MEMBERS_PEOPLEID'] == None)
orphanedPeopleDataFlow.head(20)
#%% [markdown]
# ## Save JOINED data
# Finally, save the JOINED data flow that comes out of stage 3 for consumption downstream.
#%%
fullPackagePath = savePackage(joinedDataFlow, 'JOINED', '3', 'A')
print('Saved package to file {0}'.format(fullPackagePath))
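#%% [markdown]
# As a final sanity check, a minimal round trip: re-open the package just saved
# using the same helper as for the stage 2 packages and confirm the row count
# matches the in-memory data flow:
#%%
reloadedJoinedDataFlow = openPackage('JOINED', '3', 'A')
print('Reloaded JOINED row count = {0}'.format(reloadedJoinedDataFlow.row_count))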