-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathresource_manager.go
381 lines (314 loc) · 12.1 KB
/
resource_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
package lrc
import (
"errors"
"fmt"
"sync"
"time"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
)
// MaxMilliSatoshi is the largest amount of millisatoshi that can possibly
// exist under the 21 million bitcoin cap: 21,000,000 bitcoin, times
// 100,000,000 satoshi per bitcoin, times 1,000 millisatoshi per satoshi.
const MaxMilliSatoshi = 21_000_000 * 100_000_000 * 1_000
// ErrChannelNotFound is returned when a channel is not tracked in the
// manager's internal state.
var ErrChannelNotFound = errors.New("channel not found")
// Compile time check that ResourceManager implements the
// LocalResourceManager interface.
var _ LocalResourceManager = (*ResourceManager)(nil)
// ResourceManager tracks local reputation earned by incoming channels, and
// the thresholds required to earn endorsement on the outgoing channels, as
// required to implement resource bucketing for a node's channels.
type ResourceManager struct {
// params holds the parameters for the reputation algorithm.
params ManagerParams
// channelReputation tracks information required to track channel
// reputation:
// - The revenue that the channel has earned the local node forwarding
// *incoming* HTLCs.
// - The incoming HTLCs that the channel has forwarded to the local
// node that have not yet resolved.
channelReputation map[lnwire.ShortChannelID]reputationMonitor
// targetChannels tracks the routing revenue that channels have
// earned the local node for both incoming and outgoing HTLCs.
targetChannels map[lnwire.ShortChannelID]targetMonitor
// resolutionPeriod is the period of time that is considered reasonable
// for a htlc to resolve in.
resolutionPeriod time.Duration
// lookupReputation fetches the previously persisted reputation start
// value for a channel.
lookupReputation LookupReputation
// lookupRevenue fetches the previously persisted revenue start value
// for a channel.
lookupRevenue LookupRevenue
// newReputationMonitor creates a new reputation monitor, pulled
// out for mocking purposes in tests.
newReputationMonitor NewReputationMonitor
// newTargetMonitor creates a new target monitor, pulled out for
// mocking purposes in tests.
newTargetMonitor NewTargetMonitor
// clock is the clock that the manager was constructed with.
clock clock.Clock
// log is the logger that the manager reports to.
log Logger
// A single mutex guarding access to the manager.
sync.Mutex
}
// ChannelFetcher is the function signature for looking up the ChannelInfo
// of the channel with the given short channel id.
type ChannelFetcher func(lnwire.ShortChannelID) (*ChannelInfo, error)
// LookupReputation is the function signature for fetching a decaying average
// start value for the given channel's reputation. If no history is available
// it is expected to return nil.
type LookupReputation func(id lnwire.ShortChannelID) (*DecayingAverageStart,
error)
// LookupRevenue is the function signature for fetching a decaying average
// start value for the given channel's revenue. If no history is available
// it is expected to return nil.
type LookupRevenue func(id lnwire.ShortChannelID) (*DecayingAverageStart,
error)
// NewReputationMonitor is a function signature for a constructor that creates
// a new reputation monitor. The manager passes the result of its reputation
// lookup as the start value, so start is nil when that lookup found no
// history.
type NewReputationMonitor func(start *DecayingAverageStart) reputationMonitor
// NewTargetMonitor is a function signature for a constructor that creates
// a new target channel revenue monitor for the channel described by chanInfo.
// The manager passes the result of its revenue lookup as the start value.
type NewTargetMonitor func(start *DecayingAverageStart,
chanInfo *ChannelInfo) (targetMonitor, error)
// ManagerParams holds the configurable parameters of the resource manager's
// reputation algorithm.
type ManagerParams struct {
	// RevenueWindow is the amount of time that we examine the revenue of
	// outgoing links over.
	RevenueWindow time.Duration

	// ReputationMultiplier is the multiplier on RevenueWindow that is
	// used to determine the longer period of time that incoming links
	// reputation is assessed over.
	ReputationMultiplier uint8

	// ProtectedPercentage is the percentage of liquidity and slots that
	// are reserved for high reputation, endorsed HTLCs.
	ProtectedPercentage uint64

	// ResolutionPeriod is the amount of time that we reasonably expect
	// HTLCs to complete within.
	ResolutionPeriod time.Duration

	// BlockTime is the expected block time.
	BlockTime time.Duration
}

// validate checks that we have sane parameters. It returns an error if
// ProtectedPercentage exceeds 100, or if ResolutionPeriod or BlockTime is
// not strictly positive.
func (p *ManagerParams) validate() error {
	if p.ProtectedPercentage > 100 {
		return fmt.Errorf("Percentage: %v > 100", p.ProtectedPercentage)
	}

	// time.Duration is signed, so a negative value must be rejected along
	// with zero to honor the "> 0" requirement stated in the errors.
	if p.ResolutionPeriod <= 0 {
		return errors.New("Resolution period must be > 0")
	}

	if p.BlockTime <= 0 {
		return errors.New("Block time must be > 0")
	}

	return nil
}

