diff --git a/src/main/java/com/limechain/Main.java b/src/main/java/com/limechain/Main.java index 7944a85fd..280b8ebb7 100644 --- a/src/main/java/com/limechain/Main.java +++ b/src/main/java/com/limechain/Main.java @@ -4,6 +4,8 @@ import com.limechain.client.LightClient; import com.limechain.rpc.RPCFunction; import com.limechain.rpc.RpcClient; +import com.limechain.rpc.WsRpcClient; +import com.limechain.rpc.WsRpcClientImpl; import com.limechain.rpc.server.RpcApp; import com.limechain.utils.DivLogger; import org.teavm.jso.JSBody; @@ -13,13 +15,16 @@ public class Main { - private static final String RPC_VARIABLE_NAME = "rpc"; + private static final String HTTP_RPC = "rpc"; + private static final String WS_RPC = "wsRpc"; private static final DivLogger log = new DivLogger(); public static void main(String[] args) { + exportHttpRpc(RpcClient::sendRpcRequest, JSString.valueOf(HTTP_RPC)); + exportWsRpc(new WsRpcClientImpl(), JSString.valueOf(WS_RPC)); + log.log("Starting LimeChain node..."); - exportAPI(RpcClient::sendRpcRequest, JSString.valueOf(RPC_VARIABLE_NAME)); RpcApp rpcApp = new RpcApp(); rpcApp.start(); @@ -33,6 +38,9 @@ public static void main(String[] args) { } @JSBody(params = {"f", "apiName"}, script = "window[apiName] = f;" + - "window.fruzhin.HTTP.changeRpcExported(true);") - private static native void exportAPI(RPCFunction f, JSString apiName); + "window.fruzhin.HTTP.changeRpcExported(true);") + private static native void exportHttpRpc(RPCFunction f, JSString apiName); + + @JSBody(params = {"c", "apiName"}, script = "window[apiName] = c;") + private static native void exportWsRpc(WsRpcClient c, JSString apiName); } \ No newline at end of file diff --git a/src/main/java/com/limechain/rpc/RpcClient.java b/src/main/java/com/limechain/rpc/RpcClient.java index 67ad05844..f7a534b44 100644 --- a/src/main/java/com/limechain/rpc/RpcClient.java +++ b/src/main/java/com/limechain/rpc/RpcClient.java @@ -34,7 +34,7 @@ public sealed class RpcClient permits ChainRpcClient, GrandpaRpcClient { */ public static String sendRpcRequest(String method, Object[] params) { return HttpRequest.createHttpRequest(POST, LOAD_BALANCER.getNextEndpoint(), - createRpcRequestJson(method, List.of(params))); + createRpcRequestJson(method, List.of(params))); } /** @@ -46,7 +46,7 @@ public static String sendRpcRequest(String method, Object[] params) { */ protected static RpcResponse sendRpcRequest(RpcMethod method, List params) { String jsonResult = HttpRequest.createHttpRequest(POST, LOAD_BALANCER.getNextEndpoint(), - createRpcRequestJson(method.getMethod(), params)); + createRpcRequestJson(method.getMethod(), params)); return OBJECT_MAPPER.mapToClass(jsonResult, RpcResponse.class); } @@ -67,7 +67,7 @@ private static String createRpcRequestJson(String method, List params) { protected static T getResult(RpcResponse response, Class klazz) { if (response.getError() != null) { throw new IllegalStateException("RPC request resulted in an error with code:" + response.getError().getCode() - + " and message:" + response.getError().getMessage()); + + " and message:" + response.getError().getMessage()); } return OBJECT_MAPPER.mapToClass(JsonUtil.stringify(response.getResult()), klazz); diff --git a/src/main/java/com/limechain/rpc/WebsocketState.java b/src/main/java/com/limechain/rpc/WebsocketState.java new file mode 100644 index 000000000..e8e353f73 --- /dev/null +++ b/src/main/java/com/limechain/rpc/WebsocketState.java @@ -0,0 +1,16 @@ +package com.limechain.rpc; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public enum WebsocketState { + + CONNECTING(0), + OPEN(1), + CLOSING(2), + CLOSED(3); + + private final int intValue; +} diff --git a/src/main/java/com/limechain/rpc/WsRpcClient.java b/src/main/java/com/limechain/rpc/WsRpcClient.java new file mode 100644 index 000000000..11f6c7ffe --- /dev/null +++ b/src/main/java/com/limechain/rpc/WsRpcClient.java @@ -0,0 +1,14 @@ +package com.limechain.rpc; + +import org.teavm.jso.JSObject; +import org.teavm.jso.core.JSString; + +/** + * TeaVM overlay interface for a client used to communicate with a full node's RPC server. + */ +public interface WsRpcClient extends JSObject { + + void send(JSString rpcString); + + String nextResponse(); +} diff --git a/src/main/java/com/limechain/rpc/WsRpcClientImpl.java b/src/main/java/com/limechain/rpc/WsRpcClientImpl.java new file mode 100644 index 000000000..e4a1dfa90 --- /dev/null +++ b/src/main/java/com/limechain/rpc/WsRpcClientImpl.java @@ -0,0 +1,89 @@ +package com.limechain.rpc; + +import com.limechain.constants.RpcConstants; +import lombok.SneakyThrows; +import lombok.extern.java.Log; +import org.teavm.jso.browser.Window; +import org.teavm.jso.core.JSString; +import org.teavm.jso.websocket.WebSocket; + +import java.util.ArrayDeque; +import java.util.Queue; + +/** + * The implementation of {@link WsRpcClient}. Uses a native JS Websocket implementation. + */ +@Log +public class WsRpcClientImpl implements WsRpcClient { + + private static final int WS_OPEN_WAIT_MS = 50; + + private WebSocket ws; + private final Queue responseQueue; + + public WsRpcClientImpl() { + responseQueue = new ArrayDeque<>(); + openWebsocketConnection(); + } + + private void openWebsocketConnection() { + log.info("Initializing RPC websocket connection..."); + //TODO change when configuring chain. + ws = new WebSocket(RpcConstants.POLKADOT_WS_RPC); + initHandlers(); + } + + private void initHandlers() { + ws.onClose(e -> { + log.info("RPC websocket connection was closed."); + log.info("Retrying connection..."); + Window.setTimeout(this::openWebsocketConnection, 1000); + }); + + ws.onError(e -> { + log.warning("There was an error in the RPC websocket connection. Closing connection..."); + ws.close(); + }); + + ws.onOpen(e -> log.info("Websocket connection is open.")); + ws.onMessage(e -> responseQueue.offer(e.getDataAsString())); + } + + /** + * Waits for the current ws connection to be in an opened state then sends an RPC request to the full node. + */ + @Override + public void send(JSString rpcString) { + new Thread(() -> { + handleSocketState(); + ws.send(rpcString.stringValue()); + }).start(); + } + + /** + * Handles the state of the websocket when sending a message. If the connection is in a closing (2) or a closed (3) + * state the client throws an error. + */ + @SneakyThrows + private void handleSocketState() { + int startState = ws.getReadyState(); + int openedState = WebsocketState.OPEN.getIntValue(); + + while (startState != openedState) { + if (startState > openedState) { + throw new Exception("Calling function of a closed websocket is prohibited."); + } + + Thread.sleep(WS_OPEN_WAIT_MS); + startState = ws.getReadyState(); + } + } + + /** + * Polls the first item in the queue and returns it as a string. + */ + @Override + public String nextResponse() { + return responseQueue.poll(); + } +} diff --git a/src/main/java/com/limechain/utils/json/JsonParser.java b/src/main/java/com/limechain/utils/json/JsonParser.java index eb98b652d..c4371e590 100644 --- a/src/main/java/com/limechain/utils/json/JsonParser.java +++ b/src/main/java/com/limechain/utils/json/JsonParser.java @@ -159,10 +159,10 @@ private Object parseNumber() { private boolean isValidNumberChar() { return (Character.isDigit(json.charAt(index)) - || json.charAt(index) == '-' - || json.charAt(index) == '.' - || json.charAt(index) == 'e' - || json.charAt(index) == 'E'); + || json.charAt(index) == '-' + || json.charAt(index) == '.' + || json.charAt(index) == 'e' + || json.charAt(index) == 'E'); } private void skipWhitespace() {