Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement WsRpcClient and support all required rpc methods. #21

Merged
merged 5 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/main/java/com/limechain/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.limechain.client.LightClient;
import com.limechain.rpc.RPCFunction;
import com.limechain.rpc.RpcClient;
import com.limechain.rpc.WsRpcClient;
import com.limechain.rpc.WsRpcClientImpl;
import com.limechain.rpc.server.RpcApp;
import com.limechain.utils.DivLogger;
import org.teavm.jso.JSBody;
Expand All @@ -13,13 +15,16 @@

public class Main {

private static final String RPC_VARIABLE_NAME = "rpc";
private static final String HTTP_RPC = "rpc";
private static final String WS_RPC = "wsRpc";

private static final DivLogger log = new DivLogger();

public static void main(String[] args) {
exportHttpRpc(RpcClient::sendRpcRequest, JSString.valueOf(HTTP_RPC));
exportWsRpc(new WsRpcClientImpl(), JSString.valueOf(WS_RPC));

log.log("Starting LimeChain node...");
exportAPI(RpcClient::sendRpcRequest, JSString.valueOf(RPC_VARIABLE_NAME));

RpcApp rpcApp = new RpcApp();
rpcApp.start();
Expand All @@ -33,6 +38,9 @@ public static void main(String[] args) {
}

@JSBody(params = {"f", "apiName"}, script = "window[apiName] = f;" +
"window.fruzhin.HTTP.changeRpcExported(true);")
private static native void exportAPI(RPCFunction f, JSString apiName);
"window.fruzhin.HTTP.changeRpcExported(true);")
private static native void exportHttpRpc(RPCFunction f, JSString apiName);

@JSBody(params = {"c", "apiName"}, script = "window[apiName] = c;")
private static native void exportWsRpc(WsRpcClient c, JSString apiName);
}
6 changes: 3 additions & 3 deletions src/main/java/com/limechain/rpc/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public sealed class RpcClient permits ChainRpcClient, GrandpaRpcClient {
*/
public static String sendRpcRequest(String method, Object[] params) {
return HttpRequest.createHttpRequest(POST, LOAD_BALANCER.getNextEndpoint(),
createRpcRequestJson(method, List.of(params)));
createRpcRequestJson(method, List.of(params)));
}

/**
Expand All @@ -46,7 +46,7 @@ public static String sendRpcRequest(String method, Object[] params) {
*/
protected static RpcResponse sendRpcRequest(RpcMethod method, List<Object> params) {
String jsonResult = HttpRequest.createHttpRequest(POST, LOAD_BALANCER.getNextEndpoint(),
createRpcRequestJson(method.getMethod(), params));
createRpcRequestJson(method.getMethod(), params));
return OBJECT_MAPPER.mapToClass(jsonResult, RpcResponse.class);
}

Expand All @@ -67,7 +67,7 @@ private static String createRpcRequestJson(String method, List<Object> params) {
protected static <T> T getResult(RpcResponse response, Class<T> klazz) {
if (response.getError() != null) {
throw new IllegalStateException("RPC request resulted in an error with code:" + response.getError().getCode()
+ " and message:" + response.getError().getMessage());
+ " and message:" + response.getError().getMessage());
}

return OBJECT_MAPPER.mapToClass(JsonUtil.stringify(response.getResult()), klazz);
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/limechain/rpc/WebsocketState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.limechain.rpc;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public enum WebsocketState {

CONNECTING(0),
OPEN(1),
CLOSING(2),
CLOSED(3);

private final int intValue;
}
14 changes: 14 additions & 0 deletions src/main/java/com/limechain/rpc/WsRpcClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.limechain.rpc;

import org.teavm.jso.JSObject;
import org.teavm.jso.core.JSString;

/**
* TeaVM overlay interface for a client used to communicate with a full node's RPC server.
*/
public interface WsRpcClient extends JSObject {

void send(JSString rpcString);

String nextResponse();
}
89 changes: 89 additions & 0 deletions src/main/java/com/limechain/rpc/WsRpcClientImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.limechain.rpc;

import com.limechain.constants.RpcConstants;
import lombok.SneakyThrows;
import lombok.extern.java.Log;
import org.teavm.jso.browser.Window;
import org.teavm.jso.core.JSString;
import org.teavm.jso.websocket.WebSocket;

import java.util.ArrayDeque;
import java.util.Queue;

/**
* The implementation of {@link WsRpcClient}. Uses a native JS Websocket implementation.
*/
@Log
public class WsRpcClientImpl implements WsRpcClient {

private static final int WS_OPEN_WAIT_MS = 50;

private WebSocket ws;
private final Queue<String> responseQueue;

public WsRpcClientImpl() {
responseQueue = new ArrayDeque<>();
openWebsocketConnection();
}

private void openWebsocketConnection() {
log.info("Initializing RPC websocket connection...");
//TODO change when configuring chain.
ws = new WebSocket(RpcConstants.POLKADOT_WS_RPC);
initHandlers();
}

private void initHandlers() {
ws.onClose(e -> {
log.info("RPC websocket connection was closed.");
log.info("Retrying connection...");
Window.setTimeout(this::openWebsocketConnection, 1000);
});

ws.onError(e -> {
log.warning("There was an error in the RPC websocket connection. Closing connection...");
ws.close();
});

ws.onOpen(e -> log.info("Websocket connection is open."));
ws.onMessage(e -> responseQueue.offer(e.getDataAsString()));
}

/**
* Waits for the current ws connection to be in an opened state then sends an RPC request to the full node.
*/
@Override
public void send(JSString rpcString) {
new Thread(() -> {
handleSocketState();
ws.send(rpcString.stringValue());
}).start();
}

/**
* Handles the state of the websocket when sending a message. If the connection is in a closing (2) or a closed (3)
* state the client throws an error.
*/
Zurcusa marked this conversation as resolved.
Show resolved Hide resolved
@SneakyThrows
private void handleSocketState() {
int startState = ws.getReadyState();
int openedState = WebsocketState.OPEN.getIntValue();

while (startState != openedState) {
if (startState > openedState) {
throw new Exception("Calling function of a closed websocket is prohibited.");
}

Thread.sleep(WS_OPEN_WAIT_MS);
startState = ws.getReadyState();
}
}

/**
* Polls the first item in the queue and returns it as a string.
*/
@Override
public String nextResponse() {
return responseQueue.poll();
}
}
8 changes: 4 additions & 4 deletions src/main/java/com/limechain/utils/json/JsonParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ private Object parseNumber() {

private boolean isValidNumberChar() {
return (Character.isDigit(json.charAt(index))
|| json.charAt(index) == '-'
|| json.charAt(index) == '.'
|| json.charAt(index) == 'e'
|| json.charAt(index) == 'E');
|| json.charAt(index) == '-'
|| json.charAt(index) == '.'
|| json.charAt(index) == 'e'
|| json.charAt(index) == 'E');
}

private void skipWhitespace() {
Expand Down
Loading