Skip to content

Commit

Permalink
Merge pull request #1192 from lightninglabs/rfq-order-validation
Browse files Browse the repository at this point in the history
[rfq]: add validation to `AddAssetBuyOrder` and `AddAssetSellOrder` RPCs
  • Loading branch information
guggero authored Nov 21, 2024
2 parents 965edcd + e0d9ea1 commit 39d1889
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 243 deletions.
80 changes: 50 additions & 30 deletions itest/rfq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,35 @@ func testRfqAssetBuyHtlcIntercept(t *harnessTest) {
bidAmt := uint64(90000)
buyOrderExpiry := uint64(time.Now().Add(24 * time.Hour).Unix())

_, err = ts.CarolTapd.AddAssetBuyOrder(
ctxt, &rfqrpc.AddAssetBuyOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_AssetId{
AssetId: mintedAssetId,
},
// We first try to add a buy order without specifying the asset skip
// flag. That should result in an error, since we only have a normal
// channel and not an asset channel.
buyReq := &rfqrpc.AddAssetBuyOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_AssetId{
AssetId: mintedAssetId,
},
AssetMaxAmt: purchaseAssetAmt,
Expiry: buyOrderExpiry,
},
AssetMaxAmt: purchaseAssetAmt,
Expiry: buyOrderExpiry,

// Here we explicitly specify Bob as the destination
// peer for the buy order. This will prompt Carol's tapd
// node to send a request for quote message to Bob's
// node.
PeerPubKey: ts.BobLnd.PubKey[:],
// Here we explicitly specify Bob as the destination
// peer for the buy order. This will prompt Carol's tapd
// node to send a request for quote message to Bob's
// node.
PeerPubKey: ts.BobLnd.PubKey[:],

TimeoutSeconds: uint32(rfqTimeout.Seconds()),
},
TimeoutSeconds: uint32(rfqTimeout.Seconds()),
}
_, err = ts.AliceTapd.AddAssetBuyOrder(ctxt, buyReq)
require.ErrorContains(
t.t, err, "error checking peer channel: error checking asset "+
"channel",
)

// Now we set the skip flag and we shouldn't get an error anymore.
buyReq.SkipAssetChannelCheck = true
_, err = ts.CarolTapd.AddAssetBuyOrder(ctxt, buyReq)
require.NoError(t.t, err, "unable to upsert asset buy order")

// Wait until Carol receives an incoming quote accept message (sent from
Expand Down Expand Up @@ -266,25 +276,35 @@ func testRfqAssetSellHtlcIntercept(t *harnessTest) {
askAmt := uint64(42000)
sellOrderExpiry := uint64(time.Now().Add(24 * time.Hour).Unix())

_, err = ts.AliceTapd.AddAssetSellOrder(
ctxt, &rfqrpc.AddAssetSellOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_AssetId{
AssetId: mintedAssetIdBytes,
},
// We first try to add a sell order without specifying the asset skip
// flag. That should result in an error, since we only have a normal
// channel and not an asset channel.
sellReq := &rfqrpc.AddAssetSellOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_AssetId{
AssetId: mintedAssetIdBytes,
},
PaymentMaxAmt: askAmt,
Expiry: sellOrderExpiry,
},
PaymentMaxAmt: askAmt,
Expiry: sellOrderExpiry,

// Here we explicitly specify Bob as the destination
// peer for the sell order. This will prompt Alice's
// tapd node to send a request for quote message to
// Bob's node.
PeerPubKey: ts.BobLnd.PubKey[:],
// Here we explicitly specify Bob as the destination
// peer for the sell order. This will prompt Alice's
// tapd node to send a request for quote message to
// Bob's node.
PeerPubKey: ts.BobLnd.PubKey[:],

TimeoutSeconds: uint32(rfqTimeout.Seconds()),
},
TimeoutSeconds: uint32(rfqTimeout.Seconds()),
}
_, err = ts.AliceTapd.AddAssetSellOrder(ctxt, sellReq)
require.ErrorContains(
t.t, err, "error checking peer channel: error checking asset "+
"channel",
)

// Now we set the skip flag and we shouldn't get an error anymore.
sellReq.SkipAssetChannelCheck = true
_, err = ts.AliceTapd.AddAssetSellOrder(ctxt, sellReq)
require.NoError(t.t, err, "unable to upsert asset sell order")

