Skip to content

Commit

Permalink
feat: Implement fallback when block announce fails.
Browse files Browse the repository at this point in the history
  • Loading branch information
Zurcusa committed Sep 11, 2024
1 parent 1ec9c92 commit 1123686
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import com.limechain.exception.scale.ScaleEncodingException;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleWriter;
import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleReader;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.polkaj.reader.ScaleCodecReader;
import com.limechain.polkaj.writer.ScaleCodecWriter;
import com.limechain.rpc.server.AppBean;
Expand All @@ -27,14 +27,14 @@ public class BlockAnnounceEngine {

public static void handleBlockAnnounce(byte[] msg, String peerId) {
ScaleCodecReader reader = new ScaleCodecReader(msg);
BlockAnnounceMessage announce = reader.read(new BlockAnnounceMessageScaleReader());
BlockHeader announceHeader = reader.read(new BlockAnnounceMessageScaleReader()).getHeader();
// connectionManager.updatePeer(peerId, announce);
AppBean.getBean(WarpSyncState.class).syncBlockAnnounce(announce);
log.log(Level.FINE, "Received block announce for block #" + announce.getHeader().getBlockNumber() +
" from " + peerId +
" with hash:0x" + announce.getHeader().getHash() +
" parentHash:" + announce.getHeader().getParentHash() +
" stateRoot:" + announce.getHeader().getStateRoot());
AppBean.getBean(WarpSyncState.class).syncRuntimeUpdate(announceHeader);
log.log(Level.FINE, "Received block announce for block #" + announceHeader.getBlockNumber() +
" from " + peerId +
" with hash:0x" + announceHeader.getHash() +
" parentHash:" + announceHeader.getParentHash() +
" stateRoot:" + announceHeader.getStateRoot());
}

public static String getHandshake() {
Expand All @@ -48,29 +48,29 @@ public static String getHandshake() {
}

@JSBody(params = {"handshake", "protocolId"}, script = "window.fruzhin.libp.getConnections().forEach(async (peer) => {" +
" let stream = await ItPbStream.pbStream(await window.fruzhin.libp.dialProtocol(peer.remotePeer, protocolId));" +
" stream.writeLP(Ed25519.h2b(handshake));" +
"});")
" let stream = await ItPbStream.pbStream(await window.fruzhin.libp.dialProtocol(peer.remotePeer, protocolId));" +
" stream.writeLP(Ed25519.h2b(handshake));" +
"});")
public static native void sendHandshakeToAll(String handshake, String protocolId);


@JSBody(params = {"announceExport", "protocolId"}, script =
"window.fruzhin.libp.handle(protocolId, async ({connection, stream}) => {" +
" ItPipe.pipe(stream, async function (source) {" +
" for await (const msg of source) {" +
" let subarr = msg.subarray();" +
" if(subarr.length === 69) {" +
" let handshake = announceExport.getHandshake();" +
" (await ItPbStream.pbStream(stream)).writeLP(Ed25519.h2b(handshake));" +
" } else if (subarr.length > 1) {" +
" announceExport.blockAnnounce(Ed25519.b2h(subarr.slice(2)), connection.remotePeer.toString());" +
" }" +
" }" +
" });" +
"});" +
"fruzhin.libp.addEventListener('peer:connect', async (evt) => {" +
" let handshake = announceExport.getHandshake();" +
" (await ItPbStream.pbStream(await window.fruzhin.libp.dialProtocol(evt.detail, protocolId))).writeLP(Ed25519.h2b(handshake));" +
"});")
" ItPipe.pipe(stream, async function (source) {" +
" for await (const msg of source) {" +
" let subarr = msg.subarray();" +
" if(subarr.length === 69) {" +
" let handshake = announceExport.getHandshake();" +
" (await ItPbStream.pbStream(stream)).writeLP(Ed25519.h2b(handshake));" +
" } else if (subarr.length > 1) {" +
" announceExport.blockAnnounce(Ed25519.b2h(subarr.slice(2)), connection.remotePeer.toString());" +
" }" +
" }" +
" });" +
"});" +
"fruzhin.libp.addEventListener('peer:connect', async (evt) => {" +
" let handshake = announceExport.getHandshake();" +
" (await ItPbStream.pbStream(await window.fruzhin.libp.dialProtocol(evt.detail, protocolId))).writeLP(Ed25519.h2b(handshake));" +
"});")
public static native void registerHandler(JSObject announceExport, String protocolId);
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,39 @@
package com.limechain.network.protocol.blockannounce;

