From cba7cb020fafe8a7c896a8b8b2d75e8f8609293a Mon Sep 17 00:00:00 2001 From: ffranr Date: Tue, 29 Aug 2023 18:35:21 +0100 Subject: [PATCH] multi: add universe RPC proof courier --- proof/archive.go | 2 + proof/courier.go | 261 +++++++++++++++++++++++++++++++++++++ tapdb/assets_store_test.go | 8 +- tapgarden/custodian.go | 1 + 4 files changed, 270 insertions(+), 2 deletions(-) diff --git a/proof/archive.go b/proof/archive.go index 23faccf18..6f03fd826 100644 --- a/proof/archive.go +++ b/proof/archive.go @@ -57,6 +57,8 @@ type Locator struct { // ScriptKey specifies the script key of the asset to fetch/store. This // field MUST be specified. ScriptKey btcec.PublicKey + + OutPoint *wire.OutPoint } // Hash returns a SHA256 hash of the bytes serialized locator. diff --git a/proof/courier.go b/proof/courier.go index 72e8b9e90..65ace6f62 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -14,6 +14,8 @@ import ( "github.com/lightninglabs/lightning-node-connect/hashmailrpc" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/fn" + "github.com/lightninglabs/taproot-assets/taprpc" + unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -37,6 +39,10 @@ const ( // TODO(ffranr): Rename to HashmailCourier (use protocol name rather // than service). ApertureCourier = "hashmail" + + // UniverseRpcCourierType is a courier that uses the daemon universe RPC + // endpoints to deliver proofs. + UniverseRpcCourierType = "universerpc" ) // CourierHarness interface is an integration testing harness for a proof @@ -100,6 +106,8 @@ func ParseCourierAddrUrl(addr url.URL) (CourierAddr, error) { switch addr.Scheme { case ApertureCourier: return NewHashMailCourierAddr(addr) + case UniverseRpcCourierType: + return NewUniverseRpcCourierAddr(addr) } return nil, fmt.Errorf("unknown courier address protocol: %v", @@ -164,6 +172,70 @@ func NewHashMailCourierAddr(addr url.URL) (*HashMailCourierAddr, error) { }, nil } +// UniverseRpcCourierAddr is a universe RPC protocol specific implementation of +// the CourierAddr interface. +type UniverseRpcCourierAddr struct { + addr url.URL +} + +// Url returns the url.URL representation of the courier address. +func (h *UniverseRpcCourierAddr) Url() *url.URL { + return &h.addr +} + +// NewCourier generates a new courier service handle. +func (h *UniverseRpcCourierAddr) NewCourier(_ context.Context, + cfg *CourierCfg, recipient Recipient) (Courier, error) { + + // Ensure that the courier address is a universe RPC address. + if h.addr.Scheme != UniverseRpcCourierType { + return nil, fmt.Errorf("unsupported courier protocol: %v", + h.addr.Scheme) + } + + // Connect to the universe RPC server. + dialOpts, err := serverDialOpts() + if err != nil { + return nil, err + } + + serverAddr := fmt.Sprintf( + "%s:%s", h.addr.Hostname(), h.addr.Port(), + ) + conn, err := grpc.Dial(serverAddr, dialOpts...) + if err != nil { + return nil, err + } + + client := unirpc.NewUniverseClient(conn) + + // Instantiate the events subscribers map. + subscribers := make( + map[uint64]*fn.EventReceiver[fn.Event], + ) + + return &UniverseRpcCourier{ + recipient: recipient, + client: client, + deliveryLog: cfg.DeliveryLog, + subscribers: subscribers, + }, nil +} + +// NewUniverseRpcCourierAddr generates a new universe RPC courier address from a +// given URL. This function also performs protocol specific address validation. +func NewUniverseRpcCourierAddr(addr url.URL) (*UniverseRpcCourierAddr, error) { + // We expect the port number to be specified. + if addr.Port() == "" { + return nil, fmt.Errorf("proof courier URI address port " + + "unspecified") + } + + return &UniverseRpcCourierAddr{ + addr, + }, nil +} + // NewCourier instantiates a new courier service handle given a service URL // address. func NewCourier(ctx context.Context, addr url.URL, cfg *CourierCfg, @@ -854,6 +926,195 @@ func (h *HashMailCourier) SetSubscribers( // proof.Courier interface. var _ Courier = (*HashMailCourier)(nil) +// UniverseRpcCourier is a universe RPC proof courier service handle. It +// implements the Courier interface. +type UniverseRpcCourier struct { + // recipient describes the recipient of the proof. + recipient Recipient + + // client is the RPC client that the courier will use to interact with + // the universe RPC server. + client unirpc.UniverseClient + + // deliveryLog is the log that the courier will use to record the + // attempted delivery of proofs to the receiver. + deliveryLog DeliveryLog + + // subscribers is a map of components that want to be notified on new + // events, keyed by their subscription ID. + subscribers map[uint64]*fn.EventReceiver[fn.Event] + + // subscriberMtx guards the subscribers map and access to the + // subscriptionID. + subscriberMtx sync.Mutex +} + +// DeliverProof attempts to delivery a proof file to the receiver. +func (c *UniverseRpcCourier) DeliverProof(ctx context.Context, + annotatedProof *AnnotatedProof) error { + + // Decode annotated proof into proof file. + proofFile := &File{} + err := proofFile.Decode(bytes.NewReader(annotatedProof.Blob)) + if err != nil { + return err + } + + // Iterate over each proof in the proof file and submit to the courier + // service. + for i := range proofFile.proofs { + hashedProof := proofFile.proofs[i] + + var transitionProof Proof + if err := transitionProof.Decode( + bytes.NewReader(hashedProof.proofBytes), + ); err != nil { + return err + } + + proofAsset := transitionProof.Asset + + // Construct asset leaf. + rpcAsset, err := taprpc.MarshalAsset( + ctx, &proofAsset, true, true, nil, + ) + if err != nil { + return err + } + + assetLeaf := unirpc.AssetLeaf{ + Asset: rpcAsset, + IssuanceProof: hashedProof.proofBytes, + } + + // Construct universe key. + outPoint := transitionProof.OutPoint() + assetKey := unirpc.MarshalAssetKey( + outPoint, proofAsset.ScriptKey.PubKey, + ) + universeID := unirpc.MarshalUniverseID(proofAsset.ID()) + universeKey := unirpc.UniverseKey{ + Id: &universeID, + LeafKey: &assetKey, + } + + // Submit proof to courier. + _, err = c.client.InsertProof(ctx, &unirpc.AssetProof{ + Key: &universeKey, + AssetLeaf: &assetLeaf, + }) + if err != nil { + return fmt.Errorf("error inserting proof into "+ + "universe courier service: %w", err) + } + } + + return err +} + +// ReceiveProof attempts to obtain a proof file from the courier service. The +// proof is identified by the given locator. +func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, + originLocator Locator) (*AnnotatedProof, error) { + + // In order to reconstruct the proof file we must collect all the + // transition proofs that make up the main chain of proofs. That is + // accomplished by iterating backwards through the main chain of proofs + // until we reach the genesis point (minting proof). + + // We will update the locator at each iteration. + loc := originLocator + + // revProofs is a slice of transition proofs ordered from latest to + // earliest. + var revProofs []Proof + + for { + universeID := unirpc.MarshalUniverseID(*loc.AssetID) + assetKey := unirpc.MarshalAssetKey( + *loc.OutPoint, &loc.ScriptKey, + ) + universeKey := unirpc.UniverseKey{ + Id: &universeID, + LeafKey: &assetKey, + } + + resp, err := c.client.QueryProof(ctx, &universeKey) + if err != nil { + return nil, err + } + + // Decode transition proof from query response. + proofBlob := resp.AssetLeaf.IssuanceProof + var transitionProof Proof + if err := transitionProof.Decode( + bytes.NewReader(proofBlob), + ); err != nil { + return nil, err + } + + revProofs = append(revProofs, transitionProof) + + // Break if no previous witnesses + prevWitnesses := transitionProof.Asset.PrevWitnesses + if len(prevWitnesses) == 0 { + break + } + + // Update locator with principal input to the current outpoint. + prevID := prevWitnesses[0].PrevID + + // Parse script key public key. + scriptKeyPubKey, err := btcec.ParsePubKey(prevID.ScriptKey[:]) + if err != nil { + return nil, fmt.Errorf("failed to parse script key "+ + "public key from Proof.PrevID: %w", err) + } + loc.ScriptKey = *scriptKeyPubKey + + loc.AssetID = &prevID.ID + loc.OutPoint = &prevID.OutPoint + } + + // Append proofs to proof file in reverse order to their collected + // order. + proofFile := &File{} + for i := len(revProofs) - 1; i >= 0; i-- { + err := proofFile.AppendProof(revProofs[i]) + if err != nil { + return nil, fmt.Errorf("error appending proof to "+ + "proof file: %w", err) + } + } + + // Encode the full proof file. + var buf bytes.Buffer + if err := proofFile.Encode(&buf); err != nil { + return nil, fmt.Errorf("error encoding proof file: %w", err) + } + proofFileBlob := buf.Bytes() + + return &AnnotatedProof{ + Locator: originLocator, + Blob: proofFileBlob, + }, nil +} + +// SetSubscribers sets the subscribers for the courier. This method is +// thread-safe. +func (c *UniverseRpcCourier) SetSubscribers( + subscribers map[uint64]*fn.EventReceiver[fn.Event]) { + + c.subscriberMtx.Lock() + defer c.subscriberMtx.Unlock() + + c.subscribers = subscribers +} + +// A compile-time assertion to ensure the UniverseRpcCourier meets the +// proof.Courier interface. +var _ Courier = (*UniverseRpcCourier)(nil) + // DeliveryLog is an interface that allows the courier to log the (attempted) // delivery of a proof. type DeliveryLog interface { diff --git a/tapdb/assets_store_test.go b/tapdb/assets_store_test.go index 87144b067..c4f60524a 100644 --- a/tapdb/assets_store_test.go +++ b/tapdb/assets_store_test.go @@ -339,7 +339,9 @@ func TestImportAssetProof(t *testing.T) { // Finally, we'll verify all the anchor information that was inserted // on disk. require.Equal(t, testProof.AnchorBlockHash, dbAsset.AnchorBlockHash) - require.Equal(t, testProof.OutPoint, dbAsset.AnchorOutpoint) + require.Equal( + t, testProof.AssetSnapshot.OutPoint, dbAsset.AnchorOutpoint, + ) require.Equal(t, testProof.AnchorTx.TxHash(), dbAsset.AnchorTx.TxHash()) // We should also be able to fetch the proof we just inserted using the @@ -395,7 +397,9 @@ func TestImportAssetProof(t *testing.T) { // Finally, we'll verify all the anchor information that was inserted // on disk. require.Equal(t, testProof.AnchorBlockHash, dbAsset.AnchorBlockHash) - require.Equal(t, testProof.OutPoint, dbAsset.AnchorOutpoint) + require.Equal( + t, testProof.AssetSnapshot.OutPoint, dbAsset.AnchorOutpoint, + ) require.Equal(t, testProof.AnchorTx.TxHash(), dbAsset.AnchorTx.TxHash()) } diff --git a/tapgarden/custodian.go b/tapgarden/custodian.go index 88e0995e5..57b462250 100644 --- a/tapgarden/custodian.go +++ b/tapgarden/custodian.go @@ -381,6 +381,7 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error { loc := proof.Locator{ AssetID: &assetID, ScriptKey: addr.ScriptKey, + OutPoint: &op, } addrProof, err := courier.ReceiveProof(ctx, loc) if err != nil {