-
Notifications
You must be signed in to change notification settings - Fork 0
/
exam_problem6.py
276 lines (258 loc) · 9.41 KB
/
exam_problem6.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
#Code shared across examples
import pylab, string
import random
def stdDev(X):
    """Return the population standard deviation of the numbers in X.

    X must be a non-empty sequence. The summed squared deviations from
    the mean are divided by len(X), not len(X) - 1.
    """
    count = len(X)
    mean = sum(X) / float(count)
    # Start the sum at 0.0 so the accumulator is a float from the first term.
    squaredError = sum(((value - mean) ** 2 for value in X), 0.0)
    return (squaredError / count) ** 0.5
def scaleFeatures(vals):
    """Assumes vals is a sequence of numbers.

    Returns a pylab array holding vals shifted by their mean and then
    divided by the standard deviation (stdDev) of the shifted values.
    """
    centered = pylab.array(vals)
    centered = centered - sum(centered) / float(len(centered))
    return centered / stdDev(centered)
class Point(object):
    """A named point whose coordinates live in an indexable sequence."""

    def __init__(self, name, originalAttrs):
        """originalAttrs is an array"""
        self.name, self.attrs = name, originalAttrs

    def dimensionality(self):
        """Return the number of attributes (coordinates) of this point."""
        return len(self.attrs)

    def getAttrs(self):
        """Return the attribute sequence given at construction."""
        return self.attrs

    def distance(self, other):
        """Return the Euclidean distance between self and other.

        Only the first self.dimensionality() attributes of other are read.
        """
        squares = ((self.attrs[i] - other.attrs[i]) ** 2
                   for i in range(self.dimensionality()))
        # Start at 0.0 so a zero-dimensional point still yields a float.
        return sum(squares, 0.0) ** 0.5

    def getName(self):
        """Return the point's name."""
        return self.name

    def toStr(self):
        """Return the name immediately followed by str() of the attributes."""
        return self.name + str(self.attrs)

    def __str__(self):
        return self.name
class Cluster(object):
    """ A Cluster is defined as a set of elements, all having
    a particular type.

    The elements must provide distance(other), getName() and toStr(),
    because the methods below call them. """

    def __init__(self, points, pointType):
        """ Elements of a cluster are saved in self.points
        and the pointType is also saved """
        self.points = points
        self.pointType = pointType

    def _pairDistances(self, other):
        """ Yield the float distance of every pair of points in which
        one point is from self and the other point is from other. """
        for mine in self.points:
            for theirs in other.points:
                yield float(mine.distance(theirs))

    def singleLinkageDist(self, other):
        """ Returns the float distance between the points that
        are closest to each other, where one point is from
        self and the other point is from other. Uses the
        distance method of the points.
        Raises ValueError if either cluster has no points. """
        # Each pair distance is evaluated exactly once.
        return min(self._pairDistances(other))

    def maxLinkageDist(self, other):
        """ Returns the float distance between the points that
        are farthest from each other, where one point is from
        self and the other point is from other. Uses the
        distance method of the points.
        Raises ValueError if either cluster has no points. """
        return max(self._pairDistances(other))

    def averageLinkageDist(self, other):
        """ Returns the float average (mean) distance between all
        pairs of points, where one point is from self and the
        other point is from other. Uses the distance method of
        the points.
        Raises ZeroDivisionError if either cluster has no points. """
        total = sum(self._pairDistances(other), 0.0)
        return total / (len(self.points) * len(other.points))

    def mysteryLinkageDist(self, other):
        """ Returns the average linkage distance when it equals the
        max or the single linkage distance, the single linkage
        distance when max and single are equal, and otherwise one of
        the three distances picked with random.choice. """
        avgDist = self.averageLinkageDist(other)
        maxDist = self.maxLinkageDist(other)
        minDist = self.singleLinkageDist(other)
        if avgDist == maxDist or avgDist == minDist:
            return avgDist
        if maxDist == minDist:
            return minDist
        # The list order is kept so that seeded runs pick the same element.
        return random.choice([avgDist, minDist, maxDist])

    def members(self):
        """ Yield the points of the cluster in stored order. """
        for p in self.points:
            yield p

    def isIn(self, name):
        """ Returns True if the element named name is in the cluster
        and False otherwise """
        return any(p.getName() == name for p in self.points)

    def toStr(self):
        """ Returns the toStr() of every point, joined by ', ',
        in stored order. """
        return ', '.join(p.toStr() for p in self.points)

    def getNames(self):
        """ For consistency, returns a sorted list of all
        elements in the cluster """
        return sorted(p.getName() for p in self.points)

    def __str__(self):
        return ', '.join(self.getNames())
