-
Notifications
You must be signed in to change notification settings - Fork 1
/
FARGModel.py
1571 lines (1342 loc) · 52.5 KB
/
FARGModel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# FARGModel.py -- The class that holds a FARG model, and ancillary classes
# that are not specific to any one FARG model.
from pprint import pprint as pp
import inspect
from time import process_time
import csv
from dataclasses import dataclass, field, fields, replace, is_dataclass, InitVar
import dataclasses
from typing import Union, List, Tuple, Dict, Set, FrozenSet, Iterable, Any, \
NewType, Type, ClassVar, Sequence, Callable, Hashable, Collection, \
Sequence, Literal, Protocol, runtime_checkable
from numbers import Number, Real
from abc import ABC, abstractmethod
from itertools import chain
from copy import copy
import operator
from operator import itemgetter, attrgetter
from heapq import nlargest
from collections import Counter, defaultdict
from io import StringIO
from inspect import isclass
import inspect
from random import choice
import networkx as nx # type: ignore[import]
import matplotlib.pyplot as plt # type: ignore[import]
#import netgraph
from FMTypes import Elem, Elems, Value, Addr, FMPred, Pred, match_wo_none
#from Slipnet import Slipnet, empty_slipnet, Before, After
from Slipnet2 import Slipnet
from FMGraphs import ActivationGraph
from Graph2 import Before, After
from NumberMatcher import NumberMatcher
from util import is_iter, as_iter, as_list, pts, pl, pr, csep, ssep, \
as_hashable, backslash, singleton, first, tupdict, as_dict, short, \
sample_without_replacement, clip, reseed, default_field_value, d_subset, \
fields_for, filter_none, force_setattr, PushAttr
# Global functions
# Returns Callable[['FARGModel', ...], bool]
def as_fmpred(o: FMPred) -> Callable[..., bool]:
'''Returns a predicate function that takes two arguments: a FARGModel and
an object.'''
# TODO Document the many ways this thing constructs a function.
if isclass(o):
return lambda fm, x: isinstance(x, o) # type: ignore[arg-type] # mypy bug?
elif isinstance(o, tuple):
preds = tuple(as_fmpred(p) for p in o)
return lambda fm, x: any(p(fm, x) for p in preds)
elif callable(o):
if first_arg_is_fargmodel(o):
return o
else:
return lambda fm, x: o(x) # type: ignore[call-arg, operator, misc] # mypy bug
elif o is None:
return lambda fm, x: True
else:
return lambda fm, x: match_wo_none(x, o)
# raise ValueError(
# f'as_pred: {repr(o)} must be a class, a callable, or None.'
# )
def first_arg_is_fargmodel(o: Callable) -> bool:
p0 = first(inspect.signature(o).parameters.values())
#print('FIRARG', o, p0, p0.annotation, type(p0.annotation))
try:
# TODO What if annotation is a string?
return issubclass(p0.annotation, FARGModel)
except TypeError:
return False
# TODO Type hint for 'o': int, str, fn
def as_mpred(o) -> Callable[[Hashable], float]:
if isinstance(o, int):
return NumberMatcher.make(o)
raise NotImplementedError(
f"Can't convert {type(o)} into match predicate: {o}"
)
class HasAvailValues(ABC):
'''Mix-in for cells and other things that (can) have avail values.'''
@abstractmethod
def has_avail_value(self, v: Value) -> bool:
pass
@abstractmethod
def take_avails(self, values: Iterable[Value]) \
-> Tuple[Iterable[Value], Iterable[Value]]:
'''Returns (taken_avails, remaining_avails). Might raise
ValuesNotAvail.'''
# TODO Require a determinate Collection, not just an Iterable (since
# we might fail and need to put 'values' into an exception).
pass
@property
def avails(self) -> Union[Sequence[Value], None]:
pass
def has_avail_value(
elem: Union['CellRef', Elem, None],
v: Value
) -> Union[bool, None, 'CellRef', Elem]:
'''
if elem is None:
return False
elif isinstance(elem, CellRef):
return any(elem.has_avail_value(v1) for v1 in as_iter(v))
elif elem == v:
return elem
else:
return False
'''
if elem is None:
return False
elif isinstance(elem, HasAvailValues):
return any(elem.has_avail_value(v1) for v1 in as_iter(v))
else:
return elem == v
''' Version before correcting for mypy 26-Sep-2021
if elem is None:
return False
try:
#return elem.has_avail_value(v)
return any(elem.has_avail_value(v1) for v1 in as_iter(v))
except AttributeError:
if elem == v:
return elem
else:
return False
'''
# TODO UT
def dig_attr(elem: Union[Elem, None], attr: str) -> Hashable:
'''Returns value of attr within elem. If elem contains no attr,
digs through elem.contents, the .contents of that, and so on, until
finding an attribute named attr. Returns None if the search fails.'''
if elem is None:
return None
try:
return getattr(elem, attr)
except AttributeError:
return dig_attr(getattr(elem, 'contents', None), attr)
# Element classes
@dataclass
class AgentStateDataclassMixin:
'''Needed only to get around mypy bug https://stackoverflow.com/a/69344698/1393162'''
succeeded: ClassVar[bool] = False
class AgentState(ABC, AgentStateDataclassMixin):
@abstractmethod
def can_go(self) -> bool:
'''While in this AgentState, can the Agent go?'''
pass
@dataclass
class Active(AgentState):
def can_go(self):
return True
@dataclass
class Succeeded(AgentState):
succeeded: ClassVar[bool] = True
def can_go(self):
return False
@dataclass
class AwaitingDelegate(AgentState):
delegate: 'Agents'
def can_go(self):
return False
def add_delegate(self, new_delegate: 'Agents'):
if not isinstance(self.delegate, list):
self.delegate = as_list(self.delegate)
self.delegate += as_iter(new_delegate)
def __str__(self):
cl = self.__class__.__name__
return f'{cl}({self.delegate})'
'''
@dataclass
class AwaitingOK(AgentState):
'''
@dataclass
class MustCheckIfSucceeded(AgentState):
prev_agentstate: AgentState
delegate: Union['Agent', None]
def can_go(self):
return True
def __str__(self):
cl = self.__class__.__name__
return f'{cl}({self.prev_agentstate}, {self.delegate})'
class Agent(ABC): #(Elem):
'''A workspace element that does things.'''
#orientation: Dict[str, Hashable]
@abstractmethod
# TODO Maybe put desired args explicitly in go()'s signature
#def go(self, fm: 'FARGModel', contents=None, orientation=None):
def go(self, fm: 'FARGModel', **kwargs):
pass
@abstractmethod
def __hash__(self):
pass
# TODO Default impl should check if Agent is Blocked
def can_go(self, fm: 'FARGModel', **kwargs) -> bool:
'''Is the Agent capable of executing its .go() method? If False,
FARGModel.do_timestep() will give the Agent a zero probability of
running. Default implementation: True unless Agent has a Blocked
tag.'''
#TODO Default implementation should also return False if Agent has
# any Blanks.
return not fm.is_blocked(self)
def update_support(cls, fm: 'FARGModel'):
'''The Agent should update the weights from itself to other Elems.
Default implementation: do nothing.'''
# TODO Return an iterable of updates so they're easy to log.
pass
def check_if_succeeded(self, fm: 'FARGModel', **kwargs):
'''The Agent should look at the ws to see if it has succeeded, and
update its AgentState accordingly. Default implementation: revert
to previous AgentState if current AgentState is MustCheckIfSucceeded.'''
st = fm.agent_state(self)
if isinstance(st, MustCheckIfSucceeded):
fm.set_agent_state(self, st.prev_agentstate)
Agents = Union[Agent, List[Agent], None]
class Detector(ABC):
@abstractmethod
def look(self, fm: 'FARGModel'):
pass
@dataclass
class ElemInWS:
'''An element in the workspace.'''
elem: Elem
builder: Union[Agent, None]
# .builder should not be used by Agents; it's strictly for debugging
# and reporting on model behavor.
behalf_of: List[Agent] = field(init=False, default_factory=list)
# .behalf_of is all the Agents that want this Elem to exist, including
# but not limited to its builder.
tob: int # time of birth (when Elem was added to the ws)
# activation: float = 1.0
# overrides for contents?
# overrides for orientation?
def add_behalf_of(self, agents):
self.behalf_of += as_iter(agents)
def __str__(self):
return f'{self.elem} builder={self.builder} tob={self.tob}'
# Exceptions
@dataclass(frozen=True)
class Halt(Exception):
pass
@dataclass(frozen=True)
class ValuesNotAvail(Exception):
#container: Hashable # Change this to a CellRef?
cellref: Union['CellRef', None]
avails: Tuple[Value, ...]
unavails: Tuple[Value, ...]
def __str__(self):
cl = self.__class__.__name__
return f'{cl}({self.cellref}, avails={self.avails}, unavails={self.unavails})'
# Generic FARG model
@dataclass
class FARGModel:
ws: Dict[Elem, ElemInWS] = field(default_factory=dict)
t: int = 0
slipnet: Slipnet = None # type: ignore[assignment] # mypy how?
seed: Union[int, None] = None
activation_g: ActivationGraph = field(
default_factory=ActivationGraph, init=False
)
sleeping: Dict[Elem, int] = field(
default_factory=dict, init=False
)
_agent_states: Dict[Elem, AgentState] = field(
default_factory=dict, init=False
)
mutual_support_weight: float = 1.0
mutual_antipathy_weight: float = -0.2
globals: Dict[str, Any] = field(default_factory=dict, init=False)
force_slipnet_result: Hashable = field(default=None, init=False)
# Initialization
def __post_init__(self):
self.seed = reseed(self.seed)
if self.slipnet is None:
self.slipnet = Slipnet.empty()
self.fill_slipnet()
def fill_slipnet(self):
'''Should add nodes and edges to self.slipnet(). Default implementation
does nothing.'''
pass
# Whole-codelet functions
def build(self, *args, **kwargs) -> Elem:
'''The arguments specify an Elem to build in the workspace. If such
an Elem already exists, we don't build anything. Returns the built
or found Elem.'''
if not args:
raise NotImplementedError('still need to provide object to .build')
if isclass(args[0]):
raise NotImplementedError(".build can't yet construct the object for you")
# TODO with logging('BUILT', args, kwargs)
obj = args[0]
builder = kwargs.pop('builder', None)
init_a = kwargs.pop('init_a', None)
min_a = kwargs.pop('min_a', None) # only has effect on true build
max_a = kwargs.pop('max_a', None) # only has effect on true build
if obj is None: # attempting to build None builds nothing
return None
eiws = self.get_eiws(obj)
if eiws is None: # the Elem is not there, so now we really build it
self.ws[obj] = eiws = ElemInWS(obj, builder=builder, tob=self.t)
#print('BUILT', obj)
if init_a is None:
if builder is None:
init_a = 1.0
else:
init_a = min(1.0, self.a(builder))
if min_a is not None:
force_setattr(obj, 'min_a', min_a)
min_a = getattr(obj, 'min_a', None)
if max_a is not None:
force_setattr(obj, 'max_a', max_a)
max_a = getattr(obj, 'max_a', None)
init_a = clip(min_a, max_a, init_a)
self.activation_g.add_node(obj, a=init_a)
# create antipathy between obj and its enemies
for elem in self.elems(HasAntipathyTo(obj, ignore=builder)):
self.add_mutual_antipathy(obj, elem)
# call the obj's .on_build() method
if isinstance(obj, Agent):
self._agent_states[obj] = Active()
try:
obj.on_build(self, **kwargs)
except AttributeError:
pass
else: # the Elem is already there, so don't build a new one
obj = eiws.elem
if builder:
eiws.add_behalf_of(builder)
self.add_mutual_support(builder, obj)
return obj
def paint(self, cr: 'CellRef', v: Value):
cr.paint(v)
def run(self, agent: Agent, **kwargs):
'''Calls the agent's .go() method regardless of whether its .can_go()
would return False.'''
# TODO Not if agent doesn't exist?
# TODO Document force_slipnet_result
with PushAttr(self, 'force_slipnet_result'):
self.force_slipnet_result = kwargs.pop('force_slipnet_result', None)
if isinstance(self.agent_state(agent), MustCheckIfSucceeded):
agent.check_if_succeeded(self, **kwargs)
else:
agent.go(self, **kwargs) # TODO Supply overrides from eiws?
agent.update_support(self)
def pulse_slipnet(
self,
activations_in: Dict[Hashable, float],
type: Union[Type, None]=None,
k: int=20,
num_get: int=1, # number of slipnodes to return
filter: Union[Callable, None]=lambda x: True
) -> List[Hashable]:
if self.force_slipnet_result is not None:
return as_list(self.force_slipnet_result)
sd = self.slipnet.dquery(activations_in=activations_in)
nas = self.slipnet.topna(sd, pred=type, k=k)
#print('PULSE')
#pr(activations_in)
#hs = list(self.slipnet.base_graph.hops_from_node(Before(4)))
#pr(len(hs))
#pr(sd)
return list(sample_without_replacement(
[na.node for na in nas],
k=num_get,
weights=[na.a for na in nas]
))
# Activation
def a(self, node: Hashable) -> float:
'''Current activation level of node.'''
return self.activation_g.a(node)
def ae_weight(self, from_node: Hashable, to_node: Hashable) -> float:
'''Activation edge weight. 0.0 if either node does not exist.'''
try:
return self.activation_g.edges[from_node, to_node]['weight']
except KeyError:
return 0.0
def sum_a(self) -> float:
return sum(self.a(elem) for elem in self.elems())
def boost(self, node: Hashable, amt=None):
self.activation_g.boost(node, amt=amt)
def give_boost(self, from_elem: Elem, to_elem: Elems):
for t in as_iter(to_elem):
self.boost(t, amt=self.a(from_elem))
def downboost(self, node: Hashable):
# HACK to make a node quiet down for a bit after it's done something;
# better to explicitly sleep for a few timesteps?
self.activation_g.nodes[node]['a'] /= 2.0
def deactivate(self, node: Hashable):
self.activation_g.nodes[node]['a'] = 0.0
def propagate_a(self, num: int=1):
'''Propagate activation. num is the number of iterations--how many
times we call the propagator, not how many spreading steps happen
in each iteration.'''
for _ in range(num):
self.activation_g.propagate()
# Support
def add_mutual_support(
self, a: Hashable, b: Hashable, weight: Union[float, None]=None
):
if weight is None:
weight = self.mutual_support_weight
self.activation_g.add_edge(a, b, weight=weight)
self.activation_g.add_edge(b, a, weight=weight)
set_mutual_support = add_mutual_support
def set_support_edge(
self, a: Hashable, b: Hashable, weight: Union[float, None]=None
):
if weight is None:
weight = self.mutual_support_weight
self.activation_g.add_edge(a, b, weight=weight)
def add_mutual_antipathy(
self, a: Hashable, b: Hashable, weight: Union[float, None]=None
):
if weight is None:
weight = self.mutual_antipathy_weight
self.activation_g.add_edge(a, b, weight=weight)
self.activation_g.add_edge(b, a, weight=weight)
# Querying
def ws_query(
self,
pred: Union[Type, Callable, None]=None,
min_a: Union[float, None]=None,
max_n: int=1
) -> Iterable[Elem]:
'''Returns generator of up to max_n nodes that match pred,
chosen randomly, weighted by activation.'''
elems = self.elems(pred)
if min_a is not None:
elems = (e for e in elems if self.a(e) >= min_a)
elems = list(elems)
activations = [self.a(e) for e in elems]
yield from sample_without_replacement(
elems, weights=activations, k=max_n
)
# TODO Make an MPred type: things that are or can be made into a predicate
# that returns a number indicating quality of match.
# TODO Make as_mpred().
def vals_query(self, vals: Iterable, mpred: Callable, k: int=1) -> Iterable:
'''Returns up to k values from vals, chosen randomly with weight
returned by mpred.'''
vals, weights = self.vals_weights(vals, mpred)
return list(sample_without_replacement(
vals, weights=weights, k=k
))
def vals_weights(self, vals: Iterable, mpred: Callable, threshold=0.1) \
-> Tuple[List[Any], List[float]]:
'''Returns vals and weights, weighted by mpred. Filters out vals
whose weights are below threshold.'''
# TODO Add noise.
# TODO Involve the slipnet. Pass activations_in and/or features of
# the vals.
mpred = as_mpred(mpred)
vs = []
ws = []
for v in as_list(vals):
w = mpred(v)
if w < threshold:
continue
vs.append(v)
ws.append(w)
return (vs, ws)
def is_tagged(self, elems, tagpred) -> bool:
'''Are any of elems tagged with a tag matching tagpred?'''
# TODO Optimize; this now searches the entire ws
return any(
self.is_tagging(tag, elem)
for elem in as_iter(elems)
for tag in self.elems(pred=tagpred)
)
def is_tagging(self, tag: Elem, elem: Elem) -> bool:
try:
return tag.is_tagging(elem) # type: ignore[attr-defined] # mypy how?
except AttributeError:
return elem in self.taggees_of(tag)
def taggees_of(self, tag: Elem) -> Iterable[Elem]:
try:
yield tag.taggees_of() # type: ignore[attr-defined] # mypy how?
except AttributeError:
yield tag.taggee # type: ignore[attr-defined] # mypy how?
except AttributeError:
yield from tag.taggees # type: ignore[attr-defined] # mypy how?
except AttributeError:
return
def is_blocked(self, elems) -> bool:
return self.is_tagged(elems, Blocked)
def is_sleeping(self, elem: Elem):
return elem in self.sleeping
def degree(self, a: Elem) -> int:
return len(self.neighbors(a))
def neighbors(self, e: Elem) -> Collection[Elem]: #List[Elem]:
g = self.activation_g
return set(list(g.successors(e)) + list(g.predecessors(e)))
# Ancillary functions, callable by codelets and Agents
def elems(self, pred=None, es=None) -> Iterable[Elem]:
'''Returns a generator for *all* matches of pred. Unlike .ws_query(),
.elems() does not make a weighted choice.'''
fmpred = as_fmpred(pred)
if es is None:
es = self.ws.keys()
return (e for e in as_iter(es) if fmpred(self, e))
def get_eiws(self, obj) -> Union[ElemInWS, None]:
'''Returns the ElemInWS from the workspace if it exists, otherwise
None.'''
return self.ws.get(obj, None)
# TODO UT
def override(self, elem: Elem, **kwargs):
'''Replaces elem with an elem containing new values, taken from
kwargs.'''
# MAJOR TODO We really should store the overrides in the ElemInWS,
# and require that all code dereference an Elem to get the overridden
# version. Tracking stale references is just too hard. If all
# Agent actions are done inside Codelet objects, called by fm,
# this shouldn't be too hard. The model itself should figure out
# how to deal with messy situations that result from overrides.
# TODO This should be done as a paint operation.
# HACK This assumes that Elem is a dataclass.
if not kwargs:
return
old_eiws = self.get_eiws(elem)
if not old_eiws:
return
new_elem = replace(elem, **kwargs)
if new_elem in self.ws:
raise NotImplementedError
else:
self.ws[new_elem] = new_eiws = replace(old_eiws, elem=new_elem)
self.activation_g.add_node(new_elem, a=self.a(elem))
for u, v, d in chain(
self.activation_g.in_edges(elem, data=True),
):
self.activation_g.add_edge(u, new_elem, **d)
for u, v, d in chain(
self.activation_g.out_edges(elem, data=True)
):
self.activation_g.add_edge(new_elem, v, **d)
self.set_agent_state(new_elem, self.agent_state(elem)) # type: ignore #BUG
self.remove_elem(elem)
def remove_elem(self, elem):
# TODO Must remove all references to elem
for d in [self.ws, self._agent_states, self.sleeping]:
try:
del d[elem]
except KeyError:
pass
self.activation_g.remove_node(elem)
def builder_of(self, elem: Elem) -> Union[Elem, None]:
try:
eiws = self.ws[elem]
except KeyError:
return None
return eiws.builder
def built_by(self, agent: Agent) -> Collection[Elem]:
# TODO Optimize: this checks all Elems in the ws
return [
e for e in self.elems()
if self.builder_of(e) is agent
]
def behalf_of(self, elem: Elem) -> Sequence[Agent]:
try:
return self.ws[elem].behalf_of
except KeyError:
return []
def has_antipathy_to(self, a: Hashable, b: Hashable) -> bool:
try:
return a.has_antipathy_to(b) # type: ignore[attr-defined] # mypy how?
except AttributeError:
return False
def sleep(self, elem: Elem, num_timesteps=2):
if elem in self.sleeping:
self.sleeping[elem] += num_timesteps
else:
# TODO only if elem is in the ws
self.sleeping[elem] = self.t + num_timesteps
def can_go(self, agent: Agent) -> bool:
if not isinstance(agent, Agent):
return False
if self.is_sleeping(agent):
return False
if not self._agent_states[agent].can_go():
return False
if self.is_tagged(agent, Blocked):
return False
return agent.can_go(self)
def ok_to_paint(self, painter: Agent, cellref: 'CellRef') -> bool:
# TODO Check that the painter beats its competition?
# TODO Get threshold information from cellref?
return (
not cellref.has_value()
and
self.a(painter) >= 1.0
)
def agent_state(self, agent: Agent):
return self._agent_states.get(agent, None)
def set_agent_state(self, agent: Agent, agent_state: AgentState):
self._agent_states[agent] = agent_state
def awaiting_delegate(self, agent: Agent, delegate: Agent):
'''Marks that agent is waiting for delegate to succeed.'''
# TODO UT for multiple delegates
st = self.agent_state(agent)
if isinstance(st, AwaitingDelegate):
st.add_delegate(delegate)
else:
self.set_agent_state(agent, AwaitingDelegate(delegate))
def succeeded(self, agent: Agent):
self._agent_states[agent] = Succeeded()
for a in self.behalf_of(agent):
if not isinstance(a, Agent):
continue
if not self.has_succeeded(a):
self.must_check_if_succeeded(a, agent)
def has_succeeded(self, agent: Agent) -> bool:
try:
return self.agent_state(agent).succeeded
except AttributeError: # if no such agent
return False
def must_check_if_succeeded(
self,
agent: Agent,
delegate: Union[Agent, None]=None
):
prev_agentstate = self._agent_states[agent]
self._agent_states[agent] = \
MustCheckIfSucceeded(prev_agentstate, delegate)
self.give_boost(delegate, agent)
self.unsleep(agent) # TODO UT
# Timestep functions
def do_timestep(
self, ag: Union[Agent, None]=None, num: int=1, until=None
):
'''act: whether to force agents to .act even if the current timestep
isn't designated for acting.
until: last timestep; overrides num.'''
if until is None:
until = self.t + num
while self.t < until:
self.t += 1
self.remove_sleepers()
self.run_detectors()
if ag is None:
agent = self.choose_agent_by_activation(self.can_go)
else:
agent = ag
if agent:
#print('AGENT', agent)
self.run(agent)
#self.activation_g.decay()
self.activation_g.propagate()
#self.activation_g.pr_flows() #DIAG
self.log_activations()
#print(self) #DIAG
#agent.go(self)
def log_activations(self):
mode = 'w' if self.t == 1 else 'a'
with open(self.globals.get('alog', 'a.csv'), mode=mode, newline='') \
as csvfile:
writer = csv.writer(csvfile, quoting=csv.QUOTE_NONNUMERIC)
for node in self.nodes_to_log():
writer.writerow([self.t, node, self.a(node)])
def nodes_to_log(self) -> Iterable[Hashable]:
return self.elems(self.globals.get('logpred', None))
def run_detectors(self):
for detector in list(self.elems(Detector)):
#print('look:', detector) #DIAG
detector.look(self)
def choose_agent_by_activation(self, pred: Callable):
# TODO OAOO .search_ws
agents = list(self.ws_query(pred))
# GLOBAL constant in next line
activations = [self.a(agent) ** 2.0 for agent in agents]
return first(sample_without_replacement(agents, weights=activations))
def remove_sleepers(self):
for waking in [
elem for (elem, t) in self.sleeping.items() if t <= self.t
]:
del self.sleeping[waking]
def unsleep(self, agent: Agent):
'''Removes agent from the sleeping list. Does not boost agent's
activation or otherwise notify agent.'''
try:
del self.sleeping[agent]
except KeyError:
pass
# Debugging and reporting
def the(self, pred, es=None) -> Union[Elem, None]:
'''Returns the first element from .elems(), or None if there isn't
one.'''
return first(self.elems(pred=pred, es=es))
def __len__(self):
'''Returns number of Elems in the ws.'''
return len(self.ws)
def __repr__(self):
cl = self.__class__.__name__
return f'<{cl} object, t={self.t}>'
def __str__(self):
result = StringIO()
#print(f't={self.t} sum_a={self.sum_a():2.3f}', file=result)
#print(self.elines(self.elems(), tofile=result))
self.pr(tofile=result, extra=True, seed=True)
return result.getvalue()
def is_mutual_support(self, a: Elem, b: Elem) -> bool:
return (
self.ae_weight(a, b) > 0.0
and
self.ae_weight(b, a) > 0.0
)
def l1str(self, eiws: Union[ElemInWS, Elem], indent=None) -> str:
'''The one-line string for a ws elem, showing its activation.'''
if indent is None:
indent = ' '
if not isinstance(eiws, ElemInWS):
eiws = self.ws[eiws] # TODO if the elem does not exist
result = f'{indent}{self.a(eiws.elem): 7.3f} {eiws} deg={self.degree(eiws.elem)}'
if isinstance(eiws.elem, Agent):
if self.is_blocked(eiws.elem):
bl = ' Blocked+'
else:
bl = ''
result += f' {bl}{self.agent_state(eiws.elem)}'
return result
def e1str(self, node1: Elem, node2: Elem, indent=None) -> str:
'''The one-line string for the edge from node1 to node2. Does not
show node1. Indented one level further than 'indent'.'''
if indent is None:
indent = ' '
outgoing_weight = self.ae_weight(node1, node2)
incoming_weight = self.ae_weight(node2, node1)
if outgoing_weight != 0.0:
if incoming_weight != 0.0:
arrow = f'{outgoing_weight: 6.3f} <--> {incoming_weight: 6.3f}'
else:
arrow = f'{outgoing_weight: 6.3f} --> '
else:
arrow = f' <-- {incoming_weight: 6.3f}'
#return f'{indent} {weight: 7.3f} --> {node2} a={self.a(node2):2.3f}'
return f'{indent} {arrow} {node2} a={self.a(node2):2.3f}'
def pr(
self,
pred: Union[Type, Callable, None]=None,
es=None, # Elem, Elems, or None for all Elems
tofile=None,
indent=None,
edges=False,
extra=False, # extra stuff like t, sum_a, and seed
seed=False, # show seed?
**kwargs
):
'''Prints a subset of the workspace.'''
if extra:
print(f't={self.t} elems={len(self.ws)} sum_a={self.sum_a():2.3f}', file=tofile)
count = 0
for s, elem in sorted(
(self.l1str(elem, indent), elem)
for elem in self.elems(pred=pred, es=es)
):
count += 1
print(s, file=tofile)
if edges:
for e in sorted(
self.e1str(elem, neighbor)
for neighbor in self.neighbors(elem)
):
print(' ', e, file=tofile)
if pred:
print(f'n={count}', file=tofile)
if seed:
print(f'seed={self.seed}')
def pr_flows(self):
print(f'FLOWS t={self.t}')
self.activation_g.pr_flows()
@dataclass(frozen=True)
class HasAntipathyTo:
elem: Elem
ignore: Union[Elem, None]=None
def __call__(self, fm: FARGModel, elem) -> bool:
if elem == self.ignore:
return False
else:
return fm.has_antipathy_to(self.elem, elem)
@dataclass(frozen=True)
class StateDelta:
'''A change from one state to another.'''
before: Any # What got replaced
after: Any # What replaced it
how: Union[Any, None] # Some clue about how it happened
def seq_str(self):
'''How to display this inside SeqState.__str__.'''
return str(self)
@dataclass(frozen=True)
class SeqState(HasAvailValues):
avails: Union[Sequence[Value], None] = None
last_move: Union[StateDelta, None] = None
# TODO Put this into a WithAvails mix-in.
def take_avails(self, values: Iterable[Value]) \
-> Tuple[Iterable[Value], Iterable[Value]]:
'''Returns (taken_avails, remaining_avails). Might raise
ValuesNotAvail.'''
remaining_avails: List[Value] = \
[] if self.avails is None else list(self.avails)
taken_avails: List[Value] = []
missing_avails: List[Value] = []
for v in values:
try:
remaining_avails.remove(v)
except ValueError:
taken_avails.append(None)
missing_avails.append(v)
else:
taken_avails.append(v)
missing_avails.append(None)
if any(t is None for t in taken_avails):
raise ValuesNotAvail(
None,
tuple(taken_avails),
tuple(missing_avails)
)
return (taken_avails, remaining_avails)
def has_avail_value(self, v):
return v in self.avails
def last_move_str(self):
try:
return self.last_move.seq_str()
except AttributeError:
return str(self.last_move)
@property
def before(self):
return dig_attr(self.last_move, 'before')
@property
def after(self):
return dig_attr(self.last_move, 'after')
def __str__(self):
if self.avails is None:
avails_str = str(self.avails)
else:
avails_str = f"({' '.join(str(a) for a in self.avails)})"
if self.last_move is None:
return avails_str
else:
return f'{self.last_move_str()} {avails_str}'
class Canvas(ABC):
# TODO get an iterator of all CellRefs, search for a value
@abstractmethod
def __getitem__(self, addr: Addr) -> Value:
pass
@abstractmethod
def __setitem__(self, addr: Addr, v: Value) -> None:
pass
@dataclass(eq=False)
class SeqCanvas(Canvas):
states: List[Union[SeqState, None]] = field(default_factory=list)
min_a: ClassVar[float] = 1.0
def __hash__(self):
return id(self)
def __getitem__(self, addr: Addr) -> Value:
# TODO Handle addr that can't be found or is not an index
#print('SEQCGET', addr, len(self.states))
if isinstance(addr, int) and addr < len(self.states):
return self.states[addr]
else:
return None