Skip to content

Commit

Permalink
Merge pull request #1176 from lightninglabs/hashmail-deliver-logging
Browse files Browse the repository at this point in the history
Hashmail logging cleanup
  • Loading branch information
ffranr authored Nov 11, 2024
2 parents 55c55c9 + f9581d5 commit d81bd74
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 28 deletions.
51 changes: 33 additions & 18 deletions proof/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,19 @@ func (h *HashMailBox) RecvAck(ctx context.Context, sid streamID) error {
return fmt.Errorf("unable to create read stream: %w", err)
}

log.Debugf("Exec stream Recv for receiver ACK (sid=%x)", sid[:])
msg, err := readStream.Recv()
if err != nil {
return err
return fmt.Errorf("failed on stream Recv (sid=%x): %w", sid[:],
err)
}

if bytes.Equal(msg.Msg, ackMsg) {
log.Debugf("Received ACK from sender (sid=%x)", sid[:])
return nil
}

return fmt.Errorf("expected ack, got %x", msg.Msg)
return fmt.Errorf("expected ACK from hashmail service, got %x", msg.Msg)
}

// CleanUp attempts to tear down the mailbox as specified by the passed sid.
Expand Down Expand Up @@ -867,13 +870,20 @@ func (h *HashMailCourier) ensureConnect(ctx context.Context) error {
func (h *HashMailCourier) DeliverProof(ctx context.Context,
recipient Recipient, proof *AnnotatedProof) error {

log.Infof("Attempting to deliver receiver proof for send of "+
"asset_id=%v, amt=%v", recipient.AssetID, recipient.Amount)

// Compute the stream IDs for the sender and receiver.
// Compute the stream IDs for the sender and receiver. Note that these
// stream IDs are derived from the recipient's script key only. Which
// means that stream IDs will be identical for multiple proofs sent to
// the same recipient.
senderStreamID := deriveSenderStreamID(recipient)
receiverStreamID := deriveReceiverStreamID(recipient)

log.Infof("Delivering proof to asset transfer receiver "+
"(amt=%v, asset_id=%v, script_pub_key=%x, "+
"sender_sid=%x, receiver_sid=%x)",
recipient.Amount, recipient.AssetID,
recipient.ScriptKey.SerializeCompressed(), senderStreamID,
receiverStreamID)

// Interact with the hashmail service using a backoff procedure to
// ensure that we don't overwhelm the service with delivery attempts.
deliveryExec := func() error {
Expand Down Expand Up @@ -901,8 +911,7 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
// TODO(roasbeef): do ecies here
// (this ^ TODO relates to encrypting proofs for the receiver
// before uploading to the courier)
log.Infof("Sending receiver proof via sid=%x",
senderStreamID)
log.Infof("Writing proof to mailbox (sid=%x)", senderStreamID)
err = h.mailbox.WriteProof(
ctx, senderStreamID, proof.Blob,
)
Expand All @@ -911,21 +920,27 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
"transfer receiver: %w", err)
}

// Wait to receive the ACK from the remote party over
// their stream.
log.Infof("Waiting (%v) for receiver ACK via sid=%x",
h.cfg.ReceiverAckTimeout, receiverStreamID)
// Wait to receive ACK from proof transfer receiving peer over
// hashmail service.
log.Infof("Waiting for receiver ACK from hashmail service "+
"(timeout=%v, sid=%x)", h.cfg.ReceiverAckTimeout,
receiverStreamID)

ctxTimeout, cancel := context.WithTimeout(
ctx, h.cfg.ReceiverAckTimeout,
)
defer cancel()
err = h.mailbox.RecvAck(ctxTimeout, receiverStreamID)
if err != nil {
return fmt.Errorf("failed to receive ACK from "+
"receiver within timeout: %w", err)
return fmt.Errorf("failed to retrieve proof transfer "+
"receiver ACK within timeout (sid=%x): %w",
receiverStreamID, err)
}

log.Infof("Retrieved proof transfer receiver ACK from "+
"hashmail service (timeout=%v, sid=%x)",
h.cfg.ReceiverAckTimeout, receiverStreamID)

return nil
}
err := h.backoffHandle.Exec(
Expand All @@ -937,12 +952,12 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
"failed: %w", err)
}

log.Infof("Received ACK from receiver! Cleaning up mailboxes...")

defer h.Close()