class ClusterSet(object):
    """ A ClusterSet is defined as a list of clusters """

    def __init__(self, pointType):
        """ Initialize an empty set, without any clusters.
        pointType is accepted but not stored. """
        self.members = []

    def add(self, c):
        """ Append a cluster to the end of the cluster list
        only if it doesn't already exist. If it is already in the
        cluster set, raise a ValueError """
        if c in self.members:
            raise ValueError
        self.members.append(c)

    def getClusters(self):
        """ Returns a shallow copy of the cluster list """
        return self.members[:]

    def mergeClusters(self, c1, c2):
        """ Assumes clusters c1 and c2 are in self
        Adds to self a cluster containing the union of c1 and c2
        and removes c1 and c2 from self.
        The merged cluster takes the pointType of c1. """
        merged = Cluster(c1.points + c2.points, c1.pointType)
        self.members.append(merged)
        self.members.remove(c1)
        self.members.remove(c2)

    def findClosest(self, linkage):
        """ Returns a tuple containing the two most similar
        clusters in self
        Closest defined using the metric linkage, a callable that
        takes two clusters and returns their distance.
        On ties the pair found first (in list order) is returned.
        Raises ValueError if self holds fewer than two clusters. """
        if len(self.members) < 2:
            raise ValueError('findClosest needs at least two clusters')
        bestPair = None
        bestDist = None
        for i, first in enumerate(self.members[:-1]):
            for second in self.members[i + 1:]:
                dist = linkage(first, second)
                # Strict comparison: any smaller distance wins, however
                # small the improvement; equal distances keep the earlier pair.
                if bestPair is None or dist < bestDist:
                    bestDist = dist
                    bestPair = (first, second)
        return bestPair

    def mergeOne(self, linkage):
        """ Merges the two most similar clusters in self
        Similar defined using the metric linkage
        Returns the clusters that were merged """
        (c1, c2) = self.findClosest(linkage)
        self.mergeClusters(c1, c2)
        return (c1, c2)

    def numClusters(self):
        """ Returns the number of clusters currently in self """
        return len(self.members)

    def toStr(self):
        """ Returns one line per cluster, ' C<i>:<names>' followed by a
        newline, where the clusters are ordered by their sorted name
        lists and <names> are joined by ', ' """
        nameLists = sorted(c.getNames() for c in self.members)
        lines = []
        for i, names in enumerate(nameLists):
            lines.append(' C' + str(i) + ':' + ', '.join(names) + '\n')
        return ''.join(lines)
#City climate example
class City(Point):
    """A Point used for the city climate example; adds no behavior."""
def readCityData(fName, scale=False):
    """Read city names and feature values from the text file fName.

    Assumes scale is a Boolean. If True, features are scaled with
    scaleFeatures, one feature (column) at a time.

    File layout as read by this function: the lines before the first
    line starting with '#end' form the header, and the number of
    features is the number of header lines minus one. Every later
    non-blank line is 'name,value,value,...' with at least that many
    comma-separated numeric values after the name.

    Returns (cityNames, featureVectorList), where featureVectorList[i]
    is the list of feature values of cityNames[i].
    Raises ValueError if a value is not numeric and IndexError if a
    data line has too few fields.
    """
    cityNames = []
    # The with-block closes the file even when parsing fails.
    with open(fName, 'r') as dataFile:
        numFeatures = 0
        #Process lines at top of file
        for line in dataFile: #Find number of features
            if line[0:4] == '#end': #indicates end of features
                break
            numFeatures += 1
        numFeatures -= 1
        #Produce featureVals, cityNames
        featureVals = [[] for _ in range(numFeatures)]
        #Continue processing lines in file, starting after comments
        for line in dataFile:
            # Strip only the line terminator, so a final line without a
            # trailing newline keeps its last character.
            line = line.rstrip('\r\n')
            if not line.strip():
                continue #skip blank lines
            dataLine = line.split(',')
            cityNames.append(dataLine[0])
            for i in range(numFeatures):
                featureVals[i].append(float(dataLine[i + 1]))
    #For each feature scale its values, if needed
    if scale:
        for i in range(numFeatures):
            featureVals[i] = scaleFeatures(featureVals[i])
    #Use featureVals to build list containing the feature vectors
    featureVectorList = [
        [featureVals[feature][city] for feature in range(numFeatures)]
        for city in range(len(cityNames))
    ]
    return cityNames, featureVectorList
def buildCityPoints(fName, scaling):
    """Return a list with one City per city read from fName.

    fName and scaling are passed on to readCityData; each feature
    vector is wrapped in a pylab array before being given to City.
    """
    cityNames, featureList = readCityData(fName, scaling)
    return [City(name, pylab.array(features))
            for name, features in zip(cityNames, featureList)]
#Use hierarchical clustering for cities
def hCluster(points, linkage, numClusters, printHistory):
    """Cluster points hierarchically and return the resulting ClusterSet.

    Every point starts in its own Cluster; the two closest clusters
    (closeness measured by linkage) are merged repeatedly until no more
    than numClusters clusters remain.

    If printHistory is true, each merge step is printed. The final set
    of clusters is printed in either case.
    NOTE(review): the source had lost its indentation; the final summary
    is assumed to sit outside the printHistory branch -- confirm.
    """
    cS = ClusterSet(City)
    for p in points:
        cS.add(Cluster([p], City))
    history = []
    while cS.numClusters() > numClusters:
        history.append(cS.mergeOne(linkage))
    # Every print call gets a single string, so the output is the same
    # under the Python 2 print statement and the Python 3 print function.
    if printHistory:
        print('')
        for step, (left, right) in enumerate(history):
            names1 = [p.getName() for p in left.members()]
            names2 = [p.getName() for p in right.members()]
            print('Step %s Merged %s with %s' % (step, names1, names2))
            print('')
    print('Final set of clusters:')
    print(cS.toStr())
    return cS
# with or without scaling the data
def test0():
    """Cluster the unscaled cities of 'test_problem6.txt' into 3 clusters,
    once for each of the four linkage metrics, without merge history."""
    points = buildCityPoints('test_problem6.txt', False)
    linkages = (
        Cluster.singleLinkageDist,
        Cluster.maxLinkageDist,
        Cluster.averageLinkageDist,
        Cluster.mysteryLinkageDist,
    )
    for linkage in linkages:
        hCluster(points, linkage, 3, False)
#test0()