From 4b7a6788500d22597f824696cd79c096d8ef8be7 Mon Sep 17 00:00:00 2001 From: Fredrik Lindberg Date: Thu, 31 Aug 2023 21:56:59 +0200 Subject: [PATCH] fix: improve http ingress upgrade request handling --- src/ingress/http-ingress.js | 47 +++++++++++-- test/system/ingress/test_http_ingress.js | 80 ++++++++++++++++++++- test/unit/test-utils.ts | 88 +++++++++++++++++++++++- 3 files changed, 206 insertions(+), 9 deletions(-) diff --git a/src/ingress/http-ingress.js b/src/ingress/http-ingress.js index 8d129f6..a69a429 100644 --- a/src/ingress/http-ingress.js +++ b/src/ingress/http-ingress.js @@ -345,7 +345,6 @@ class HttpIngress { } async handleUpgradeRequest(req, sock, head, baseUrl) { - const _canonicalHttpResponse = (sock, request, response) => { sock.write(`HTTP/${request.httpVersion} ${response.status} ${response.statusLine}\r\n`); sock.write('\r\n'); @@ -391,8 +390,33 @@ class HttpIngress { port: this.httpListener.getPort(), } }; - const target = this.tunnelService.createConnection(tunnel.id, ctx); - if (target === undefined) { + const target = this.tunnelService.createConnection(tunnel.id, ctx, (err) => { + if (!err) { + return; + } + let statusCode; + let statusLine; + let msg; + if (err.code === 'EMFILE') { + statusCode = 429; + statusLine = 'Too Many Requests'; + msg = ERROR_TUNNEL_TRANSPORT_REQUEST_LIMIT; + } else if (err.code == 'ECONNRESET') { + statusCode = 503; + statusLine = 'Service Unavailable'; + msg = ERROR_TUNNEL_TARGET_CON_REFUSED; + } else { + statusCode = 503; + statusLine = 'Service Unavailable'; + msg = ERROR_TUNNEL_TARGET_CON_FAILED; + } + _canonicalHttpResponse(sock, req, { + status: statusCode, + statusLine, + body: JSON.stringify({error: msg}), + }); + }); + if (!target) { _canonicalHttpResponse(sock, req, { status: 503, statusLine: 'Service Unavailable', @@ -402,11 +426,22 @@ class HttpIngress { } const headers = this._requestHeaders(req, tunnel, baseUrl); - target.on('error', (err) => { - sock.end(); - }); + + const close = (err) => { + target.off('error', close); + target.off('close', close); + sock.off('error', close); + sock.off('close', close); + sock.destroy(); + target.destroy(); + }; target.on('connect', () => { + target.on('error', close); + target.on('close', close); + sock.on('error', close); + sock.on('close', close); + target.pipe(sock); sock.pipe(target); diff --git a/test/system/ingress/test_http_ingress.js b/test/system/ingress/test_http_ingress.js index 93efa0f..9a1dec4 100644 --- a/test/system/ingress/test_http_ingress.js +++ b/test/system/ingress/test_http_ingress.js @@ -5,10 +5,11 @@ import EventBus from "../../../src/cluster/eventbus.js"; import Config from "../../../src/config.js"; import Ingress from "../../../src/ingress/index.js"; import TunnelService from "../../../src/tunnel/tunnel-service.js"; -import { initClusterService, initStorageService, wsSocketPair, wsmPair } from "../../unit/test-utils.ts"; -import WebSocketTransport from '../../../src/transport/ws/ws-transport.js'; +import { createEchoHttpServer, initClusterService, initStorageService, wsSocketPair, wsmPair } from "../../unit/test-utils.ts"; import { setTimeout } from 'timers/promises'; import sinon from 'sinon'; +import net from 'net' +import http from 'http'; describe('http ingress', () => { let clock; @@ -117,4 +118,79 @@ describe('http ingress', () => { await sockPair.terminate(); }).timeout(2000); + + it(`http ingress can handle websocket upgrades`, async () => { + const sockPair = await wsSocketPair.create(9000) + const [sock1, sock2] = wsmPair(sockPair) + const echoServer = await createEchoHttpServer(20000); + + sock2.on('connection', (sock) => { + const targetSock = new net.Socket(); + targetSock.connect({ + host: 'localhost', + port: 20000 + }, () => { + targetSock.pipe(sock); + sock.pipe(targetSock); + }); + + const close = () => { + targetSock.unpipe(sock); + sock.unpipe(targetSock); + sock.destroy(); + targetSock.destroy(); + }; + + targetSock.on('close', close); + sock.on('close', close); + sock.on('error', () => { + close(); + }); + targetSock.on('error', () => { + close(); + }); + }); + + let res = await tunnelService.connect(tunnel.id, account.id, sock1, {peer: "127.0.0.1"}); + assert(res == true, "failed to connect tunnel"); + + let i = 0; + let tun; + do { + await setTimeout(100); + tun = await tunnelService._get(tunnel.id) + } while (tun.state().connected == false && i++ < 10); + assert(tun.state().connected == true, "tunnel not connected") + + const req = http.request({ + hostname: 'localhost', + port: 10000, + method: 'GET', + path: '/ws', + headers: { + "Host": `${tunnel.id}.localhost.example`, + "Connection": 'Upgrade', + "Upgrade": 'websocket', + "Origin": `http://${tunnel.id}.localhost.example`, + "Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==", + "Sec-WebSocket-Version": "13" + } + }); + + const done = (resolve) => { + req.on('upgrade', (res, socket, head) => { + const body = head.subarray(2); + resolve(body); + }); + }; + req.end(); + + const wsRes = await new Promise(done); + assert(wsRes.equals(Buffer.from("ws echo connected")), `got ${wsRes}`); + + await sock1.destroy(); + await sock2.destroy(); + await sockPair.terminate(); + echoServer.destroy(); + }); }); \ No newline at end of file diff --git a/test/unit/test-utils.ts b/test/unit/test-utils.ts index 26612fa..0d4854e 100644 --- a/test/unit/test-utils.ts +++ b/test/unit/test-utils.ts @@ -1,7 +1,9 @@ +import * as http from 'node:http'; +import { Duplex } from 'node:stream'; +import * as url from 'node:url'; import { WebSocket, WebSocketServer } from "ws"; import ClusterService from "../../src/cluster/index.js"; import { StorageService } from "../../src/storage/index.js"; -import { Duplex } from 'stream'; import { WebSocketMultiplex } from "@exposr/ws-multiplex"; export const initStorageService = async () => { @@ -89,4 +91,88 @@ export const wsmPair = (socketPair: wsSocketPair, options?: Object): Array { + + const echoRequest = (request: http.IncomingMessage, response: http.ServerResponse) => { + let body: Array = []; + request.on('data', (chunk: Buffer) => { + body.push(chunk); + }).on('end', () => { + const buf = Buffer.concat(body).toString(); + response.statusCode = 200; + response.end(buf); + }); + }; + + const fileGenerator = (size: number, chunkSize: number, response: http.ServerResponse) => { + let sentBytes: number = 0; + + response.statusCode = 200; + response.setHeader("Content-Type", "application/octet-stream"); + response.setHeader('Content-Disposition', 'attachment; filename="file.bin"'); + response.setHeader("Content-Length", size); + + const writeChunk = () => { + if (sentBytes < size) { + const remainingBytes = size - sentBytes; + const chunkToSend = Math.min(chunkSize, remainingBytes); + + const buffer = Buffer.alloc(chunkToSend); + response.write(buffer); + + sentBytes += chunkToSend; + + setTimeout(writeChunk, 0); + } else { + response.end(); + } + } + + writeChunk(); + }; + + const wss = new WebSocketServer({ noServer: true }); + const handleUpgrade = (async (request: http.IncomingMessage, socket: Duplex, head: Buffer) => { + const parsedUrl = url.parse(request.url, true) + if (parsedUrl.pathname != '/ws') { + socket.write(`HTTP/${request.httpVersion} 404 Not found\r\n`); + socket.end(); + socket.destroy(); + } + + wss.handleUpgrade(request, socket, head, (ws) => { + ws.send("ws echo connected"); + ws.on('message', (data) => { + ws.send(data); + }); + }); + }); + + const handleRequest = (request: http.IncomingMessage, response: http.ServerResponse) => { + + const parsedUrl = url.parse(request.url, true) + + if (request.method == "GET" && parsedUrl.pathname == '/file') { + const size = Number(parsedUrl.query["size"] || "32"); + const chunkSize = Number(parsedUrl.query["chunk"] || "262144"); + return fileGenerator(size, chunkSize, response); + } else { + return echoRequest(request, response); + } + } + + const server = http.createServer(); + server.on('request', handleRequest); + server.on('upgrade', handleUpgrade); + + server.listen(port); + return { + destroy: () => { + server.removeAllListeners('request'); + server.removeAllListeners('upgrade'); + server.close(); + } + }; }; \ No newline at end of file