Skip to content

Commit

Permalink
multi: add universe RPC proof courier
Browse files Browse the repository at this point in the history
  • Loading branch information
ffranr committed Aug 31, 2023
1 parent 4c426d2 commit cba7cb0
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 2 deletions.
2 changes: 2 additions & 0 deletions proof/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Locator struct {
// ScriptKey specifies the script key of the asset to fetch/store. This
// field MUST be specified.
ScriptKey btcec.PublicKey

OutPoint *wire.OutPoint
}

// Hash returns a SHA256 hash of the bytes serialized locator.
Expand Down
261 changes: 261 additions & 0 deletions proof/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
"github.com/lightninglabs/taproot-assets/asset"
"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightninglabs/taproot-assets/taprpc"
unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -37,6 +39,10 @@ const (
// TODO(ffranr): Rename to HashmailCourier (use protocol name rather
// than service).
ApertureCourier = "hashmail"

// UniverseRpcCourierType is a courier that uses the daemon universe RPC
// endpoints to deliver proofs.
UniverseRpcCourierType = "universerpc"
)

// CourierHarness interface is an integration testing harness for a proof
Expand Down Expand Up @@ -100,6 +106,8 @@ func ParseCourierAddrUrl(addr url.URL) (CourierAddr, error) {
switch addr.Scheme {
case ApertureCourier:
return NewHashMailCourierAddr(addr)
case UniverseRpcCourierType:
return NewUniverseRpcCourierAddr(addr)
}

return nil, fmt.Errorf("unknown courier address protocol: %v",
Expand Down Expand Up @@ -164,6 +172,70 @@ func NewHashMailCourierAddr(addr url.URL) (*HashMailCourierAddr, error) {
}, nil
}

// UniverseRpcCourierAddr is a universe RPC protocol specific implementation of
// the CourierAddr interface.
type UniverseRpcCourierAddr struct {
addr url.URL
}

// Url returns the url.URL representation of the courier address.
func (h *UniverseRpcCourierAddr) Url() *url.URL {
return &h.addr
}

// NewCourier generates a new courier service handle.
func (h *UniverseRpcCourierAddr) NewCourier(_ context.Context,
cfg *CourierCfg, recipient Recipient) (Courier, error) {

// Ensure that the courier address is a universe RPC address.
if h.addr.Scheme != UniverseRpcCourierType {
return nil, fmt.Errorf("unsupported courier protocol: %v",
h.addr.Scheme)
}

// Connect to the universe RPC server.
dialOpts, err := serverDialOpts()
if err != nil {
return nil, err
}

serverAddr := fmt.Sprintf(
"%s:%s", h.addr.Hostname(), h.addr.Port(),
)
conn, err := grpc.Dial(serverAddr, dialOpts...)
if err != nil {
return nil, err
}

client := unirpc.NewUniverseClient(conn)

// Instantiate the events subscribers map.
subscribers := make(
map[uint64]*fn.EventReceiver[fn.Event],
)

return &UniverseRpcCourier{
recipient: recipient,
client: client,
deliveryLog: cfg.DeliveryLog,
subscribers: subscribers,
}, nil
}

// NewUniverseRpcCourierAddr generates a new universe RPC courier address from a
// given URL. This function also performs protocol specific address validation.
func NewUniverseRpcCourierAddr(addr url.URL) (*UniverseRpcCourierAddr, error) {
// We expect the port number to be specified.
if addr.Port() == "" {
return nil, fmt.Errorf("proof courier URI address port " +
"unspecified")
}

return &UniverseRpcCourierAddr{
addr,
}, nil
}

// NewCourier instantiates a new courier service handle given a service URL
// address.
func NewCourier(ctx context.Context, addr url.URL, cfg *CourierCfg,
Expand Down Expand Up @@ -854,6 +926,195 @@ func (h *HashMailCourier) SetSubscribers(
// proof.Courier interface.
var _ Courier = (*HashMailCourier)(nil)

// UniverseRpcCourier is a universe RPC proof courier service handle. It
// implements the Courier interface.
type UniverseRpcCourier struct {
// recipient describes the recipient of the proof.
recipient Recipient

// client is the RPC client that the courier will use to interact with
// the universe RPC server.
client unirpc.UniverseClient

// deliveryLog is the log that the courier will use to record the
// attempted delivery of proofs to the receiver.
deliveryLog DeliveryLog

// subscribers is a map of components that want to be notified on new
// events, keyed by their subscription ID.
subscribers map[uint64]*fn.EventReceiver[fn.Event]

// subscriberMtx guards the subscribers map and access to the
// subscriptionID.
subscriberMtx sync.Mutex
}

// DeliverProof attempts to delivery a proof file to the receiver.
func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
annotatedProof *AnnotatedProof) error {

// Decode annotated proof into proof file.
proofFile := &File{}
err := proofFile.Decode(bytes.NewReader(annotatedProof.Blob))
if err != nil {
return err
}

// Iterate over each proof in the proof file and submit to the courier
// service.
for i := range proofFile.proofs {
hashedProof := proofFile.proofs[i]

var transitionProof Proof
if err := transitionProof.Decode(
bytes.NewReader(hashedProof.proofBytes),
); err != nil {
return err
}

proofAsset := transitionProof.Asset

// Construct asset leaf.
rpcAsset, err := taprpc.MarshalAsset(
ctx, &proofAsset, true, true, nil,
)
if err != nil {
return err
}

assetLeaf := unirpc.AssetLeaf{
Asset: rpcAsset,
IssuanceProof: hashedProof.proofBytes,
}

// Construct universe key.
outPoint := transitionProof.OutPoint()
assetKey := unirpc.MarshalAssetKey(
outPoint, proofAsset.ScriptKey.PubKey,
)
universeID := unirpc.MarshalUniverseID(proofAsset.ID())
universeKey := unirpc.UniverseKey{
Id: &universeID,
LeafKey: &assetKey,
}

// Submit proof to courier.
_, err = c.client.InsertProof(ctx, &unirpc.AssetProof{
Key: &universeKey,
AssetLeaf: &assetLeaf,
})
if err != nil {
return fmt.Errorf("error inserting proof into "+
"universe courier service: %w", err)
}
}

return err
}

// ReceiveProof attempts to obtain a proof file from the courier service. The
// proof is identified by the given locator.
func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
originLocator Locator) (*AnnotatedProof, error) {

// In order to reconstruct the proof file we must collect all the
// transition proofs that make up the main chain of proofs. That is
// accomplished by iterating backwards through the main chain of proofs
// until we reach the genesis point (minting proof).

// We will update the locator at each iteration.
loc := originLocator

// revProofs is a slice of transition proofs ordered from latest to
// earliest.
var revProofs []Proof

for {
universeID := unirpc.MarshalUniverseID(*loc.AssetID)
assetKey := unirpc.MarshalAssetKey(
*loc.OutPoint, &loc.ScriptKey,
)
universeKey := unirpc.UniverseKey{
Id: &universeID,
LeafKey: &assetKey,
}

resp, err := c.client.QueryProof(ctx, &universeKey)
if err != nil {
return nil, err
}

// Decode transition proof from query response.
proofBlob := resp.AssetLeaf.IssuanceProof
var transitionProof Proof
if err := transitionProof.Decode(
bytes.NewReader(proofBlob),
); err != nil {
return nil, err
}

revProofs = append(revProofs, transitionProof)

// Break if no previous witnesses
prevWitnesses := transitionProof.Asset.PrevWitnesses
if len(prevWitnesses) == 0 {
break
}

// Update locator with principal input to the current outpoint.
prevID := prevWitnesses[0].PrevID

// Parse script key public key.
scriptKeyPubKey, err := btcec.ParsePubKey(prevID.ScriptKey[:])
if err != nil {
return nil, fmt.Errorf("failed to parse script key "+
"public key from Proof.PrevID: %w", err)
}
loc.ScriptKey = *scriptKeyPubKey

loc.AssetID = &prevID.ID
loc.OutPoint = &prevID.OutPoint
}

// Append proofs to proof file in reverse order to their collected
// order.
proofFile := &File{}
for i := len(revProofs) - 1; i >= 0; i-- {
err := proofFile.AppendProof(revProofs[i])
if err != nil {
return nil, fmt.Errorf("error appending proof to "+
"proof file: %w", err)
}
}

// Encode the full proof file.
var buf bytes.Buffer
if err := proofFile.Encode(&buf); err != nil {
return nil, fmt.Errorf("error encoding proof file: %w", err)
}
proofFileBlob := buf.Bytes()

return &AnnotatedProof{
Locator: originLocator,
Blob: proofFileBlob,
}, nil
}

// SetSubscribers sets the subscribers for the courier. This method is
// thread-safe.
func (c *UniverseRpcCourier) SetSubscribers(
subscribers map[uint64]*fn.EventReceiver[fn.Event]) {

c.subscriberMtx.Lock()
defer c.subscriberMtx.Unlock()

c.subscribers = subscribers
}

// A compile-time assertion to ensure the UniverseRpcCourier meets the
// proof.Courier interface.
var _ Courier = (*UniverseRpcCourier)(nil)

// DeliveryLog is an interface that allows the courier to log the (attempted)
// delivery of a proof.
type DeliveryLog interface {
Expand Down
8 changes: 6 additions & 2 deletions tapdb/assets_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,9 @@ func TestImportAssetProof(t *testing.T) {
// Finally, we'll verify all the anchor information that was inserted
// on disk.
require.Equal(t, testProof.AnchorBlockHash, dbAsset.AnchorBlockHash)
require.Equal(t, testProof.OutPoint, dbAsset.AnchorOutpoint)
require.Equal(
t, testProof.AssetSnapshot.OutPoint, dbAsset.AnchorOutpoint,
)
require.Equal(t, testProof.AnchorTx.TxHash(), dbAsset.AnchorTx.TxHash())

// We should also be able to fetch the proof we just inserted using the
Expand Down Expand Up @@ -395,7 +397,9 @@ func TestImportAssetProof(t *testing.T) {
// Finally, we'll verify all the anchor information that was inserted
// on disk.
require.Equal(t, testProof.AnchorBlockHash, dbAsset.AnchorBlockHash)
require.Equal(t, testProof.OutPoint, dbAsset.AnchorOutpoint)
require.Equal(
t, testProof.AssetSnapshot.OutPoint, dbAsset.AnchorOutpoint,
)
require.Equal(t, testProof.AnchorTx.TxHash(), dbAsset.AnchorTx.TxHash())
}

Expand Down
1 change: 1 addition & 0 deletions tapgarden/custodian.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error {
loc := proof.Locator{
AssetID: &assetID,
ScriptKey: addr.ScriptKey,
OutPoint: &op,
}
addrProof, err := courier.ReceiveProof(ctx, loc)
if err != nil {
Expand Down

0 comments on commit cba7cb0

Please sign in to comment.