import com.limechain.network.protocol.blockannounce.teavm.BlockAnnounceHandler;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.rpc.ChainRpcClient;
import com.limechain.rpc.dto.ChainGetHeaderResult;
import com.limechain.rpc.server.AppBean;
import com.limechain.sync.warpsync.WarpSyncState;
import com.limechain.teavm.TeaVMScheduler;
import com.limechain.utils.RpcUtils;
import com.limechain.utils.Stopwatch;
import lombok.extern.java.Log;

import java.util.logging.Level;

@Log
public class BlockAnnounceService {

private final String protocolId;
private boolean isRegistered = false;

// 10 second threshold starts the fallback after one.
private static final long FALLBACK_THRESHOLD = 10_000;

public BlockAnnounceService(String protocolId) {
this.protocolId = protocolId;
}

public void sendHandshake() {
try {
if (!isRegistered) {
BlockAnnounceEngine.registerHandler(new BlockAnnounceHandler(), protocolId);
Stopwatch stopwatch = new Stopwatch();
BlockAnnounceEngine.registerHandler(new BlockAnnounceHandler(stopwatch), protocolId);
isRegistered = true;

registerFallbackRpcScheduler(stopwatch);
}
} catch (IllegalStateException e) {
log.warning("Error registering block announce handler");
Expand All @@ -28,4 +44,19 @@ public void sendHandshake() {
log.warning("Error sending block announce handshake request");
}
}

private static void registerFallbackRpcScheduler(Stopwatch stopwatch) {
TeaVMScheduler.schedule(() -> {
if (stopwatch.getElapsedTime() > FALLBACK_THRESHOLD) {
ChainGetHeaderResult rpcResult = ChainRpcClient.getHeader(null);
BlockHeader fallbackHeader = RpcUtils.toBlockHeader(rpcResult);
AppBean.getBean(WarpSyncState.class).syncRuntimeUpdate(fallbackHeader);

log.log(Level.INFO, "Synced block announce via RPC for block #" + fallbackHeader.getBlockNumber() +
" with hash:0x" + fallbackHeader.getHash() +
" parentHash:" + fallbackHeader.getParentHash() +
" stateRoot:" + fallbackHeader.getStateRoot());
}
}, 6_000);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package com.limechain.network.protocol.blockannounce.teavm;

import com.limechain.network.protocol.blockannounce.BlockAnnounceEngine;
import com.limechain.utils.Stopwatch;
import com.limechain.utils.StringUtils;


public class BlockAnnounceHandler implements BlockAnnounceExport {

private final Stopwatch stopwatch;

public BlockAnnounceHandler(Stopwatch stopwatch) {
this.stopwatch = stopwatch;
}

public void blockAnnounce(String announce, String peerId) {
BlockAnnounceEngine.handleBlockAnnounce(StringUtils.fromHex(announce), peerId);
stopwatch.reset();
}

public String getHandshake() {
return BlockAnnounceEngine.getHandshake();
}

}
}
6 changes: 5 additions & 1 deletion src/main/java/com/limechain/rpc/ChainRpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ public static Hash256 getLastFinalizedBlockHash() {
}

public static ChainGetHeaderResult getHeader(String blockHash) {
RpcResponse response = sendRpcRequest(RpcMethod.CHAIN_GET_HEADER, List.of(blockHash));
List<Object> params = blockHash == null
? List.of()
: List.of(blockHash);

RpcResponse response = sendRpcRequest(RpcMethod.CHAIN_GET_HEADER, params);
return getResult(response, ChainGetHeaderResult.class);
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/limechain/rpc/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static String sendRpcRequest(String method, Object[] params) {
* @return The {@link RpcResponse} representation of the received RPC json result.
*/
protected static RpcResponse sendRpcRequest(RpcMethod method, List<Object> params) {
String jsonResult = HttpRequest.asyncHttpRequest(POST, LOAD_BALANCER.getNextEndpoint(),
String jsonResult = HttpRequest.createHttpRequest(POST, LOAD_BALANCER.getNextEndpoint(),
createRpcRequestJson(method.getMethod(), params));
return OBJECT_MAPPER.mapToClass(jsonResult, RpcResponse.class);
}
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/com/limechain/sync/warpsync/WarpSyncState.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import com.limechain.chain.lightsyncstate.Authority;
import com.limechain.network.Network;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage;
import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.network.protocol.warp.dto.ConsensusEngine;
import com.limechain.network.protocol.warp.dto.DigestType;
import com.limechain.network.protocol.warp.dto.HeaderDigest;
Expand Down Expand Up @@ -49,9 +49,9 @@ public class WarpSyncState {

public WarpSyncState(SyncState syncState, Network network) {
this(syncState,
network,
new HashSet<>(),
new PriorityQueue<>(Comparator.comparing(Pair::getValue0)));
network,
new HashSet<>(),
new PriorityQueue<>(Comparator.comparing(Pair::getValue0)));
}

public WarpSyncState(SyncState syncState, Network network,
Expand All @@ -64,17 +64,17 @@ public WarpSyncState(SyncState syncState, Network network,
}

/**
* Update the state with information from a block announce message.
* Update the state with information from a block header.
* Schedule runtime updates found in header, to be executed when block is verified.
*
* @param blockAnnounceMessage received block announce message
* @param header the block header to check for a runtime update digest
*/
public void syncBlockAnnounce(BlockAnnounceMessage blockAnnounceMessage) {
boolean hasRuntimeUpdate = Arrays.stream(blockAnnounceMessage.getHeader().getDigest())
public void syncRuntimeUpdate(BlockHeader header) {
boolean hasRuntimeUpdate = Arrays.stream(header.getDigest())
.anyMatch(d -> d.getType() == DigestType.RUN_ENV_UPDATED);

if (hasRuntimeUpdate) {
scheduledRuntimeUpdateBlocks.add(blockAnnounceMessage.getHeader().getBlockNumber());
scheduledRuntimeUpdateBlocks.add(header.getBlockNumber());
}
}

Expand All @@ -94,10 +94,10 @@ public synchronized void syncCommit(CommitMessage commitMessage, String peerId)
}

log.log(Level.INFO, "Received commit message from peer " + peerId
+ " for block #" + commitMessage.getVote().getBlockNumber()
+ " with hash " + commitMessage.getVote().getBlockHash()
+ " with setId " + commitMessage.getSetId() + " and round " + commitMessage.getRoundNumber()
+ " with " + commitMessage.getPrecommits().length + " voters");
+ " for block #" + commitMessage.getVote().getBlockNumber()
+ " with hash " + commitMessage.getVote().getBlockHash()
+ " with setId " + commitMessage.getSetId() + " and round " + commitMessage.getRoundNumber()
+ " with " + commitMessage.getPrecommits().length + " voters");

boolean verified = JustificationVerifier.verify(commitMessage.getPrecommits(), commitMessage.getRoundNumber());
if (!verified) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
package com.limechain.sync.warpsync.action;

import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.network.protocol.warp.dto.HeaderDigest;
import com.limechain.network.protocol.warp.scale.reader.HeaderDigestReader;
import com.limechain.polkaj.Hash256;
import com.limechain.polkaj.reader.ScaleCodecReader;
import com.limechain.rpc.ChainRpcClient;
import com.limechain.rpc.GrandpaRpcClient;
import com.limechain.rpc.dto.ChainGetHeaderResult;
import com.limechain.rpc.dto.GrandpaRoundStateResult;
import com.limechain.rpc.server.AppBean;
import com.limechain.storage.block.SyncState;
import com.limechain.sync.warpsync.WarpSyncMachine;
import com.limechain.utils.StringUtils;
import com.limechain.utils.RpcUtils;
import lombok.extern.java.Log;

import java.math.BigInteger;
import java.util.List;
import java.util.logging.Level;

/**
Expand Down Expand Up @@ -55,22 +50,7 @@ public void handle(WarpSyncMachine sync) {
ChainGetHeaderResult headerResult = ChainRpcClient.getHeader(latestFinalizedHashResult.toString());
GrandpaRoundStateResult roundStateResult = GrandpaRpcClient.getGrandpaRoundState();

BlockHeader latestFinalizedHeader = new BlockHeader();
latestFinalizedHeader.setBlockNumber(new BigInteger(
StringUtils.remove0xPrefix(headerResult.getNumber()), 16));
latestFinalizedHeader.setParentHash(Hash256.from(headerResult.getParentHash()));
latestFinalizedHeader.setStateRoot(Hash256.from(headerResult.getStateRoot()));
latestFinalizedHeader.setExtrinsicsRoot(Hash256.from(headerResult.getExtrinsicsRoot()));

List<String> digestHexes = headerResult.getDigest().getLogs();
HeaderDigest[] digests = new HeaderDigest[digestHexes.size()];
for (int i = 0; i < digestHexes.size(); i++) {
digests[i] = new HeaderDigestReader().read(
new ScaleCodecReader(StringUtils.hexToBytes(digestHexes.get(i))));
}
latestFinalizedHeader.setDigest(digests);

syncState.finalizeHeader(latestFinalizedHeader);
syncState.finalizeHeader(RpcUtils.toBlockHeader(headerResult));
syncState.setSetId(BigInteger.valueOf(roundStateResult.getSetId()));
syncState.resetRound();

Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/limechain/teavm/TeaVMScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.limechain.teavm;

import org.teavm.jso.JSBody;
import org.teavm.jso.JSFunctor;
import org.teavm.jso.JSObject;

public class TeaVMScheduler {

public static void schedule(TeaVMRunnable task, int intervalMillis) {
scheduleNative(task, intervalMillis);
}

@JSBody(params = {"callback", "interval"}, script = "setInterval(callback, interval);")
private static native void scheduleNative(TeaVMRunnable callback, int intervalMillis);

@JSFunctor
public interface TeaVMRunnable extends JSObject {
void run();
}
}
37 changes: 37 additions & 0 deletions src/main/java/com/limechain/utils/RpcUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.limechain.utils;

import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.network.protocol.warp.dto.HeaderDigest;
import com.limechain.network.protocol.warp.scale.reader.HeaderDigestReader;
import com.limechain.polkaj.Hash256;
import com.limechain.polkaj.reader.ScaleCodecReader;
import com.limechain.rpc.dto.ChainGetHeaderResult;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.math.BigInteger;
import java.util.List;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class RpcUtils {

public static BlockHeader toBlockHeader(ChainGetHeaderResult result) {
BlockHeader header = new BlockHeader();

header.setBlockNumber(new BigInteger(
StringUtils.remove0xPrefix(result.getNumber()), 16));
header.setParentHash(Hash256.from(result.getParentHash()));
header.setStateRoot(Hash256.from(result.getStateRoot()));
header.setExtrinsicsRoot(Hash256.from(result.getExtrinsicsRoot()));

List<String> digestHexes = result.getDigest().getLogs();
HeaderDigest[] digests = new HeaderDigest[digestHexes.size()];
for (int i = 0; i < digestHexes.size(); i++) {
digests[i] = new HeaderDigestReader().read(
new ScaleCodecReader(StringUtils.hexToBytes(digestHexes.get(i))));
}
header.setDigest(digests);

return header;
}
}
18 changes: 18 additions & 0 deletions src/main/java/com/limechain/utils/Stopwatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.limechain.utils;

public class Stopwatch {

private long startTime;

public Stopwatch() {
reset();
}

public void reset() {
this.startTime = System.currentTimeMillis();
}

public long getElapsedTime() {
return System.currentTimeMillis() - startTime;
}
}

0 comments on commit 1123686

Please sign in to comment.