Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve reconnection for hypha #557

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
*.egg-info
*/dist
.pypirc
.ipynb_checkpoints
#----------------------------------------------------------------------------
# EMBER-CLI DEFAULT
Expand Down
2 changes: 1 addition & 1 deletion javascript/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion javascript/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "imjoy-rpc",
"version": "0.5.47",
"version": "0.5.48",
"description": "Remote procedure calls for ImJoy.",
"module": "index.js",
"types": "index.d.ts",
Expand Down
3 changes: 2 additions & 1 deletion javascript/src/hypha/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ export class RPC extends MessageEmitter {
name: "RPC built-in services",
config: { require_context: true, visibility: "public" },
ping: this._ping.bind(this),
get_client_info: this.get_client_info.bind(this),
get_service: this.get_local_service.bind(this),
register_service: this.register_service.bind(this),
message_cache: {
Expand Down Expand Up @@ -893,7 +894,7 @@ export class RPC extends MessageEmitter {
}
}

get_client_info() {
get_client_info(context) {
const services = [];
for (let service of Object.values(this._services)) {
services.push({
Expand Down
204 changes: 115 additions & 89 deletions javascript/src/hypha/websocket-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ export { version as VERSION } from "../../package.json";
export { loadRequirements };
export { getRTCService, registerRTCService };

const MAX_RETRY = 10000;

class WebsocketRPCConnection {
constructor(
server_url,
Expand All @@ -19,135 +17,161 @@ class WebsocketRPCConnection {
WebSocketClass = null
) {
assert(server_url && client_id, "server_url and client_id are required");
server_url = server_url + "?client_id=" + client_id;
if (workspace) {
server_url += "&workspace=" + workspace;
}
if (token) {
server_url += "&token=" + token;
}
this._server_url = server_url;
this._client_id = client_id;
this._workspace = workspace;
this._token = token;
this._reconnection_token = null;
this._websocket = null;
this._handle_message = null;
this._reconnection_token = null;
this._server_url = server_url;
this._timeout = timeout * 1000; // converting to ms
this._disconnect_handler = null; // Disconnection event handler
this._on_open = null; // Connection open event handler
this._timeout = timeout * 1000; // Convert seconds to milliseconds
this._WebSocketClass = WebSocketClass || WebSocket; // Allow overriding the WebSocket class
this._opening = null;
this._retry_count = 0;
this._closing = false;
// Allow to override the WebSocket class for mocking or testing
this._WebSocketClass = WebSocketClass || WebSocket;
}

set_reconnection_token(token) {
this._reconnection_token = token;
this._legacy_auth = null;
}

on_message(handler) {
assert(handler, "handler is required");
this._handle_message = handler;
}

async open() {
if (this._opening) {
return this._opening;
}
this._opening = new Promise((resolve, reject) => {
const server_url = this._reconnection_token
? `${this._server_url}&reconnection_token=${this._reconnection_token}`
: this._server_url;
console.info("Creating a new connection to ", server_url.split("?")[0]);
on_disconnected(handler) {
this._disconnect_handler = handler;
}

on_open(handler) {
this._on_open = handler;
}

set_reconnection_token(token) {
this._reconnection_token = token;
}

async _attempt_connection(server_url, attempt_fallback = true) {
return new Promise((resolve, reject) => {
this._legacy_auth = false;
const websocket = new this._WebSocketClass(server_url);
websocket.binaryType = "arraybuffer";
websocket.onmessage = event => {
const data = event.data;
this._handle_message(data);
};

websocket.onopen = () => {
this._websocket = websocket;
console.info("WebSocket connection established");
this._retry_count = 0; // Reset retry count
resolve();
resolve(websocket);
};

websocket.onerror = event => {
console.error("WebSocket connection error:", event);
reject(event);
};

websocket.onclose = event => {
console.log("websocket closed");
if (!this._closing) {
console.log("Websocket connection interrupted, retrying...");
this._retry_count++;
setTimeout(() => this.open(), this._timeout);
if (event.code === 1003 && attempt_fallback) {
console.info(
"Received 1003 error, attempting connection with query parameters."
);
this._attempt_connection_with_query_params(server_url)
.then(resolve)
.catch(reject);
} else if (this._disconnect_handler) {
this._disconnect_handler(this, event.reason);
}
this._websocket = null;
};

websocket.onerror = event => {
console.log("Error occurred in websocket connection: ", event);
reject(new Error("Websocket connection failed."));
this._websocket = null;
websocket.onmessage = event => {
const data = event.data;
this._handle_message(data);
};
}).finally(() => {
this._opening = null;
});
return this._opening;
}

async emit_message(data) {
assert(this._handle_message, "No handler for message");
if (!this._websocket || this._websocket.readyState !== WebSocket.OPEN) {
await this.open();
}
return new Promise((resolve, reject) => {
if (!this._websocket) {
reject(new Error("Websocket connection not available"));
} else if (this._websocket.readyState === WebSocket.CONNECTING) {
const timeout = setTimeout(() => {
reject(new Error("WebSocket connection timed out"));
}, this._timeout);
async _attempt_connection_with_query_params(server_url) {
// Initialize an array to hold parts of the query string
const queryParamsParts = [];

this._websocket.addEventListener("open", () => {
clearTimeout(timeout);
try {
this._websocket.send(data);
resolve();
} catch (exp) {
console.error(`Failed to send data, error: ${exp}`);
reject(exp);
}
// Conditionally add each parameter if it has a non-empty value
if (this._client_id)
queryParamsParts.push(`client_id=${encodeURIComponent(this._client_id)}`);
if (this._workspace)
queryParamsParts.push(`workspace=${encodeURIComponent(this._workspace)}`);
if (this._token)
queryParamsParts.push(`token=${encodeURIComponent(this._token)}`);
if (this._reconnection_token)
queryParamsParts.push(
`reconnection_token=${encodeURIComponent(this._reconnection_token)}`
);

// Join the parts with '&' to form the final query string, prepend '?' if there are any parameters
const queryString =
queryParamsParts.length > 0 ? `?${queryParamsParts.join("&")}` : "";

// Construct the full URL by appending the query string if it exists
const full_url = server_url + queryString;

this._legacy_auth = true; // Assuming this flag is needed for some other logic
return await this._attempt_connection(full_url, false);
}

async open() {
if (this._closing || this._websocket) {
return; // Avoid opening a new connection if closing or already open
}
try {
this._opening = true;
this._websocket = await this._attempt_connection(this._server_url);
if (this._legacy_auth) {
// Send authentication info as the first message if connected without query params
const authInfo = JSON.stringify({
client_id: this._client_id,
workspace: this._workspace,
token: this._token,
reconnection_token: this._reconnection_token
});
} else if (this._websocket.readyState === WebSocket.OPEN) {
try {
this._websocket.send(data);
resolve();
} catch (exp) {
console.error(`Failed to send data, error: ${exp}`);
reject(exp);
}
} else {
reject(new Error("WebSocket is not in the OPEN or CONNECTING state"));
this._websocket.send(authInfo);
}
});

if (this._on_open) {
this._on_open();
}
} catch (error) {
console.error("Failed to connect to", this._server_url, error);
} finally {
this._opening = false;
}
}

async emit_message(data) {
if (this._closing) {
throw new Error("Connection is closing");
}
await this._opening;
if (!this._websocket || this._websocket.readyState !== WebSocket.OPEN) {
throw new Error("WebSocket connection is not open");
}
try {
this._websocket.send(data);
} catch (exp) {
console.error(`Failed to send data, error: ${exp}`);
throw exp;
}
}

disconnect(reason) {
this._closing = true;
const ws = this._websocket;
this._websocket = null;
if (ws && ws.readyState === WebSocket.OPEN) {
ws.close(1000, reason);
if (this._websocket && this._websocket.readyState === WebSocket.OPEN) {
this._websocket.close(1000, reason);
console.info(`WebSocket connection disconnected (${reason})`);
}
console.info(`Websocket connection disconnected (${reason})`);
}
}

function normalizeServerUrl(server_url) {
if (!server_url) throw new Error("server_url is required");
if (server_url.startsWith("http://")) {
server_url =
server_url.replace("http://", "ws://").replace(/\/$/, "") + "/ws";
return server_url.replace("http://", "ws://").replace(/\/$/, "") + "/ws";
} else if (server_url.startsWith("https://")) {
server_url =
server_url.replace("https://", "wss://").replace(/\/$/, "") + "/ws";
return server_url.replace("https://", "wss://").replace(/\/$/, "") + "/ws";
}
return server_url;
}
Expand Down Expand Up @@ -227,6 +251,8 @@ export async function connectToServer(config) {
wm.listPlugins = wm.listServices;
wm.disconnect = disconnect;
wm.registerCodec = rpc.register_codec.bind(rpc);
wm.on_disconnected = connection.on_disconnected.bind(connection);
wm.on_open = connection.on_open.bind(connection);
if (config.webrtc) {
await registerRTCService(wm, clientId + "-rtc", config.webrtc_config);
}
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion python/imjoy_rpc/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "0.5.47"
"version": "0.5.48"
}
1 change: 1 addition & 0 deletions python/imjoy_rpc/connection/colab_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def registered(comm, open_msg):

def init(self, config=None):
"""Initialize the connection."""

# register a minimal plugin api
def setup():
"""Set up plugin."""
Expand Down
1 change: 1 addition & 0 deletions python/imjoy_rpc/connection/jupyter_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def registered(comm, open_msg):

def init(self, config=None):
"""Initialize the connection."""

# register a minimal plugin api
def setup():
pass
Expand Down
1 change: 1 addition & 0 deletions python/imjoy_rpc/connection/socketio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def register_codec(self, config):

def init(self, config=None):
"""Initialize the connection."""

# register a minimal plugin api
def setup():
pass
Expand Down
4 changes: 2 additions & 2 deletions python/imjoy_rpc/hypha/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def __init__(
"name": "RPC built-in services",
"config": {"require_context": True, "visibility": "public"},
"ping": self._ping,
"get_client_info": self.get_client_info,
"get_service": self.get_local_service,
"register_service": self.register_service,
"message_cache": {
Expand Down Expand Up @@ -884,7 +885,7 @@ async def _notify_service_update(self):
exp,
)

def get_client_info(self):
def get_client_info(self, context=None):
"""Get client info."""
return {
"id": self._client_id,
Expand Down Expand Up @@ -1118,7 +1119,6 @@ def _encode(
return b_object

if callable(a_object):

if a_object in self._method_annotations:
annotation = self._method_annotations[a_object]
b_object = {
Expand Down
1 change: 1 addition & 0 deletions python/imjoy_rpc/hypha/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def on(self, event, handler):

def once(self, event, handler):
"""Register an event handler that should only run once."""

# wrap the handler function,
# this is needed because setting property
# won't work for member function of a class instance
Expand Down
Loading
Loading