// Once we receive this ACK, we can clean up our mailbox and also the
// receiver's mailbox.
// If the backoff handler's exec routine completes successfully, we can
// remove our mailbox and the receiver's mailbox.
log.Infof("Removing sender and recipient mailboxes from hashmail " +
"service")
if err := h.mailbox.CleanUp(ctx, senderStreamID); err != nil {
return fmt.Errorf("failed to cleanup sender mailbox: %w", err)
}
Expand Down
82 changes: 72 additions & 10 deletions tapfreighter/chain_porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,15 +763,72 @@ func (p *ChainPorter) updateAssetProofFile(ctx context.Context,
}, nil
}

// reportProofTransfers logs a summary of the transfer outputs that require
// proof delivery and those that do not.
func reportProofTransfers(notDeliveringOutputs []TransferOutput,
pendingDeliveryOutputs []TransferOutput) {

log.Debugf("Count of transfer output(s) by proof delivery status: "+
"(count_delivery_not_applicable=%d, count_pending_delivery=%d)",
len(notDeliveringOutputs), len(pendingDeliveryOutputs))

// Report the transfer outputs that do not require proof delivery.
if len(notDeliveringOutputs) > 0 {
logEntries := make([]string, 0, len(notDeliveringOutputs))
for idx := range notDeliveringOutputs {
out := notDeliveringOutputs[idx]
key := out.ScriptKey.PubKey

entry := fmt.Sprintf("transfer_output_position=%d, "+
"proof_delivery_status=%v, "+
"script_key=%x", out.Position,
out.ProofDeliveryComplete,
key.SerializeCompressed())
logEntries = append(logEntries, entry)
}

entriesJoin := strings.Join(logEntries, "\n")
log.Debugf("Transfer outputs that do not require proof "+
"delivery:\n%v", entriesJoin)
}

// Report the transfer outputs that require proof delivery.
if len(pendingDeliveryOutputs) > 0 {
logEntries := make([]string, 0, len(pendingDeliveryOutputs))
for idx := range pendingDeliveryOutputs {
out := pendingDeliveryOutputs[idx]
key := out.ScriptKey.PubKey

entry := fmt.Sprintf("transfer_output_position=%d, "+
"proof_delivery_status=%v, "+
"proof_courier_addr=%s, "+
"script_key=%x", out.Position,
out.ProofDeliveryComplete, out.ProofCourierAddr,
key.SerializeCompressed())
logEntries = append(logEntries, entry)
}

entriesJoin := strings.Join(logEntries, "\n")
log.Debugf("Transfer outputs that require proof delivery:\n%v",
entriesJoin)
}
}

// transferReceiverProof retrieves the sender and receiver proofs from the
// archive and then transfers the receiver's proof to the receiver. Upon
// successful transfer, the asset parcel delivery is marked as complete.
func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
ctx, cancel := p.WithCtxQuitNoTimeout()
defer cancel()

deliver := func(ctx context.Context, out TransferOutput) error {
key := out.ScriptKey.PubKey
// Classify transfer outputs into those that require proof delivery and
// those that do not.
var (
notDeliveringOutputs []TransferOutput
pendingDeliveryOutputs []TransferOutput
)
for idx := range pkg.OutboundPkg.Outputs {
out := pkg.OutboundPkg.Outputs[idx]

// We'll first check to see if the proof should be delivered.
shouldDeliverProof, err := out.ShouldDeliverProof()
Expand All @@ -781,15 +838,20 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
}

if !shouldDeliverProof {
log.Debugf("Transfer ouput proof does not require "+
"delivery (transfer_output_position=%d, "+
"proof_delivery_status=%v, "+
"script_key=%x)", out.Position,
out.ProofDeliveryComplete,
key.SerializeCompressed())
return nil
notDeliveringOutputs = append(notDeliveringOutputs, out)
continue
}

pendingDeliveryOutputs = append(pendingDeliveryOutputs, out)
}

// Log a summary of the transfer outputs that require proof delivery and
// those that do not.
reportProofTransfers(notDeliveringOutputs, pendingDeliveryOutputs)

deliver := func(ctx context.Context, out TransferOutput) error {
key := out.ScriptKey.PubKey

// We just look for the full proof in the list of final proofs
// by matching the content of the proof suffix.
var receiverProof *proof.AnnotatedProof
Expand Down Expand Up @@ -871,7 +933,7 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
// If we have a non-interactive proof, then we'll launch several
// goroutines to deliver the proof(s) to the receiver(s).
instanceErrors, err := fn.ParSliceErrCollect(
ctx, pkg.OutboundPkg.Outputs, deliver,
ctx, pendingDeliveryOutputs, deliver,
)
if err != nil {
return fmt.Errorf("error delivering proof(s): %w", err)
Expand Down

0 comments on commit d81bd74

Please sign in to comment.