Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-web-devtools does not work for grpc calls made in a worker thread #35

Open
iam-johntan opened this issue Jun 19, 2019 · 4 comments

Comments

@iam-johntan
Copy link

We offload some of the heavier, streaming grpc calls to a worker thread for performance reasons. self.__GRPCWEB_DEVTOOLS__ is not defined in the worker's global scope.

Is it possible to make grpc-web-dev-tools available in the worker thread?

@rogchap
Copy link
Contributor

rogchap commented Jun 27, 2019

Interesting; and good idea to use a worker thread.
__GRPCWEB_DEVTOOLS__ is currently injected into the main content window. The devtools give you a very limited API to get running scripts inside of user window, so it may be hard to get a connection inside the worker thread.
I'll take a closer look to see if this is possible, but I'm not overly hopeful.

@markns
Copy link

markns commented Aug 31, 2019

Have the same use case: streaming connection in a worker thread.
Wondering if you'd determined the possibility of getting a connection in the worker thread?

@rogchap
Copy link
Contributor

rogchap commented Aug 31, 2019

I haven’t looked into this; a quick Google does not show much information about dev tools APIs and connections to worker threads.

If you could point me to any reference material or docs that you have come across, I’m happy to take a closer look.

@dnasir
Copy link

dnasir commented Dec 4, 2020

I recently ran into this problem. I also found out that I could not communicate with the devtool from the worker

My solution was to reuse the client code, but instead of calling postMessage on the window object, I had it call self.postMessage to send the data from the worker to the main app. On the app side, I have an event listener attached to the worker, listening for messages with the type property set to __GRPCWEB_DEVTOOLS__, and I then relay the entire object to the devtool using window.postMessage.

Here's the reused code:

import { Message } from 'google-protobuf';
import {
    ClientReadableStream,
    Status,
    Metadata,
    MethodDescriptor,
    Error,
    AbstractClientBase
} from 'grpc-web';

// This plugin is based on the gRPC-Web Devtool (https://github.com/SafetyCulture/grpc-web-devtools)
// but repurposed to work within a Web Worker context.

const postType = '__GRPCWEB_DEVTOOLS__';

/**
 * Helper method that relays messages from the worker
 * to the main gRPC-Web Devtool plugin.
 * @param worker Worker instance
 */
export function __GRPCWEB_DEVTOOLS__(worker: Worker) {
    worker.addEventListener(
        'message',
        function (event: MessageEvent) {
            if (event.data?.type == postType) {
                window.postMessage(event.data, '*');
            }
        },
        false
    );
}

class StreamInterceptor {
    _stream: ClientReadableStream<Message>;
    _callbacks = {};

    constructor(
        method: string,
        request: Message,
        stream: ClientReadableStream<Message>,
        ctx: Worker
    ) {
        const methodType = 'server_streaming';
        ctx.postMessage({
            type: postType,
            method,
            methodType,
            request: request.toObject()
        });
        stream.on('data', (response: Message) => {
            ctx.postMessage({
                type: postType,
                method,
                methodType,
                response: response.toObject()
            });
            if (!!this._callbacks['data']) {
                this._callbacks['data'](response);
            }
        });
        stream.on('status', (status: Status) => {
            if (status.code === 0) {
                ctx.postMessage({
                    type: postType,
                    method,
                    methodType,
                    response: 'EOF'
                });
            }
            if (!!this._callbacks['status']) {
                this._callbacks['status'](status);
            }
        });
        stream.on('error', (error: Error) => {
            if (error.code !== 0) {
                ctx.postMessage({
                    type: postType,
                    method,
                    methodType,
                    error: {
                        code: error.code,
                        message: error.message
                    }
                });
            }
            if (!!this._callbacks['error']) {
                this._callbacks['error'](error);
            }
        });
        this._stream = stream;
    }

    on(eventType: any, callback: any) {
        this._stream.on(eventType, callback);
        return this;
    }

    removeListener(eventType: any, callback: any) {
        this._stream.removeListener(eventType, callback);
    }

    cancel() {
        this._stream.cancel();
    }
}

/**
 * The helper method that adds interceptors to
 * the gRPC client inside a Worker.
 * @param client gRPC client instance
 * @param ctx Worker context
 */
export function __GRPCWEB_WORKER_DEVTOOLS__(
    client: { client_: AbstractClientBase },
    ctx: Worker
) {
    client.client_['rpcCall_'] = client.client_.rpcCall;
    client.client_['rpcCall2'] = function (
        method: string,
        request: Message,
        metadata: Metadata,
        methodDescriptor: MethodDescriptor<Message, Message>,
        callback: (err: Error, response: Message) => void
    ): ClientReadableStream<Message> {
        let posted = false;
        const newCallback = function (err: Error, response: Message) {
            if (!posted) {
                ctx.postMessage({
                    type: postType,
                    method,
                    methodType: 'unary',
                    request: (request as any).toObject(),
                    response: err ? undefined : response.toObject(),
                    error: err || undefined
                });
                posted = true;
            }
            callback(err, response);
        };

        return this['rpcCall_'](
            method,
            request,
            metadata,
            methodDescriptor,
            newCallback
        );
    };
    client.client_.rpcCall = client.client_['rpcCall2'];
    client.client_['unaryCall'] = function (
        method: string,
        request: Message,
        metadata: Metadata,
        methodDescriptor: MethodDescriptor<Message, Message>
    ) {
        return new Promise((resolve, reject) => {
            this['rpcCall2'](
                method,
                request,
                metadata,
                methodDescriptor,
                function (error: Error, response: Message) {
                    error ? reject(error) : resolve(response);
                }
            );
        });
    };
    client.client_['serverStreaming_'] = client.client_.serverStreaming;
    client.client_['serverStreaming2'] = function (
        method: string,
        request: Message,
        metadata: Metadata,
        methodDescriptor: MethodDescriptor<Message, Message>
    ) {
        const stream = client.client_['serverStreaming_'](
            method,
            request,
            metadata,
            methodDescriptor
        );
        const si = new StreamInterceptor(method, request, stream, ctx);
        return si;
    };
    client.client_.serverStreaming = client.client_['serverStreaming2'];
}

And here's how you use it:

// app.ts
import Worker from 'worker-loader!./worker.ts';

const worker = new Worker();
__GRPCWEB_DEVTOOLS__(worker);

// worker.ts
const client = new EchoServiceClient('https://localhost:5051');
const ctx: Worker = self as any;
__GRPCWEB_WORKER_DEVTOOLS__(client, ctx);

Please adjust your code as necessary.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants