Skip to content

Commit

Permalink
Merge pull request #459 from lightninglabs/hashmail-courier-session
Browse files Browse the repository at this point in the history
Chain porter proof courier service initialised using configurable address
  • Loading branch information
Roasbeef authored Aug 30, 2023
2 parents ef0f3f1 + e5ef9bd commit 4a0a16c
Show file tree
Hide file tree
Showing 17 changed files with 324 additions and 187 deletions.
9 changes: 2 additions & 7 deletions itest/aperture_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ type ApertureHarness struct {
// ListenAddr is the address that the aperture service is listening on.
ListenAddr string

// TlsCertPath is the path to the TLS certificate that the aperture
// service is using.
TlsCertPath string

// service is the instance of the aperture service that is running.
Service *aperture.Aperture
}
Expand Down Expand Up @@ -59,9 +55,8 @@ func NewApertureHarness(t *testing.T, port int) ApertureHarness {
service := aperture.NewAperture(cfg)

return ApertureHarness{
ListenAddr: listenAddr,
TlsCertPath: filepath.Join(baseDir, "tls.cert"),
Service: service,
ListenAddr: listenAddr,
Service: service,
}
}

Expand Down
1 change: 0 additions & 1 deletion itest/tapd_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func newTapdHarness(ht *harnessTest, cfg tapdConfig,
)

finalCfg.HashMailCourier = &proof.HashMailCourierCfg{
TlsCertPath: typedProofCourier.TlsCertPath,
ReceiverAckTimeout: receiverAckTimeout,
BackoffCfg: backoffCfg,
}
Expand Down
232 changes: 147 additions & 85 deletions proof/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,41 +39,6 @@ const (
ApertureCourier = "hashmail"
)

// NewCourierType returns the CourierType that corresponds to the given string.
func NewCourierType(scheme string) (CourierType, error) {
switch scheme {
case ApertureCourier:
return ApertureCourier, nil
}

return DisabledCourier, fmt.Errorf("unknown courier address "+
"protocol: %v", scheme)
}

func ParseCourierAddr(addr string) (*url.URL, error) {
// Parse URI.
urlAddr, err := url.ParseRequestURI(addr)
if err != nil {
return nil, fmt.Errorf("invalid proof courier URI address: %w",
err)
}

// Validate port number.
if urlAddr.Port() == "" {
return nil, fmt.Errorf("proof courier URI address port "+
"unspecified: %w", err)
}

// Validate protocol supported.
_, err = NewCourierType(urlAddr.Scheme)
if err != nil {
return nil, fmt.Errorf("invalid proof courier protocol: %w",
err)
}

return urlAddr, nil
}

