-
Notifications
You must be signed in to change notification settings - Fork 7
/
arc.go
148 lines (121 loc) · 3.34 KB
/
arc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package shift
import (
"context"
"database/sql"
"github.com/luno/jettison/errors"
"github.com/luno/jettison/j"
"github.com/luno/reflex"
"github.com/luno/reflex/rsql"
)
// NewArcFSM returns a new ArcFSM builder.
func NewArcFSM(events eventInserter[int64], opts ...option) arcbuilder {
fsm := ArcFSM{
updates: make(map[int][]tuple),
events: events,
}
for _, opt := range opts {
opt(&fsm.options)
}
return arcbuilder(fsm)
}
type arcbuilder ArcFSM
func (b arcbuilder) Insert(st Status, inserter Inserter[int64]) arcbuilder {
b.inserts = append(b.inserts, tuple{
Status: st.ShiftStatus(),
Type: inserter,
})
return b
}
func (b arcbuilder) Update(from, to Status, updater Updater[int64]) arcbuilder {
tups := b.updates[from.ShiftStatus()]
tups = append(tups, tuple{
Status: to.ShiftStatus(),
Type: updater,
})
b.updates[from.ShiftStatus()] = tups
return b
}
func (b arcbuilder) Build() *ArcFSM {
fsm := ArcFSM(b)
return &fsm
}
type tuple struct {
Status int
Type interface{}
}
// ArcFSM is a defined Finite-State-Machine that allows specific mutations of
// the domain model in the underlying sql table via inserts and updates.
// All mutations update the status of the model, mutates some fields and
// inserts a reflex event.
//
// ArcFSM doesn't have the restriction of FSM and can be defined with arbitrary transitions.
type ArcFSM struct {
options
events eventInserter[int64]
inserts []tuple
updates map[int][]tuple
}
func (fsm *ArcFSM) Insert(ctx context.Context, dbc *sql.DB, st Status, inserter Inserter[int64]) (int64, error) {
tx, err := dbc.Begin()
if err != nil {
return 0, err
}
defer tx.Rollback()
id, notify, err := fsm.InsertTx(ctx, tx, st, inserter)
if err != nil {
return 0, err
}
err = tx.Commit()
if err != nil {
return 0, err
}
notify()
return id, nil
}
func (fsm *ArcFSM) InsertTx(ctx context.Context, tx *sql.Tx, st Status, inserter Inserter[int64]) (int64, rsql.NotifyFunc, error) {
var found bool
for _, tup := range fsm.inserts {
if tup.Status == st.ShiftStatus() && sameType(tup.Type, inserter) {
found = true
break
}
}
if !found {
return 0, nil, errors.Wrap(ErrInvalidStateTransition, "invalid insert status and inserter", j.KV("status", st.ShiftStatus()))
}
return insertTx(ctx, tx, st, inserter, fsm.events, reflex.EventType(st), fsm.options)
}
func (fsm *ArcFSM) Update(ctx context.Context, dbc *sql.DB, from, to Status, updater Updater[int64]) error {
tx, err := dbc.Begin()
if err != nil {
return err
}
defer tx.Rollback()
notify, err := fsm.UpdateTx(ctx, tx, from, to, updater)
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
notify()
return nil
}
func (fsm *ArcFSM) UpdateTx(ctx context.Context, tx *sql.Tx, from, to Status, updater Updater[int64]) (rsql.NotifyFunc, error) {
tl, ok := fsm.updates[from.ShiftStatus()]
if !ok {
return nil, errors.Wrap(ErrInvalidStateTransition, "invalid update from status", j.KV("status", from.ShiftStatus()))
}
var found bool
for _, tup := range tl {
if tup.Status == to.ShiftStatus() && sameType(tup.Type, updater) {
found = true
break
}
}
if !found {
return nil, errors.Wrap(ErrInvalidStateTransition, "invalid update to status and updater", j.KV("status", from.ShiftStatus()))
}
return updateTx(ctx, tx, from, to, updater, fsm.events, reflex.EventType(to), fsm.options)
}