-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathObservation.py
247 lines (182 loc) · 9.72 KB
/
Observation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
from Attribute import Attribute
from HyperParameters import HyperParameters
class Observation:
def __init__(self, capsule, route, inputObservations : list, outputAttributes : dict, outputProbability : float, attrIndex : int = 0):
# inputObservations # List of Observations or List of Lists of Observations
# outputAttributes # Attribute - Value or Attribute - List of Values
# inputProbabilities # Capsule - Probability
self._inputObservations : list = [] # Observations
self._outputAttributes : dict = {} # Attribute - Value
self._outputProbability : float = outputProbability
self._route = route
self._capsule = capsule
self._previousObservation = None
self._accelerations : dict = {} # Attribute - Value
for obs in inputObservations:
if type(obs) is list:
for actualObs in obs:
self._inputObservations.append(actualObs)
else:
self._inputObservations.append(obs)
for attribute, value in outputAttributes.items():
if type(value) is list:
self._outputAttributes[attribute] = value[attrIndex]
else:
self._outputAttributes[attribute] = value
def getJSONOutput(self):
obs = {}
obs["name"] = self._capsule.getName()
obs["route"] = self._route.getName()
obs["probability"] = self._outputProbability
attrList = []
for attr, val in self._outputAttributes.items():
attrList.append({"attribute" : attr.getName(), "value" : val})
obs["attributes"] = attrList
return obs
def isParent(self, observation):
for obs in self._inputObservations:
if obs == observation:
return True
return False
def getOutputs(self, onlyInheritable : bool = False):
if onlyInheritable is False:
return self._outputAttributes # Attribute - Value
else:
outputDict = {}
for attribute, value in self._outputAttributes.items():
if attribute.isInheritable() is True:
outputDict[attribute] = value
return outputDict # Attribute - Value
def getOutputsList(self, onlyInheritable : bool = False):
outputDict = {}
for attribute, value in self._outputAttributes.items():
if onlyInheritable is False or attribute.isInheritable() is True:
outputDict[attribute] = [value]
return outputDict # Attribute - List of Values
def setOutput(self, attribute : Attribute, value : float):
if attribute in self._outputAttributes:
self._outputAttributes[attribute] = value
def getInputs(self):
inputs = {} # Attribute - List of values
for obs in self._inputObservations:
newInputs = obs.getOutputs()
for attr, value in newInputs.items():
if attr in inputs:
inputs[attr].append(value)
else:
inputs[attr] = [value]
return inputs # Attribute - List of values
def getOutput(self, attribute : Attribute = None, attributeName : str = None):
if attribute is not None and attribute in self._outputAttributes:
return self._outputAttributes[attribute]
if attributeName is not None:
for attr, value in self._outputAttributes.items():
if attr.getName() == attributeName:
return value
return 0.0
def appendOutputAttribute(self, attribute : Attribute, attributeValue : float):
self._outputAttributes[attribute] = attributeValue
def rescaleAttribute(self, attribute : Attribute, scale : float):
for attr, val in self._outputAttributes:
if attr == attribute:
self._outputAttributes[attr] = val * scale
def printOutputs(self, includeNonInheritable : bool):
print(self._capsule.getName())
print("Probability - " + str(int(self._outputProbability * 100)) + "%")
for attribute, value in self._outputAttributes.items():
if includeNonInheritable is True or attribute.isInheritable() is True:
print(attribute.getName() + ": " + str(value))
print("-------------------------")
def printContinuity(self, includeNonInheritable : bool):
if self._previousObservation is None:
leftPad = " ".ljust(40)
leftArrow = " ".ljust(30) + " -----> ".ljust(10)
print(leftArrow + self._capsule.getName())
print(leftPad + "Probability - " + str(int(self._outputProbability * 100)) + "%")
for attribute, value in self._outputAttributes.items():
if includeNonInheritable is True or attribute.isInheritable() is True:
print(leftPad + attribute.getName() + ": " + str(value))
print(leftPad + "-------------------------")
else:
prevOb = self._previousObservation
leftArrow = prevOb._capsule.getName().ljust(30) + " -----> ".ljust(10)
print(leftArrow + self._capsule.getName())
leftPad = "Probability - " + str(int(prevOb._outputProbability * 100)) + "%"
leftPad = leftPad.ljust(40)
print(leftPad + "Probability - " + str(int(self._outputProbability * 100)) + "%")
for attribute, value in self._outputAttributes.items():
if includeNonInheritable is True or attribute.isInheritable() is True:
leftPad = attribute.getName() + ": " + str(prevOb._outputAttributes[attribute])
leftPad = leftPad.ljust(40)
print(leftPad + attribute.getName() + ": " + str(value))
leftPad = "-------------------------".ljust(40)
print(leftPad + "-------------------------")
def getProbability(self):
return self._outputProbability
def getMeanProbability(self):
if self._route is None:
return 1.0
return self._route.getMeanProbability()
def getInputProbability(self, capsule):
for obs in self._inputObservations:
if obs.getCapsule() == capsule:
return obs.getProbability()
return 0.0
def getTakenRoute(self):
return self._route
def getInputObservations(self):
return self._inputObservations
def addInputObservation(self, observation):
self._inputObservations.append(observation)
def clearInputObservations(self):
self._inputObservations = []
def getCapsule(self):
return self._capsule
def isZeroObservation(self):
for value in self._outputAttributes.values():
if abs(value) > 0.01:
return False
return True
def offset(self, offsetLabelX : str, offsetLabelY : str, offsetLabelRatio : str, targetLabelX : str, targetLabelY : str, targetLabelSize : str):
# We can only offset by one set of attributes, so get any Capsule from the list (Should only be
# one when this method is called anyways)
inputCapsule = self._inputObservations[0].getCapsule()
offsetX = self._inputObservations[0].getOutputs()[inputCapsule.getAttributeByName(offsetLabelX)]
offsetY = self._inputObservations[0].getOutputs()[inputCapsule.getAttributeByName(offsetLabelY)]
offsetRatio = self._inputObservations[0].getOutputs()[inputCapsule.getAttributeByName(offsetLabelRatio)]
for attribute, value in self._outputAttributes.items():
if attribute.getName() == targetLabelX:
self._outputAttributes[attribute] = value * offsetRatio + offsetX
if attribute.getName() == targetLabelY:
self._outputAttributes[attribute] = value * offsetRatio + offsetY
if attribute.getName() == targetLabelSize:
self._outputAttributes[attribute] = value * offsetRatio
def cleanupSymmetries(self, applySymmetries):
# applySymmetries # Lambda attributes -> attributes
newOutputs = applySymmetries(self.getOutputsList())
for attr, valList in newOutputs.items():
self._outputAttributes[attr] = valList[0]
def linkPreviousObservation(self, observation):
self._previousObservation = observation
def hasPreviousObservation(self):
if self._previousObservation is None:
return False
else:
return True
def setAccelerations(self, accelerations : dict):
self._accelerations = accelerations
def getVelocities(self, timeStep : float):
velocities = {}
if self._previousObservation is None:
for attr, value in self._outputAttributes.items():
velocities[attr] = 0.0
else:
linkedOutputs = self._previousObservation.getOutputs()
for attr, value in self._outputAttributes.items():
velocities[attr] = (value - linkedOutputs[attr]) / timeStep
if attr in self._accelerations:
velocities[attr] = velocities[attr] + 0.5 * self._accelerations[attr] * timeStep
if abs(velocities[attr]) < HyperParameters.VelocityCutoff:
# "Smooth" out small error fluctuations
velocities[attr] = 0.0
return velocities # {Attribute, Velocity}