// reputationWindow returns the period over which we examine reputation,
// which is RevenueWindow scaled by ReputationMultiplier.
func (p *ManagerParams) reputationWindow() time.Duration {
	return p.RevenueWindow * time.Duration(p.ReputationMultiplier)
}
// NewResourceManager validates the parameters provided and, if they are
// sane, creates a local resource manager that tracks channel revenue over
// the configured window, and incoming channel reputation over that window
// scaled by the multiplier. Previously persisted start values are fetched
// through lookupReputation and lookupRevenue when a channel is first seen.
func NewResourceManager(params ManagerParams, clock clock.Clock,
	lookupReputation LookupReputation,
	lookupRevenue LookupRevenue, log Logger) (*ResourceManager, error) {

	err := params.validate()
	if err != nil {
		return nil, err
	}

	// Default constructors for the per-channel monitors. They are stored
	// on the manager as fields so that tests can swap in mocks.
	reputationCtor := func(start *DecayingAverageStart) reputationMonitor {
		return newReputationTracker(clock, params, log, start)
	}

	targetCtor := func(start *DecayingAverageStart,
		chanInfo *ChannelInfo) (targetMonitor, error) {

		return newTargetChannelTracker(
			clock, params, chanInfo, log, start,
		)
	}

	manager := &ResourceManager{
		params:               params,
		channelReputation:    make(map[lnwire.ShortChannelID]reputationMonitor),
		targetChannels:       make(map[lnwire.ShortChannelID]targetMonitor),
		resolutionPeriod:     params.ResolutionPeriod,
		lookupReputation:     lookupReputation,
		lookupRevenue:        lookupRevenue,
		newReputationMonitor: reputationCtor,
		newTargetMonitor:     targetCtor,
		clock:                clock,
		log:                  log,
	}

	return manager, nil
}
// getTargetChannel looks up a channel's target (revenue) monitor in the
// resource manager, creating a new monitor seeded with any previously
// persisted revenue if one is not found. The monitor stored in the map is
// returned, so mutations made through it are visible to later lookups.
//
// An error is returned if the revenue lookup or the monitor constructor
// fails. In that case nothing is stored for the channel, so a later call
// will retry creation rather than finding a half-initialized entry.
//
// This function does not lock; ForwardHTLC calls it with the manager's mutex
// held.
func (r *ResourceManager) getTargetChannel(channel lnwire.ShortChannelID,
	chanInfo *ChannelInfo) (targetMonitor, error) {

	if existing := r.targetChannels[channel]; existing != nil {
		return existing, nil
	}

	revenue, err := r.lookupRevenue(channel)
	if err != nil {
		return nil, err
	}

	// Only write to the map once the constructor has succeeded. Storing
	// its result before checking the error could leave behind a non-nil
	// interface wrapping a nil value, which would pass the nil check above
	// on the next call.
	monitor, err := r.newTargetMonitor(revenue, chanInfo)
	if err != nil {
		return nil, err
	}
	r.targetChannels[channel] = monitor

	r.log.Infof("Added new revenue channel: %v with start: %v",
		channel.ToUint64(), revenue)

	return monitor, nil
}
// getChannelReputation returns the reputation monitor tracked for the
// channel provided. If the channel is not yet tracked, its persisted start
// value is looked up, a new monitor is created from it, stored in the
// manager and returned. An error is only returned if the lookup of the start
// value fails.
func (r *ResourceManager) getChannelReputation(
	channel lnwire.ShortChannelID) (reputationMonitor, error) {

	// Fast path: we are already tracking this channel.
	if tracked := r.channelReputation[channel]; tracked != nil {
		return tracked, nil
	}

	start, err := r.lookupReputation(channel)
	if err != nil {
		return nil, err
	}

	created := r.newReputationMonitor(start)
	r.channelReputation[channel] = created

	r.log.Infof("Adding new channel reputation: %v with start: %v",
		channel.ToUint64(), start)

	return created, nil
}
// sufficientReputation assembles the reputation check used to decide whether
// the forwarding peer has sufficient reputation to forward the proposed htlc
// over the outgoing channel that they have requested. The check combines the
// incoming channel's reputation, the outgoing channel revenue provided by
// the caller and the outstanding risk of the htlc.
//
// The incoming channel's reputation monitor is created if it is not yet
// tracked. This function does not acquire the manager's mutex itself.
func (r *ResourceManager) sufficientReputation(htlc *ProposedHTLC,
	outgoingChannelRevenue float64) (*ReputationCheck, error) {

	monitor, err := r.getChannelReputation(htlc.IncomingChannel)
	if err != nil {
		return nil, err
	}

	reputation := monitor.IncomingReputation()
	risk := outstandingRisk(
		float64(r.params.BlockTime), htlc, r.resolutionPeriod,
	)

	check := &ReputationCheck{
		IncomingReputation: reputation,
		OutgoingRevenue:    outgoingChannelRevenue,
		HTLCRisk:           risk,
	}

	return check, nil
}
// htlcIdxTimestamp pairs a timestamp with an integer index. Its Less method
// orders values by timestamp so that it can be used as a
// queue.PriorityQueueItem.
// NOTE(review): the name suggests idx identifies a HTLC - confirm against the
// users of this type.
type htlcIdxTimestamp struct {
// ts is the timestamp that items are ordered by.
ts time.Time
// idx is the index associated with ts.
idx int
}
// Less reports whether this item's timestamp is strictly before that of
// other, so that a priority queue keeps the item with the oldest timestamp
// at the top. The other item must be a *htlcIdxTimestamp, any other type
// panics on the type assertion.
//
// NOTE: Part of the queue.PriorityQueueItem interface.
func (r *htlcIdxTimestamp) Less(other queue.PriorityQueueItem) bool {
	lhs := r.ts
	rhs := other.(*htlcIdxTimestamp).ts

	return lhs.Before(rhs)
}
// ForwardHTLC returns a forwarding decision indicating whether the HTLC
// proposed is allowed to proceed based on its reputation, endorsement and
// the resources available on the outgoing channel. The decision is obtained
// from the outgoing channel and then recorded on the incoming channel, so a
// HTLC that has been added to internal state must be cleared out using
// ResolveHTLC.
//
// ErrAmtOverflow is returned, before any state is touched, if the outgoing
// amount exceeds MaxMilliSatoshi. Errors from looking up or creating the
// channel monitors, and from adding the HTLC to the incoming channel, are
// returned as is.
func (r *ResourceManager) ForwardHTLC(htlc *ProposedHTLC,
	chanOutInfo *ChannelInfo) (*ForwardDecision, error) {

	// When LND intercepts, it hasn't yet checked anything about the HTLC,
	// so the amount could be manipulated. Reject impossible values up
	// front.
	if htlc.OutgoingAmount > MaxMilliSatoshi {
		return nil, ErrAmtOverflow
	}

	r.Lock()
	defer r.Unlock()

	incoming, err := r.getChannelReputation(htlc.IncomingChannel)
	if err != nil {
		return nil, err
	}

	outgoing, err := r.getTargetChannel(htlc.OutgoingChannel, chanOutInfo)
	if err != nil {
		return nil, err
	}

	// Ask the outgoing channel for a decision, taking the reputation of
	// the incoming channel into account.
	reputation := incoming.IncomingReputation()
	decision := outgoing.AddInFlight(reputation, htlc)

	// Record the HTLC on the incoming link along with the outcome that
	// the outgoing channel decided on.
	err = incoming.AddInFlight(htlc, decision.ForwardOutcome)
	if err != nil {
		return nil, err
	}

	return &decision, nil
}
// ResolveHTLC updates the resource manager's state to reflect the resolution
// of a HTLC, returning the in flight HTLC that was resolved. An error
// wrapping ErrChannelNotFound is returned if the incoming channel, or the
// outgoing channel that the HTLC was tracked on, is not known to the manager.
// Errors from resolving the HTLC on the incoming channel are returned as is.
// TODO: figure out whether we should allow this API to be called and then the
// corresponding forward is not found (depends on replay logic).
func (r *ResourceManager) ResolveHTLC(htlc *ResolvedHTLC) (*InFlightHTLC,
	error) {

	r.Lock()
	defer r.Unlock()

	// We can only resolve HTLCs on incoming channels that we're tracking.
	incoming := r.channelReputation[htlc.IncomingChannel]
	if incoming == nil {
		return nil, fmt.Errorf("Incoming success=%v %w: %v(%v) -> %v(%v)",
			htlc.Success, ErrChannelNotFound,
			htlc.IncomingChannel.ToUint64(),
			htlc.IncomingIndex, htlc.OutgoingChannel.ToUint64(),
			htlc.OutgoingIndex,
		)
	}

	// Resolve the HTLC on the incoming channel, which hands back the in
	// flight HTLC that was recorded when it was added.
	resolved, err := incoming.ResolveInFlight(htlc)
	if err != nil {
		return nil, err
	}

	// A HTLC that was not assigned any outgoing resources is expected to
	// have been failed back, so there is nothing to release on the
	// outgoing link.
	if resolved.OutgoingDecision == ForwardOutcomeNoResources {
		return resolved, nil
	}

	// After interception the HTLC may have been forwarded over another
	// channel (non-strict forwarding). This is only an issue when we're
	// using an external interceptor (when built into a solution, we'd
	// know which channel we used).
	trackedOut := resolved.OutgoingChannel
	if trackedOut != htlc.OutgoingChannel {
		r.log.Debugf("Non-strict forwarding: %v used instead of %v",
			htlc.OutgoingChannel, trackedOut)
	}

	// Update the outgoing channel that the in flight HTLC was added to,
	// which is the one recorded on the in flight HTLC rather than the one
	// reported by the resolution.
	outgoing := r.targetChannels[trackedOut]
	if outgoing == nil {
		return nil, fmt.Errorf("Outgoing success=%v %w: %v(%v) -> %v(%v)",
			htlc.Success, ErrChannelNotFound,
			htlc.IncomingChannel.ToUint64(),
			htlc.IncomingIndex,
			htlc.OutgoingChannel.ToUint64(),
			htlc.OutgoingIndex)
	}

	outgoing.ResolveInFlight(htlc, resolved)

	return resolved, nil
}