// CourierHarness interface is an integration testing harness for a proof
// courier service.
type CourierHarness interface {
Expand All @@ -90,20 +55,144 @@ type CourierHarness interface {
// receiver can use this to fetch a proof from the sender.
//
// TODO(roasbeef): FileSystemCourier, RpcCourier
type Courier[Addr any] interface {
type Courier interface {
// DeliverProof attempts to delivery a proof to the receiver, using the
// information in the Addr type.
DeliverProof(context.Context, Addr, *AnnotatedProof) error
DeliverProof(context.Context, *AnnotatedProof) error

// ReceiveProof attempts to obtain a proof as identified by the passed
// locator from the source encapsulated within the specified address.
ReceiveProof(context.Context, Addr, Locator) (*AnnotatedProof, error)
ReceiveProof(context.Context, Locator) (*AnnotatedProof, error)

// SetSubscribers sets the set of subscribers that will be notified
// of proof courier related events.
SetSubscribers(map[uint64]*fn.EventReceiver[fn.Event])
}

// CourierAddr is a fully validated courier address (including protocol specific
// validation).
type CourierAddr interface {
// Url returns the url.URL representation of the courier address.
Url() *url.URL

// NewCourier generates a new courier service handle.
NewCourier(ctx context.Context, cfg *CourierCfg,
recipient Recipient) (Courier, error)
}

// ParseCourierAddrString parses a proof courier address string and returns a
// protocol specific courier address instance.
func ParseCourierAddrString(addr string) (CourierAddr, error) {
// Parse URI.
urlAddr, err := url.ParseRequestURI(addr)
if err != nil {
return nil, fmt.Errorf("invalid proof courier URI address: %w",
err)
}

return ParseCourierAddrUrl(*urlAddr)
}

// ParseCourierAddrUrl parses a proof courier address url.URL and returns a
// protocol specific courier address instance.
func ParseCourierAddrUrl(addr url.URL) (CourierAddr, error) {
// Create new courier addr based on URL scheme.
switch addr.Scheme {
case ApertureCourier:
return NewHashMailCourierAddr(addr)
}

return nil, fmt.Errorf("unknown courier address protocol: %v",
addr.Scheme)
}

// HashMailCourierAddr is a hashmail protocol specific implementation of the
// CourierAddr interface.
type HashMailCourierAddr struct {
addr url.URL
}

// Url returns the url.URL representation of the hashmail courier address.
func (h *HashMailCourierAddr) Url() *url.URL {
return &h.addr
}

// NewCourier generates a new courier service handle.
func (h *HashMailCourierAddr) NewCourier(_ context.Context, cfg *CourierCfg,
recipient Recipient) (Courier, error) {

hashMailCfg := HashMailCourierCfg{
ReceiverAckTimeout: cfg.ReceiverAckTimeout,
BackoffCfg: cfg.BackoffCfg,
}

hashMailBox, err := NewHashMailBox(&h.addr)
if err != nil {
return nil, fmt.Errorf("unable to make mailbox: %v",
err)
}

subscribers := make(
map[uint64]*fn.EventReceiver[fn.Event],
)
return &HashMailCourier{
cfg: &hashMailCfg,
recipient: recipient,
mailbox: hashMailBox,
deliveryLog: cfg.DeliveryLog,
subscribers: subscribers,
}, nil
}

// NewHashMailCourierAddr generates a new hashmail courier address from a given
// URL. This function also performs hashmail protocol specific address
// validation.
func NewHashMailCourierAddr(addr url.URL) (*HashMailCourierAddr, error) {
if addr.Scheme != ApertureCourier {
return nil, fmt.Errorf("expected hashmail courier protocol: %v",
addr.Scheme)
}

// We expect the port number to be specified for a hashmail service.
if addr.Port() == "" {
return nil, fmt.Errorf("hashmail proof courier URI address " +
"port unspecified")
}

return &HashMailCourierAddr{
addr,
}, nil
}

// NewCourier instantiates a new courier service handle given a service URL
// address.
func NewCourier(ctx context.Context, addr url.URL, cfg *CourierCfg,
recipient Recipient) (Courier, error) {

courierAddr, err := ParseCourierAddrUrl(addr)
if err != nil {
return nil, err
}

return courierAddr.NewCourier(ctx, cfg, recipient)
}

// CourierCfg contains general config parameters applicable to all proof
// couriers.
type CourierCfg struct {
// ReceiverAckTimeout is the maximum time we'll wait for the receiver to
// acknowledge the proof.
ReceiverAckTimeout time.Duration

// BackoffCfg configures the behaviour of the proof delivery
// functionality.
BackoffCfg *BackoffCfg

// DeliveryLog is the log that the courier will use to record the
// attempted delivery of proofs to the receiver.
DeliveryLog DeliveryLog
}

// ProofMailbox represents an abstract store-and-forward mailbox that can be
// used to send/receive proofs.
type ProofMailbox interface {
Expand Down Expand Up @@ -136,25 +225,13 @@ type HashMailBox struct {

// serverDialOpts returns the set of server options needed to connect to the
// server using a TLS connection.
func serverDialOpts(tlsCertPath string) ([]grpc.DialOption, error) {
func serverDialOpts() ([]grpc.DialOption, error) {
var opts []grpc.DialOption

if tlsCertPath != "" {
// Read in the specified TLS certificate and build transport
// credentials with it.
creds, err := credentials.NewClientTLSFromFile(tlsCertPath, "")
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithTransportCredentials(creds))

return opts, nil
}

// If TLS certificate file path not given, use the system's TLS trust
// store.
creds := credentials.NewTLS(&tls.Config{})
opts = append(opts, grpc.WithTransportCredentials(creds))
// Skip TLS certificate verification.
tlsConfig := tls.Config{InsecureSkipVerify: true}
transportCredentials := credentials.NewTLS(&tlsConfig)
opts = append(opts, grpc.WithTransportCredentials(transportCredentials))

return opts, nil
}
Expand All @@ -164,15 +241,15 @@ func serverDialOpts(tlsCertPath string) ([]grpc.DialOption, error) {
//
// NOTE: The TLS certificate path argument (tlsCertPath) is optional. If unset,
// then the system's TLS trust store is used.
func NewHashMailBox(courierAddr *url.URL, tlsCertPath string) (*HashMailBox,
func NewHashMailBox(courierAddr *url.URL) (*HashMailBox,
error) {

if courierAddr.Scheme != ApertureCourier {
return nil, fmt.Errorf("unsupported courier protocol: %v",
courierAddr.Scheme)
}

dialOpts, err := serverDialOpts(tlsCertPath)
dialOpts, err := serverDialOpts()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -373,8 +450,6 @@ type Recipient struct {

// HashMailCourierCfg is the config for the hashmail proof courier.
type HashMailCourierCfg struct {
TlsCertPath string `long:"tlscertpath" description:"Service TLS certificate file path"`

// ReceiverAckTimeout is the maximum time we'll wait for the receiver to
// acknowledge the proof.
ReceiverAckTimeout time.Duration `long:"receiveracktimeout" description:"The maximum time to wait for the receiver to acknowledge the proof."`
Expand Down Expand Up @@ -403,11 +478,15 @@ type BackoffCfg struct {
MaxBackoff time.Duration `long:"maxbackoff" description:"The maximum backoff time to wait before retrying to deliver the proof to the receiver."`
}

// HashMailCourier is an implementation of the Courier interfaces that
// HashMailCourier is a hashmail proof courier service handle. It implements the
// Courier interface.
type HashMailCourier struct {
// cfg contains the courier's configuration parameters.
cfg *HashMailCourierCfg

// recipient describes the recipient of the proof.
recipient Recipient

mailbox ProofMailbox

// deliveryLog is the log that the courier will use to record the
Expand All @@ -423,36 +502,19 @@ type HashMailCourier struct {
subscriberMtx sync.Mutex
}

// NewHashMailCourier implements the Courier interface using the specified
// ProofMailbox. This instance of the Courier relies on the Taproot Asset
// address itself as the parametrized address type.
func NewHashMailCourier(cfg *HashMailCourierCfg, mailbox ProofMailbox,
deliveryLog DeliveryLog) (*HashMailCourier, error) {

subscribers := make(
map[uint64]*fn.EventReceiver[fn.Event],
)
return &HashMailCourier{
cfg: cfg,
mailbox: mailbox,
deliveryLog: deliveryLog,
subscribers: subscribers,
}, nil
}

// DeliverProof attempts to delivery a proof to the receiver, using the
// information in the Addr type.
//
// TODO(roasbeef): other delivery context as type param?
func (h *HashMailCourier) DeliverProof(ctx context.Context, recipient Recipient,
func (h *HashMailCourier) DeliverProof(ctx context.Context,
proof *AnnotatedProof) error {

log.Infof("Attempting to deliver receiver proof for send of "+
"asset_id=%x, amt=%v", recipient.AssetID, recipient.Amount)
"asset_id=%x, amt=%v", h.recipient.AssetID, h.recipient.Amount)

// Compute the stream IDs for the sender and receiver.
senderStreamID := deriveSenderStreamID(recipient)
receiverStreamID := deriveReceiverStreamID(recipient)
senderStreamID := deriveSenderStreamID(h.recipient)
receiverStreamID := deriveReceiverStreamID(h.recipient)

// Query delivery log to ensure a sensible rate of delivery attempts.
timestamps, err := h.deliveryLog.QueryProofDeliveryLog(
Expand Down Expand Up @@ -742,10 +804,10 @@ func NewReceiverProofBackoffWaitEvent(

// ReceiveProof attempts to obtain a proof as identified by the passed locator
// from the source encapsulated within the specified address.
func (h *HashMailCourier) ReceiveProof(ctx context.Context, recipient Recipient,
func (h *HashMailCourier) ReceiveProof(ctx context.Context,
loc Locator) (*AnnotatedProof, error) {

senderStreamID := deriveSenderStreamID(recipient)
senderStreamID := deriveSenderStreamID(h.recipient)
if err := h.mailbox.Init(ctx, senderStreamID); err != nil {
return nil, err
}
Expand All @@ -761,7 +823,7 @@ func (h *HashMailCourier) ReceiveProof(ctx context.Context, recipient Recipient,

// Now that we've read the proof, we'll create our mailbox (which might
// already exist) to send an ACK back to the sender.
receiverStreamID := deriveReceiverStreamID(recipient)
receiverStreamID := deriveReceiverStreamID(h.recipient)
log.Infof("Sending ACK to sender via sid=%x", receiverStreamID)
if err := h.mailbox.Init(ctx, receiverStreamID); err != nil {
return nil, err
Expand Down Expand Up @@ -790,7 +852,7 @@ func (h *HashMailCourier) SetSubscribers(

// A compile-time assertion to ensure the HashMailCourier meets the
// proof.Courier interface.
var _ Courier[Recipient] = (*HashMailCourier)(nil)
var _ Courier = (*HashMailCourier)(nil)

// DeliveryLog is an interface that allows the courier to log the (attempted)
// delivery of a proof.
Expand Down
12 changes: 10 additions & 2 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,13 +991,19 @@ func (r *rpcServer) NewAddr(ctx context.Context,
// the default specified in the config.
courierAddr := r.cfg.DefaultProofCourierAddr
if in.ProofCourierAddr != "" {
courierAddr, err = proof.ParseCourierAddr(
addr, err := proof.ParseCourierAddrString(
in.ProofCourierAddr,
)
if err != nil {
return nil, fmt.Errorf("invalid proof courier "+
"address: %w", err)
}

// At this point, we do not intend on creating a proof courier
// service instance. We are only interested in parsing and
// validating the address. We therefore convert the address into
// an url.URL type for storage in the address book.
courierAddr = addr.Url()
}

// Check that the proof courier address is set. This should never
Expand Down Expand Up @@ -1522,7 +1528,9 @@ func (r *rpcServer) FundVirtualPsbt(ctx context.Context,
return nil, fmt.Errorf("no recipients specified")
}

fundedVPkt, err = r.cfg.AssetWallet.FundAddressSend(ctx, addr)
fundedVPkt, _, err = r.cfg.AssetWallet.FundAddressSend(
ctx, addr,
)
if err != nil {
return nil, fmt.Errorf("error funding address send: "+
"%w", err)
Expand Down
Loading

0 comments on commit 4a0a16c

Please sign in to comment.