Skip to content

Commit

Permalink
Fix: NBX would sometimes stays stuck in case of reorg (Fix #461 #409)
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasDorier committed Mar 6, 2024
1 parent e83cfdb commit 825d5bc
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 81 deletions.
30 changes: 30 additions & 0 deletions NBXplorer.Tests/UnitTest1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4501,5 +4501,35 @@ public async Task CanUseRPCProxy(Backend backend)
await tester.Client.RPCClient.GetTxOutAsync(uint256.One, 0);
}
}


[Fact]
public async Task DoNotHangDuringReorg()
{
using var tester = ServerTester.Create(Backend.Postgres);
var wallet = await tester.Client.GenerateWalletAsync(new GenerateWalletRequest());
var addr = await tester.Client.GetUnusedAsync(wallet.DerivationScheme, DerivationFeature.Deposit);
var txId = tester.SendToAddress(addr.Address, Money.Coins(1.0m));
tester.Notifications.WaitForTransaction(wallet.DerivationScheme, txId);
var blocks = await tester.RPC.GenerateAsync(4);
for (int i = 0; i < blocks.Length; i++)
{
Logs.Tester.LogInformation($"Chain1: [{i}]: {blocks[i]}");
}
tester.Notifications.WaitForBlocks(blocks[^1]);
Logs.Tester.LogInformation("Invalidate the first block which confirmed the transaction " + blocks[0]);
tester.RPC.InvalidateBlock(blocks[0]);
var blocks2 = await tester.RPC.GenerateAsync(3);
for (int i = 0; i < blocks2.Length; i++)
{
Logs.Tester.LogInformation($"Chain2: [{i}]: {blocks2[i]}");
}
tester.Notifications.WaitForBlocks(blocks2[^1]);
Logs.Tester.LogInformation("Reconsider the block " + blocks[0]);
tester.RPC.SendCommand("reconsiderblock", blocks[0]);

Logs.Tester.LogInformation($"Waiting for the first chain to be processed again");
tester.Notifications.WaitForBlocks(blocks[^1]);
}
}
}
150 changes: 70 additions & 80 deletions NBXplorer/Backends/Postgres/PostgresIndexers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ public PostgresIndexer(
CancellationTokenSource cts;
Task _indexerLoop;
Task _watchdogLoop;
Node _Node;
Channel<object> _Channel;
Channel<Block> _DownloadedBlocks;

// This one will check if the indexer is "stuck" and disconnect the node if it is the case
async Task WatchdogLoop()
Expand All @@ -58,14 +55,14 @@ async Task WatchdogLoop()
try
{
await Task.Delay(TimeSpan.FromMinutes(5.0), cancellationToken);
var height = await SeemsStuck(cancellationToken);
if (height is null)
var lastBlock = await SeemsStuck(cancellationToken);
if (lastBlock is null)
goto wait;
await Task.Delay(TimeSpan.FromMinutes(2.0), cancellationToken);
var height2 = await SeemsStuck(cancellationToken);
if (height != height2)
var lastBlock2 = await SeemsStuck(cancellationToken);
if (lastBlock != lastBlock2)
goto wait;
_Node?.DisconnectAsync($"Sync seems stuck at height {height.Value}, restarting the connection.");
_Connection?.Dispose($"Sync seems stuck after block {lastBlock.Hash} ({lastBlock.Hash}), restarting the connection.");
goto wait;
}
catch when (cts.Token.IsCancellationRequested)
Expand All @@ -80,18 +77,17 @@ async Task WatchdogLoop()
end:;
}

async Task<long?> SeemsStuck(CancellationToken cancellationToken)
async Task<SlimChainedBlock> SeemsStuck(CancellationToken cancellationToken)
{
if (State is not (BitcoinDWaiterState.NBXplorerSynching or BitcoinDWaiterState.Ready) ||
SyncHeight is not long syncHeight ||
lastIndexedBlock is not { } lastBlock ||
GetConnectedClient() is not RPCClient rpc)
{
return null;
}

var blockchainInfo = await rpc.GetBlockchainInfoAsyncEx(cancellationToken);
if (Math.Min(blockchainInfo.Headers, blockchainInfo.Blocks) > syncHeight)
return syncHeight;
return null;
return blockchainInfo.BestBlockHash != lastBlock.Hash ? lastBlock : null;
}

async Task IndexerLoop()
Expand Down Expand Up @@ -121,24 +117,53 @@ async Task IndexerLoop()
}
}