// Wait until Alice receives an incoming sell quote accept message (sent
Expand Down
100 changes: 86 additions & 14 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6340,7 +6340,7 @@ func unmarshalAssetBuyOrder(

// AddAssetBuyOrder upserts a new buy order for the given asset into the RFQ
// manager. If the order already exists for the given asset, it will be updated.
func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
func (r *rpcServer) AddAssetBuyOrder(ctx context.Context,
req *rfqrpc.AddAssetBuyOrderRequest) (*rfqrpc.AddAssetBuyOrderResponse,
error) {

Expand All @@ -6354,13 +6354,24 @@ func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
return nil, fmt.Errorf("error unmarshalling buy order: %w", err)
}

peerStr := fn.MapOptionZ(
buyOrder.Peer, func(peerVertex route.Vertex) string {
return peerVertex.String()
},
// Currently, we require the peer to be specified in the buy order.
peer, err := buyOrder.Peer.UnwrapOrErr(
fmt.Errorf("buy order peer must be specified"),
)
if err != nil {
return nil, err
}

// Check if we have a channel with the peer.
err = r.checkPeerChannel(
ctx, peer, buyOrder.AssetSpecifier, req.SkipAssetChannelCheck,
)
if err != nil {
return nil, fmt.Errorf("error checking peer channel: %w", err)
}

rpcsLog.Debugf("[AddAssetBuyOrder]: upserting buy order "+
"(dest_peer=%s)", peerStr)
"(dest_peer=%s)", peer.String())

// Register an event listener before actually inserting the order, so we
// definitely don't miss any responses.
Expand Down Expand Up @@ -6402,11 +6413,61 @@ func (r *rpcServer) AddAssetBuyOrder(_ context.Context,

case <-timeout:
return nil, fmt.Errorf("timeout waiting for response "+
"(peer=%s)", peerStr)
"(peer=%s)", peer.String())
}
}
}

// checkPeerChannel checks if there is a channel with the given peer. If the
// asset channel check is enabled, it will also check if there is a channel with
// the given asset with the peer.
func (r *rpcServer) checkPeerChannel(ctx context.Context, peer route.Vertex,
specifier asset.Specifier, skipAssetChannelCheck bool) error {

// We want to make sure there is at least a channel between us and the
// peer, otherwise RFQ negotiation doesn't make sense.
switch {
// For integration tests, we can't create asset channels, so we allow
// the asset channel check to be skipped. In this case we simply check
// that we have any channel with the peer.
case skipAssetChannelCheck:
activeChannels, err := r.cfg.Lnd.Client.ListChannels(
ctx, true, false,
)
if err != nil {
return fmt.Errorf("unable to fetch channels: %w", err)
}
peerChannels := fn.Filter(
activeChannels, func(c lndclient.ChannelInfo) bool {
return c.PubKeyBytes == peer
},
)
if len(peerChannels) == 0 {
return fmt.Errorf("no active channel found with peer "+
"%x", peer[:])
}

// For any other case, we'll want to make sure there is a channel with
// a non-zero balance of the given asset to carry the order.
default:
assetID, err := specifier.UnwrapIdOrErr()
if err != nil {
return fmt.Errorf("cannot check asset channel, " +
"missing asset ID")
}

// If we don't get an error here, it means we do have an asset
// channel with the peer.
_, err = r.rfqChannel(ctx, assetID, &peer)
if err != nil {
return fmt.Errorf("error checking asset channel: %w",
err)
}
}

return nil
}

// unmarshalAssetSellOrder unmarshals an asset sell order from the RPC form.
func unmarshalAssetSellOrder(
req *rfqrpc.AddAssetSellOrderRequest) (*rfq.SellOrder, error) {
Expand Down Expand Up @@ -6457,7 +6518,7 @@ func unmarshalAssetSellOrder(

// AddAssetSellOrder upserts a new sell order for the given asset into the RFQ
// manager. If the order already exists for the given asset, it will be updated.
func (r *rpcServer) AddAssetSellOrder(_ context.Context,
func (r *rpcServer) AddAssetSellOrder(ctx context.Context,
req *rfqrpc.AddAssetSellOrderRequest) (*rfqrpc.AddAssetSellOrderResponse,
error) {

Expand All @@ -6472,13 +6533,24 @@ func (r *rpcServer) AddAssetSellOrder(_ context.Context,
err)
}

// Extract peer identifier as a string for logging.
peerStr := fn.MapOptionZ(sellOrder.Peer, func(p route.Vertex) string {
return p.String()
})
// Currently, we require the peer to be specified in the buy order.
peer, err := sellOrder.Peer.UnwrapOrErr(
fmt.Errorf("sell order peer must be specified"),
)
if err != nil {
return nil, err
}

// Check if we have a channel with the peer.
err = r.checkPeerChannel(
ctx, peer, sellOrder.AssetSpecifier, req.SkipAssetChannelCheck,
)
if err != nil {
return nil, fmt.Errorf("error checking peer channel: %w", err)
}

rpcsLog.Debugf("[AddAssetSellOrder]: upserting sell order "+
"(dest_peer=%s)", peerStr)
"(dest_peer=%s)", peer.String())

// Register an event listener before actually inserting the order, so we
// definitely don't miss any responses.
Expand Down Expand Up @@ -6520,7 +6592,7 @@ func (r *rpcServer) AddAssetSellOrder(_ context.Context,

case <-timeout:
return nil, fmt.Errorf("timeout waiting for response "+
"from peer %s", peerStr)
"from peer %s", peer.String())
}
}
}
Expand Down
Loading

0 comments on commit 39d1889

Please sign in to comment.