diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index c37c90fb6..ad91d73a2 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -2,9 +2,9 @@ import com.limechain.exception.scale.ScaleEncodingException; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; -import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleWriter; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleReader; +import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.polkaj.reader.ScaleCodecReader; import com.limechain.polkaj.writer.ScaleCodecWriter; import com.limechain.rpc.server.AppBean; @@ -27,14 +27,14 @@ public class BlockAnnounceEngine { public static void handleBlockAnnounce(byte[] msg, String peerId) { ScaleCodecReader reader = new ScaleCodecReader(msg); - BlockAnnounceMessage announce = reader.read(new BlockAnnounceMessageScaleReader()); + BlockHeader announceHeader = reader.read(new BlockAnnounceMessageScaleReader()).getHeader(); // connectionManager.updatePeer(peerId, announce); - AppBean.getBean(WarpSyncState.class).syncBlockAnnounce(announce); - log.log(Level.FINE, "Received block announce for block #" + announce.getHeader().getBlockNumber() + - " from " + peerId + - " with hash:0x" + announce.getHeader().getHash() + - " parentHash:" + announce.getHeader().getParentHash() + - " stateRoot:" + announce.getHeader().getStateRoot()); + AppBean.getBean(WarpSyncState.class).syncRuntimeUpdate(announceHeader); + log.log(Level.FINE, "Received block announce for block #" + announceHeader.getBlockNumber() + + " from " + peerId + + " with hash:0x" + announceHeader.getHash() + + " parentHash:" + announceHeader.getParentHash() + + " stateRoot:" + announceHeader.getStateRoot()); } public static String getHandshake() { @@ -48,29 +48,29 @@ public static String getHandshake() { } @JSBody(params = {"handshake", "protocolId"}, script = "window.fruzhin.libp.getConnections().forEach(async (peer) => {" + - " let stream = await ItPbStream.pbStream(await window.fruzhin.libp.dialProtocol(peer.remotePeer, protocolId));" + - " stream.writeLP(Ed25519.h2b(handshake));" + - "});") + " let stream = await ItPbStream.pbStream(await window.fruzhin.libp.dialProtocol(peer.remotePeer, protocolId));" + + " stream.writeLP(Ed25519.h2b(handshake));" + + "});") public static native void sendHandshakeToAll(String handshake, String protocolId); @JSBody(params = {"announceExport", "protocolId"}, script = "window.fruzhin.libp.handle(protocolId, async ({connection, stream}) => {" + - " ItPipe.pipe(stream, async function (source) {" + - " for await (const msg of source) {" + - " let subarr = msg.subarray();" + - " if(subarr.length === 69) {" + - " let handshake = announceExport.getHandshake();" + - " (await ItPbStream.pbStream(stream)).writeLP(Ed25519.h2b(handshake));" + - " } else if (subarr.length > 1) {" + - " announceExport.blockAnnounce(Ed25519.b2h(subarr.slice(2)), connection.remotePeer.toString());" + - " }" + - " }" + - " });" + - "});" + - "fruzhin.libp.addEventListener('peer:connect', async (evt) => {" + - " let handshake = announceExport.getHandshake();" + - " (await ItPbStream.pbStream(await window.fruzhin.libp.dialProtocol(evt.detail, protocolId))).writeLP(Ed25519.h2b(handshake));" + - "});") + " ItPipe.pipe(stream, async function (source) {" + + " for await (const msg of source) {" + + " let subarr = msg.subarray();" + + " if(subarr.length === 69) {" + + " let handshake = announceExport.getHandshake();" + + " (await ItPbStream.pbStream(stream)).writeLP(Ed25519.h2b(handshake));" + + " } else if (subarr.length > 1) {" + + " announceExport.blockAnnounce(Ed25519.b2h(subarr.slice(2)), connection.remotePeer.toString());" + + " }" + + " }" + + " });" + + "});" + + "fruzhin.libp.addEventListener('peer:connect', async (evt) => {" + + " let handshake = announceExport.getHandshake();" + + " (await ItPbStream.pbStream(await window.fruzhin.libp.dialProtocol(evt.detail, protocolId))).writeLP(Ed25519.h2b(handshake));" + + "});") public static native void registerHandler(JSObject announceExport, String protocolId); } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceService.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceService.java index 055070f3c..bc85a0b0e 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceService.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceService.java @@ -1,14 +1,27 @@ package com.limechain.network.protocol.blockannounce; import com.limechain.network.protocol.blockannounce.teavm.BlockAnnounceHandler; +import com.limechain.network.protocol.warp.dto.BlockHeader; +import com.limechain.rpc.ChainRpcClient; +import com.limechain.rpc.dto.ChainGetHeaderResult; +import com.limechain.rpc.server.AppBean; +import com.limechain.sync.warpsync.WarpSyncState; +import com.limechain.teavm.TeaVMScheduler; +import com.limechain.utils.RpcUtils; +import com.limechain.utils.Stopwatch; import lombok.extern.java.Log; +import java.util.logging.Level; + @Log public class BlockAnnounceService { private final String protocolId; private boolean isRegistered = false; + // 10 second threshold starts the fallback after one. + private static final long FALLBACK_THRESHOLD = 10_000; + public BlockAnnounceService(String protocolId) { this.protocolId = protocolId; } @@ -16,8 +29,11 @@ public BlockAnnounceService(String protocolId) { public void sendHandshake() { try { if (!isRegistered) { - BlockAnnounceEngine.registerHandler(new BlockAnnounceHandler(), protocolId); + Stopwatch stopwatch = new Stopwatch(); + BlockAnnounceEngine.registerHandler(new BlockAnnounceHandler(stopwatch), protocolId); isRegistered = true; + + registerFallbackRpcScheduler(stopwatch); } } catch (IllegalStateException e) { log.warning("Error registering block announce handler"); @@ -28,4 +44,19 @@ public void sendHandshake() { log.warning("Error sending block announce handshake request"); } } + + private static void registerFallbackRpcScheduler(Stopwatch stopwatch) { + TeaVMScheduler.schedule(() -> { + if (stopwatch.getElapsedTime() > FALLBACK_THRESHOLD) { + ChainGetHeaderResult rpcResult = ChainRpcClient.getHeader(null); + BlockHeader fallbackHeader = RpcUtils.toBlockHeader(rpcResult); + AppBean.getBean(WarpSyncState.class).syncRuntimeUpdate(fallbackHeader); + + log.log(Level.INFO, "Synced block announce via RPC for block #" + fallbackHeader.getBlockNumber() + + " with hash:0x" + fallbackHeader.getHash() + + " parentHash:" + fallbackHeader.getParentHash() + + " stateRoot:" + fallbackHeader.getStateRoot()); + } + }, 6_000); + } } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/teavm/BlockAnnounceHandler.java b/src/main/java/com/limechain/network/protocol/blockannounce/teavm/BlockAnnounceHandler.java index 217cd4c55..19fa7840c 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/teavm/BlockAnnounceHandler.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/teavm/BlockAnnounceHandler.java @@ -1,16 +1,24 @@ package com.limechain.network.protocol.blockannounce.teavm; import com.limechain.network.protocol.blockannounce.BlockAnnounceEngine; +import com.limechain.utils.Stopwatch; import com.limechain.utils.StringUtils; public class BlockAnnounceHandler implements BlockAnnounceExport { + + private final Stopwatch stopwatch; + + public BlockAnnounceHandler(Stopwatch stopwatch) { + this.stopwatch = stopwatch; + } + public void blockAnnounce(String announce, String peerId) { BlockAnnounceEngine.handleBlockAnnounce(StringUtils.fromHex(announce), peerId); + stopwatch.reset(); } public String getHandshake() { return BlockAnnounceEngine.getHandshake(); } - -} +} \ No newline at end of file diff --git a/src/main/java/com/limechain/rpc/ChainRpcClient.java b/src/main/java/com/limechain/rpc/ChainRpcClient.java index 791480cd6..1e6fcf9e5 100644 --- a/src/main/java/com/limechain/rpc/ChainRpcClient.java +++ b/src/main/java/com/limechain/rpc/ChainRpcClient.java @@ -18,7 +18,11 @@ public static Hash256 getLastFinalizedBlockHash() { } public static ChainGetHeaderResult getHeader(String blockHash) { - RpcResponse response = sendRpcRequest(RpcMethod.CHAIN_GET_HEADER, List.of(blockHash)); + List params = blockHash == null + ? List.of() + : List.of(blockHash); + + RpcResponse response = sendRpcRequest(RpcMethod.CHAIN_GET_HEADER, params); return getResult(response, ChainGetHeaderResult.class); } } diff --git a/src/main/java/com/limechain/rpc/RpcClient.java b/src/main/java/com/limechain/rpc/RpcClient.java index 4b2dca289..67ad05844 100644 --- a/src/main/java/com/limechain/rpc/RpcClient.java +++ b/src/main/java/com/limechain/rpc/RpcClient.java @@ -45,7 +45,7 @@ public static String sendRpcRequest(String method, Object[] params) { * @return The {@link RpcResponse} representation of the received RPC json result. */ protected static RpcResponse sendRpcRequest(RpcMethod method, List params) { - String jsonResult = HttpRequest.asyncHttpRequest(POST, LOAD_BALANCER.getNextEndpoint(), + String jsonResult = HttpRequest.createHttpRequest(POST, LOAD_BALANCER.getNextEndpoint(), createRpcRequestJson(method.getMethod(), params)); return OBJECT_MAPPER.mapToClass(jsonResult, RpcResponse.class); } diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java index 9124c3df2..8ac72d8cd 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java @@ -2,9 +2,9 @@ import com.limechain.chain.lightsyncstate.Authority; import com.limechain.network.Network; -import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage; +import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.network.protocol.warp.dto.ConsensusEngine; import com.limechain.network.protocol.warp.dto.DigestType; import com.limechain.network.protocol.warp.dto.HeaderDigest; @@ -49,9 +49,9 @@ public class WarpSyncState { public WarpSyncState(SyncState syncState, Network network) { this(syncState, - network, - new HashSet<>(), - new PriorityQueue<>(Comparator.comparing(Pair::getValue0))); + network, + new HashSet<>(), + new PriorityQueue<>(Comparator.comparing(Pair::getValue0))); } public WarpSyncState(SyncState syncState, Network network, @@ -64,17 +64,17 @@ public WarpSyncState(SyncState syncState, Network network, } /** - * Update the state with information from a block announce message. + * Update the state with information from a block header. * Schedule runtime updates found in header, to be executed when block is verified. * - * @param blockAnnounceMessage received block announce message + * @param header the block header to check for a runtime update digest */ - public void syncBlockAnnounce(BlockAnnounceMessage blockAnnounceMessage) { - boolean hasRuntimeUpdate = Arrays.stream(blockAnnounceMessage.getHeader().getDigest()) + public void syncRuntimeUpdate(BlockHeader header) { + boolean hasRuntimeUpdate = Arrays.stream(header.getDigest()) .anyMatch(d -> d.getType() == DigestType.RUN_ENV_UPDATED); if (hasRuntimeUpdate) { - scheduledRuntimeUpdateBlocks.add(blockAnnounceMessage.getHeader().getBlockNumber()); + scheduledRuntimeUpdateBlocks.add(header.getBlockNumber()); } } @@ -94,10 +94,10 @@ public synchronized void syncCommit(CommitMessage commitMessage, String peerId) } log.log(Level.INFO, "Received commit message from peer " + peerId - + " for block #" + commitMessage.getVote().getBlockNumber() - + " with hash " + commitMessage.getVote().getBlockHash() - + " with setId " + commitMessage.getSetId() + " and round " + commitMessage.getRoundNumber() - + " with " + commitMessage.getPrecommits().length + " voters"); + + " for block #" + commitMessage.getVote().getBlockNumber() + + " with hash " + commitMessage.getVote().getBlockHash() + + " with setId " + commitMessage.getSetId() + " and round " + commitMessage.getRoundNumber() + + " with " + commitMessage.getPrecommits().length + " voters"); boolean verified = JustificationVerifier.verify(commitMessage.getPrecommits(), commitMessage.getRoundNumber()); if (!verified) { diff --git a/src/main/java/com/limechain/sync/warpsync/action/RpcFallbackAction.java b/src/main/java/com/limechain/sync/warpsync/action/RpcFallbackAction.java index 7c2f1b3a0..df7537175 100644 --- a/src/main/java/com/limechain/sync/warpsync/action/RpcFallbackAction.java +++ b/src/main/java/com/limechain/sync/warpsync/action/RpcFallbackAction.java @@ -1,10 +1,6 @@ package com.limechain.sync.warpsync.action; -import com.limechain.network.protocol.warp.dto.BlockHeader; -import com.limechain.network.protocol.warp.dto.HeaderDigest; -import com.limechain.network.protocol.warp.scale.reader.HeaderDigestReader; import com.limechain.polkaj.Hash256; -import com.limechain.polkaj.reader.ScaleCodecReader; import com.limechain.rpc.ChainRpcClient; import com.limechain.rpc.GrandpaRpcClient; import com.limechain.rpc.dto.ChainGetHeaderResult; @@ -12,11 +8,10 @@ import com.limechain.rpc.server.AppBean; import com.limechain.storage.block.SyncState; import com.limechain.sync.warpsync.WarpSyncMachine; -import com.limechain.utils.StringUtils; +import com.limechain.utils.RpcUtils; import lombok.extern.java.Log; import java.math.BigInteger; -import java.util.List; import java.util.logging.Level; /** @@ -55,22 +50,7 @@ public void handle(WarpSyncMachine sync) { ChainGetHeaderResult headerResult = ChainRpcClient.getHeader(latestFinalizedHashResult.toString()); GrandpaRoundStateResult roundStateResult = GrandpaRpcClient.getGrandpaRoundState(); - BlockHeader latestFinalizedHeader = new BlockHeader(); - latestFinalizedHeader.setBlockNumber(new BigInteger( - StringUtils.remove0xPrefix(headerResult.getNumber()), 16)); - latestFinalizedHeader.setParentHash(Hash256.from(headerResult.getParentHash())); - latestFinalizedHeader.setStateRoot(Hash256.from(headerResult.getStateRoot())); - latestFinalizedHeader.setExtrinsicsRoot(Hash256.from(headerResult.getExtrinsicsRoot())); - - List digestHexes = headerResult.getDigest().getLogs(); - HeaderDigest[] digests = new HeaderDigest[digestHexes.size()]; - for (int i = 0; i < digestHexes.size(); i++) { - digests[i] = new HeaderDigestReader().read( - new ScaleCodecReader(StringUtils.hexToBytes(digestHexes.get(i)))); - } - latestFinalizedHeader.setDigest(digests); - - syncState.finalizeHeader(latestFinalizedHeader); + syncState.finalizeHeader(RpcUtils.toBlockHeader(headerResult)); syncState.setSetId(BigInteger.valueOf(roundStateResult.getSetId())); syncState.resetRound(); diff --git a/src/main/java/com/limechain/teavm/TeaVMScheduler.java b/src/main/java/com/limechain/teavm/TeaVMScheduler.java new file mode 100644 index 000000000..0e727ece9 --- /dev/null +++ b/src/main/java/com/limechain/teavm/TeaVMScheduler.java @@ -0,0 +1,20 @@ +package com.limechain.teavm; + +import org.teavm.jso.JSBody; +import org.teavm.jso.JSFunctor; +import org.teavm.jso.JSObject; + +public class TeaVMScheduler { + + public static void schedule(TeaVMRunnable task, int intervalMillis) { + scheduleNative(task, intervalMillis); + } + + @JSBody(params = {"callback", "interval"}, script = "setInterval(callback, interval);") + private static native void scheduleNative(TeaVMRunnable callback, int intervalMillis); + + @JSFunctor + public interface TeaVMRunnable extends JSObject { + void run(); + } +} diff --git a/src/main/java/com/limechain/utils/RpcUtils.java b/src/main/java/com/limechain/utils/RpcUtils.java new file mode 100644 index 000000000..85f2e7d53 --- /dev/null +++ b/src/main/java/com/limechain/utils/RpcUtils.java @@ -0,0 +1,37 @@ +package com.limechain.utils; + +import com.limechain.network.protocol.warp.dto.BlockHeader; +import com.limechain.network.protocol.warp.dto.HeaderDigest; +import com.limechain.network.protocol.warp.scale.reader.HeaderDigestReader; +import com.limechain.polkaj.Hash256; +import com.limechain.polkaj.reader.ScaleCodecReader; +import com.limechain.rpc.dto.ChainGetHeaderResult; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.math.BigInteger; +import java.util.List; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class RpcUtils { + + public static BlockHeader toBlockHeader(ChainGetHeaderResult result) { + BlockHeader header = new BlockHeader(); + + header.setBlockNumber(new BigInteger( + StringUtils.remove0xPrefix(result.getNumber()), 16)); + header.setParentHash(Hash256.from(result.getParentHash())); + header.setStateRoot(Hash256.from(result.getStateRoot())); + header.setExtrinsicsRoot(Hash256.from(result.getExtrinsicsRoot())); + + List digestHexes = result.getDigest().getLogs(); + HeaderDigest[] digests = new HeaderDigest[digestHexes.size()]; + for (int i = 0; i < digestHexes.size(); i++) { + digests[i] = new HeaderDigestReader().read( + new ScaleCodecReader(StringUtils.hexToBytes(digestHexes.get(i)))); + } + header.setDigest(digests); + + return header; + } +} diff --git a/src/main/java/com/limechain/utils/Stopwatch.java b/src/main/java/com/limechain/utils/Stopwatch.java new file mode 100644 index 000000000..88a13daeb --- /dev/null +++ b/src/main/java/com/limechain/utils/Stopwatch.java @@ -0,0 +1,18 @@ +package com.limechain.utils; + +public class Stopwatch { + + private long startTime; + + public Stopwatch() { + reset(); + } + + public void reset() { + this.startTime = System.currentTimeMillis(); + } + + public long getElapsedTime() { + return System.currentTimeMillis() - startTime; + } +}