class Connection : IDisposable
{
public Channel<Object> Events;
public Channel<Block> Blocks;
public Node Node;
public Connection(Node node)
{
Node = node;
Events = Channel.CreateUnbounded<object>(new() { AllowSynchronousContinuations = false });
Blocks = Channel.CreateUnbounded<Block>(new() { AllowSynchronousContinuations = false });
}
bool _Disposed = false;

public void Dispose()
{
Dispose(null);
}
public void Dispose(string reason)
{
if (_Disposed)
return;
Node.DisconnectAsync(reason);
Events.Writer.TryComplete();
Blocks.Writer.TryComplete();
_Disposed = true;
}
}
Connection _Connection;
private async Task IndexerLoopCore(CancellationToken token)
{
await ConnectNode(token, true);
await foreach (var item in _Channel.Reader.ReadAllAsync(token))
await ConnectNode(token);
var connection = _Connection;
await foreach (var item in connection.Events.Reader.ReadAllAsync(token))
{
await using var conn = await ConnectionFactory.CreateConnectionHelper(Network);
if (item is PullBlocks pb)
{
var headers = ConsolidatePullBlocks(_Channel.Reader, pb);
var headers = ConsolidatePullBlocks(connection.Events.Reader, pb);
foreach (var batch in headers.Chunk(maxinflight))
{
_ = _Node.SendMessageAsync(
_ = connection.Node.SendMessageAsync(
new GetDataPayload(
batch.Select(b => new InventoryVector(_Node.AddSupportedOptions(InventoryType.MSG_BLOCK), b.GetHash())
batch.Select(b => new InventoryVector(connection.Node.AddSupportedOptions(InventoryType.MSG_BLOCK), b.GetHash())
).ToArray()));
var remaining = batch.Select(b => b.GetHash()).ToHashSet();
List<Block> unorderedBlocks = new List<Block>();
await foreach (var block in _DownloadedBlocks.Reader.ReadAllAsync(token))
await foreach (var block in connection.Blocks.Reader.ReadAllAsync(token))
{
if (!remaining.Remove(block.Header.GetHash()))
continue;
Expand Down Expand Up @@ -188,17 +213,14 @@ private async Task IndexerLoopCore(CancellationToken token)
}
}
await SaveProgress(conn);
await UpdateState();
await UpdateState(connection.Node);
}
await AskNextHeaders(token);
}
if (item is NodeDisconnected)
{
await ConnectNode(token, false);
if (connection.Node.State != NodeState.HandShaked)
await AskNextHeaders(connection.Node, token);
}
if (item is Transaction tx)
{
var txs = PullTransactions(_Channel.Reader, tx);
var txs = PullTransactions(connection.Events.Reader, tx);
await SaveMatches(conn, txs, null, true);
}
}
Expand Down Expand Up @@ -255,15 +277,8 @@ private IList<BlockHeader> ConsolidatePullBlocks(ChannelReader<object> reader, P
}


private async Task ConnectNode(CancellationToken token, bool forceRestart)
private async Task ConnectNode(CancellationToken token)
{
if (_Node is not null)
{
if (!forceRestart && _Node.State == NodeState.HandShaked)
return;
_Node.DisconnectAsync("Restarting");
_Node = null;
}
State = BitcoinDWaiterState.NotStarted;
using (var handshakeTimeout = CancellationTokenSource.CreateLinkedTokenSource(token))
{
Expand Down Expand Up @@ -344,35 +359,35 @@ private async Task ConnectNode(CancellationToken token, bool forceRestart)
State = BitcoinDWaiterState.NBXplorerSynching;
// Refresh the NetworkInfo that may have become different while it was synching.
NetworkInfo = await RPCClient.GetNetworkInfoAsync();
_Node = node;
_Channel?.Writer.Complete();
_Channel = Channel.CreateUnbounded<object>();
_DownloadedBlocks?.Writer.Complete();
_DownloadedBlocks = Channel.CreateUnbounded<Block>();

_Connection?.Dispose("Creating new connection");
_Connection = new Connection(node);
node.MessageReceived += Node_MessageReceived;
node.Disconnected += Node_Disconnected;

var locator = await AskNextHeaders(token);
var locator = await AskNextHeaders(node, token);
lastIndexedBlock = await Repository.GetLastIndexedSlimChainedBlock(locator);
if (lastIndexedBlock is null)
{
var locatorTip = await RPCClient.GetBlockHeaderAsyncEx(locator.Blocks[0], token);
lastIndexedBlock = locatorTip?.ToSlimChainedBlock();
}
await UpdateState();
await UpdateState(node);
}
}


bool firstConnect = true;
private async Task<BlockLocator> AskNextHeaders(CancellationToken token)
private async Task<BlockLocator> AskNextHeaders(Node node, CancellationToken token)
{
var indexProgress = await Repository.GetIndexProgress();
if (indexProgress is null)
{
indexProgress = await GetDefaultCurrentLocation(token);
}
await _Node.SendMessageAsync(new GetHeadersPayload(indexProgress));
foreach (var block in indexProgress.Blocks)
{
Logger.LogInformation($"Asking for block {block}");
}
await node.SendMessageAsync(new GetHeadersPayload(indexProgress));
return indexProgress;
}

Expand All @@ -391,8 +406,10 @@ private async Task SaveProgress(DbConnectionHelper conn)
await Repository.SetIndexProgress(conn.Connection, locator);
}

private async Task UpdateState()
private async Task UpdateState(Node node)
{
if (node.State != NodeState.HandShaked)
return;
var blockchainInfo = await RPCClient.GetBlockchainInfoAsyncEx();
if (blockchainInfo.IsSynching(Network))
{
Expand Down Expand Up @@ -508,18 +525,16 @@ private async Task SaveMatches(DbConnectionHelper conn, List<Transaction> transa

SlimChainedBlock lastIndexedBlock;
record PullBlocks(IList<BlockHeader> headers);
record NodeDisconnected();
private void Node_MessageReceived(Node node, IncomingMessage message)
{
var channel = _Channel;
var downloadedBlocks = _DownloadedBlocks;
var connection = _Connection;
if (message.Message.Payload is HeadersPayload h && h.Headers.Count != 0)
{
channel.Writer.TryWrite(new PullBlocks(h.Headers));
connection.Events.Writer.TryWrite(new PullBlocks(h.Headers));
}
else if (message.Message.Payload is BlockPayload b)
{
downloadedBlocks.Writer.TryWrite(b.Object);
connection.Blocks.Writer.TryWrite(b.Object);
}
else if (message.Message.Payload is InvPayload invs)
{
Expand All @@ -535,41 +550,17 @@ private void Node_MessageReceived(Node node, IncomingMessage message)
{
node.SendMessageAsync(data);
}
// DOGE coin doing doge things forget we want header first sync... reboot the connection
else
{
if (invs.Inventory.Where(t => t.Type.HasFlag(InventoryType.MSG_BLOCK)).Any())
{
node.DisconnectAsync("Not sending headers first anymore");
}
}
}
else if (message.Message.Payload is TxPayload tx)
{
channel.Writer.TryWrite(tx.Object);
connection.Events.Writer.TryWrite(tx.Object);
}
}

private void Node_Disconnected(Node node)
{
var channel = _Channel;
if (node.DisconnectReason.Reason != "Restarting")
{
if (!cts.IsCancellationRequested)
{
var exception = node.DisconnectReason.Exception?.Message;
if (!string.IsNullOrEmpty(exception))
exception = $" ({exception})";
else
exception = String.Empty;
Logger.LogWarning($"Node disconnected for reason: {node.DisconnectReason.Reason}{exception}");
}
channel.Writer.TryWrite(new NodeDisconnected());
}
else
{
Logger.LogInformation($"Restarting node connection...");
}
Logger.LogInformation($"Node disconnected ({node.DisconnectReason.Reason})");
_Connection?.Dispose();
node.MessageReceived -= Node_MessageReceived;
node.Disconnected -= Node_Disconnected;
State = BitcoinDWaiterState.NotStarted;
Expand All @@ -589,12 +580,11 @@ public async Task StartAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
cts?.Cancel();
_Channel.Writer.Complete();
_Connection?.Dispose("NBXplorer stopping...");
if (_indexerLoop is not null)
await _indexerLoop;
if (_watchdogLoop is not null)
await _watchdogLoop;
_Node?.DisconnectAsync();
}
public NBXplorerNetwork Network => network;

Expand Down
2 changes: 1 addition & 1 deletion NBXplorer/NBXplorer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<OutputType>Exe</OutputType>
<TargetFramework Condition="'$(TargetFrameworkOverride)' == ''">net8.0</TargetFramework>
<TargetFramework Condition="'$(TargetFrameworkOverride)' != ''">$(TargetFrameworkOverride)</TargetFramework>
<Version>2.5.0</Version>
<Version>2.5.1</Version>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\NBXplorer.xml</DocumentationFile>
<NoWarn>1701;1702;1705;1591;CS1591</NoWarn>
<LangVersion>12</LangVersion>
Expand Down

0 comments on commit 825d5bc

Please sign in to comment.