Skip to content

Commit

Permalink
Stubborn message sending in libp2p transport
Browse files Browse the repository at this point in the history
Previously, if an error occurred while sending a message in the libp2p
transport, the module dropped the message and tried to reconnect.
If messages were very sparse and the other side of the connection
stopped receiving some time after accepting each incoming connection,
all messages would get lost.

Now, if the libp2p transport fails to send a message,
it does not drop it, but instead tries to reconnect and,
if reconnection succeeds, immediately re-tries to send the same message
until it is sent successfully.

Signed-off-by: Matej Pavlovic <[email protected]>
  • Loading branch information
matejpavlovic authored Mar 24, 2023
1 parent fcfcb0a commit 5e674cb
Showing 1 changed file with 35 additions and 16 deletions.
51 changes: 35 additions & 16 deletions pkg/net/libp2p/remoteconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,22 @@ func (conn *remoteConnection) process() {
defer close(conn.done)
defer conn.closeStream()

// Data to be sent to the connection.
// If nil, a new message from conn.msgBuffer will be read, encoded, and stored here.
var msgData []byte

for {
// The processing loop runs indefinitely (until interrupted by explicitly returning).
// One iteration corresponds to sending one message.
// One iteration corresponds to one attempt of sending a message.

// Check if connection is being closed.
// This is necessary for not getting stuck trying to send an unsent message
// (failing all the time and retrying forever).
select {
case <-conn.stop:
return
default:
}

// Create a network connection if there is none.
if conn.stream == nil {
Expand All @@ -238,25 +251,31 @@ func (conn *remoteConnection) process() {
}
}

// Get the next message and write it to the output stream (unless connection is closing).
select {
case <-conn.stop:
return
case msg := <-conn.msgBuffer:

// Encode message to a byte slice.
data, err := encodeMessage(msg, conn.ownID)
if err != nil {
conn.logger.Log(logging.LevelError, "Could not encode message. Disconnecting.", "err", err)
// Get the next message and encode it if there is no pending unsent message.
if msgData == nil {
select {
case <-conn.stop:
return
case msg := <-conn.msgBuffer:
// Encode message to a byte slice.
var err error
msgData, err = encodeMessage(msg, conn.ownID)
if err != nil {
conn.logger.Log(logging.LevelError, "Could not encode message. Disconnecting.", "err", err)
return
}
}
}

// Write the encoded data to the network stream.
// Write the encoded data to the network stream.
if err := conn.writeDataToStream(msgData); err != nil {
// If writing fails, close the stream, such that a new one will be re-established in the next iteration.
if err = conn.writeDataToStream(data); err != nil {
conn.logger.Log(logging.LevelWarn, "Failed sending data.", "err", err)
conn.closeStream()
}
conn.logger.Log(logging.LevelWarn, "Failed sending data.", "err", err)
conn.closeStream()
} else {
// On success, clear the pending message (that has just been sent)
// so a new one can be read from the msgBuffer on the next iteration.
msgData = nil
}
}
}
Expand Down

0 comments on commit 5e674cb

Please sign in to comment.