From c5778d50b8c3fb4ccf5c96b78951817ea0e15e8d Mon Sep 17 00:00:00 2001 From: damencho Date: Tue, 6 Aug 2024 11:00:38 +0300 Subject: [PATCH 1/6] feat: Bumps ice4j. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e5f3fb0a..6948abfe 100644 --- a/pom.xml +++ b/pom.xml @@ -210,7 +210,7 @@ ${project.groupId} ice4j - 3.0-59-g71e244d + 3.0-72-g824cd4b org.opentelecoms.sip From 130ada86f11bb0550c8e663498b8e8f4e94a261a Mon Sep 17 00:00:00 2001 From: damencho Date: Tue, 6 Aug 2024 14:02:17 +0300 Subject: [PATCH 2/6] feat(visitors): Reads a header to request to be visitor. --- .../java/org/jitsi/jigasi/CallContext.java | 15 +++++++++++++++ .../java/org/jitsi/jigasi/JvbConference.java | 4 ++++ .../org/jitsi/jigasi/SipGatewaySession.java | 19 +++++++++++++++++++ 3 files changed, 38 insertions(+) diff --git a/src/main/java/org/jitsi/jigasi/CallContext.java b/src/main/java/org/jitsi/jigasi/CallContext.java index f2de0ae4..6788f873 100644 --- a/src/main/java/org/jitsi/jigasi/CallContext.java +++ b/src/main/java/org/jitsi/jigasi/CallContext.java @@ -123,6 +123,11 @@ public class CallContext */ private String authUserId; + /** + * Whether to request visitor when joining. + */ + private boolean requestVisitor = false; + /** * Optional bosh url that we use to join a room with the * xmpp account. @@ -619,4 +624,14 @@ public Map getExtraHeaders() { return Collections.unmodifiableMap(this.extraHeaders); } + + public boolean isRequestVisitor() + { + return requestVisitor; + } + + public void setRequestVisitor(boolean requestVisitor) + { + this.requestVisitor = requestVisitor; + } } diff --git a/src/main/java/org/jitsi/jigasi/JvbConference.java b/src/main/java/org/jitsi/jigasi/JvbConference.java index 085a2930..51a260cf 100644 --- a/src/main/java/org/jitsi/jigasi/JvbConference.java +++ b/src/main/java/org/jitsi/jigasi/JvbConference.java @@ -1955,6 +1955,10 @@ private String inviteFocus(final EntityBareJid roomIdentifier) if (JigasiBundleActivator.isSipVisitorsEnabled() && !this.isTranscriber) { focusInviteIQ.addProperty("visitors-version", "1"); + if (callContext.isRequestVisitor()) + { + focusInviteIQ.addProperty("visitor", Boolean.TRUE.toString()); + } } try diff --git a/src/main/java/org/jitsi/jigasi/SipGatewaySession.java b/src/main/java/org/jitsi/jigasi/SipGatewaySession.java index a8d7f87f..58744a9c 100644 --- a/src/main/java/org/jitsi/jigasi/SipGatewaySession.java +++ b/src/main/java/org/jitsi/jigasi/SipGatewaySession.java @@ -159,6 +159,21 @@ public class SipGatewaySession private static final String JITSI_MEET_DOMAIN_TENANT_HEADER_PROPERTY = "JITSI_MEET_DOMAIN_TENANT_HEADER_NAME"; + /** + * The name of the header to search in the INVITE headers whether to request joining as a visitor. + */ + private final String visitorHeaderName; + + /** + * Default value optional INVITE header which specifies whether to join as visitor. + */ + public static final String JITSI_MEET_VISITOR_HEADER_DEFAULT = "Jitsi-Visitor"; + + /** + * The account property to use to set custom header name for domain tenant. + */ + private static final String JITSI_MEET_VISITOR_HEADER_PROPERTY = "JITSI_MEET_VISITOR_HEADER_NAME"; + /** * The account property to use to set outbound prefix to be added to all outgoing calls. */ @@ -387,6 +402,9 @@ public SipGatewaySession(SipGateway gateway, CallContext callContext) JITSI_MEET_DOMAIN_TENANT_HEADER_PROPERTY, JITSI_MEET_DOMAIN_TENANT_HEADER_DEFAULT); + visitorHeaderName = sipProvider.getAccountID() + .getAccountPropertyString(JITSI_MEET_VISITOR_HEADER_PROPERTY, JITSI_MEET_VISITOR_HEADER_DEFAULT); + heartbeatPeriodInSec = sipProvider.getAccountID() .getAccountPropertyInt(HEARTBEAT_SECONDS_PROPERTY, heartbeatPeriodInSec); @@ -747,6 +765,7 @@ public void onJoinJitsiMeetRequest( callContext.setAuthUserId(data.get(authUserIdHeaderName)); callContext.setMucAddressPrefix(sipProvider.getAccountID() .getAccountPropertyString(CallContext.MUC_DOMAIN_PREFIX_PROP, null)); + callContext.setRequestVisitor(Boolean.parseBoolean(data.get(visitorHeaderName))); joinJvbConference(callContext); } From a0750e748b6f20a3f612162953ccf5551b98c1b6 Mon Sep 17 00:00:00 2001 From: damencho Date: Wed, 7 Aug 2024 11:59:55 +0300 Subject: [PATCH 3/6] feat: Moves generate jwt to a common util. --- .../transcription/WhisperWebsocket.java | 29 +------------- src/main/java/org/jitsi/jigasi/util/Util.java | 38 ++++++++++++++++++- 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/jitsi/jigasi/transcription/WhisperWebsocket.java b/src/main/java/org/jitsi/jigasi/transcription/WhisperWebsocket.java index 69a75801..6f6a5f7a 100644 --- a/src/main/java/org/jitsi/jigasi/transcription/WhisperWebsocket.java +++ b/src/main/java/org/jitsi/jigasi/transcription/WhisperWebsocket.java @@ -17,7 +17,6 @@ */ package org.jitsi.jigasi.transcription; -import io.jsonwebtoken.*; import org.eclipse.jetty.websocket.api.*; import org.eclipse.jetty.websocket.api.annotations.*; import org.eclipse.jetty.websocket.client.*; @@ -29,8 +28,6 @@ import java.io.*; import java.net.*; import java.nio.*; -import java.security.*; -import java.security.spec.*; import java.time.*; import java.util.*; import java.util.concurrent.*; @@ -137,29 +134,6 @@ public class WhisperWebsocket logger.info("Websocket transcription streaming endpoint: " + websocketUrlConfig); } - private String getJWT() throws NoSuchAlgorithmException, InvalidKeySpecException, IOException - { - if (privateKey.isEmpty() || privateKeyName.isEmpty()) - { - throw new IOException("Failed generating JWT for Whisper. Missing private key or key name."); - } - long nowMillis = System.currentTimeMillis(); - Date now = new Date(nowMillis); - KeyFactory kf = KeyFactory.getInstance("RSA"); - PKCS8EncodedKeySpec keySpecPKCS8 = new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKey)); - PrivateKey finalPrivateKey = kf.generatePrivate(keySpecPKCS8); - JwtBuilder builder = Jwts.builder() - .setHeaderParam("kid", privateKeyName) - .setIssuedAt(now) - .setAudience(jwtAudience) - .setIssuer("jigasi") - .signWith(finalPrivateKey, SignatureAlgorithm.RS256); - long expires = nowMillis + (60 * 5 * 1000); - Date expiry = new Date(expires); - builder.setExpiration(expiry); - return builder.compact(); - } - /** * Creates a connection url by concatenating the websocket * url with the Connection Id; @@ -192,7 +166,8 @@ void connect() generateWebsocketUrl(); logger.info("Connecting to " + websocketUrl); ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); - upgradeRequest.setHeader("Authorization", "Bearer " + getJWT()); + upgradeRequest.setHeader("Authorization", "Bearer " + + org.jitsi.jigasi.util.Util.generateAsapToken(privateKey, privateKeyName, jwtAudience, "jigasi")); ws = new WebSocketClient(); ws.start(); wsSession = ws.connect(this, new URI(websocketUrl), upgradeRequest).get(); diff --git a/src/main/java/org/jitsi/jigasi/util/Util.java b/src/main/java/org/jitsi/jigasi/util/Util.java index f480569c..e3d1dbcd 100644 --- a/src/main/java/org/jitsi/jigasi/util/Util.java +++ b/src/main/java/org/jitsi/jigasi/util/Util.java @@ -17,9 +17,11 @@ */ package org.jitsi.jigasi.util; +import io.jsonwebtoken.*; import net.java.sip.communicator.impl.protocol.jabber.*; import net.java.sip.communicator.service.protocol.*; import net.java.sip.communicator.service.protocol.media.*; +import org.apache.commons.lang3.StringUtils; import org.jitsi.jigasi.*; import org.jitsi.service.neomedia.*; import org.jitsi.service.neomedia.format.*; @@ -32,7 +34,9 @@ import org.json.simple.*; import org.json.simple.parser.*; +import java.io.*; import java.lang.reflect.*; +import java.security.spec.*; import java.util.*; import java.math.*; @@ -150,7 +154,7 @@ public static String stringToMD5hash(String toHash) } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); + Logger.getLogger(Util.class).error("Error creating hash", e); } return null; @@ -339,4 +343,36 @@ public static boolean isJibri(ChatRoomMemberJabberImpl member) { return checkForFeature(member, JIBRI_FEATURE_NAME); } + + /** + * Generates asap token. + * @return the generated token. + */ + public static String generateAsapToken( + String privateKey, String privateKeyId, String audience, String issuer) + throws NoSuchAlgorithmException, + InvalidKeySpecException, + IOException + { + if (StringUtils.isEmpty(privateKey) || StringUtils.isEmpty(privateKeyId)) + { + throw new IOException("Failed generating JWT for Whisper. Missing private key or key name."); + } + + long nowMillis = System.currentTimeMillis(); + Date now = new Date(nowMillis); + KeyFactory kf = KeyFactory.getInstance("RSA"); + PKCS8EncodedKeySpec keySpecPKCS8 = new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKey)); + PrivateKey finalPrivateKey = kf.generatePrivate(keySpecPKCS8); + + JwtBuilder builder = Jwts.builder() + .setHeaderParam("kid", privateKeyId) + .setIssuedAt(now) + .setAudience(audience) + .setIssuer(issuer) + .signWith(finalPrivateKey, SignatureAlgorithm.RS256); + builder.setExpiration(new Date(nowMillis + (60 * 5 * 1000))); + + return builder.compact(); + } } From 0725d38429c1c52e0623aa0f7186131d80459b9d Mon Sep 17 00:00:00 2001 From: damencho Date: Wed, 7 Aug 2024 12:01:56 +0300 Subject: [PATCH 4/6] feat(visitors): Uses visitors queue service to wait for live conference. --- pom.xml | 23 ++ .../java/org/jitsi/jigasi/JvbConference.java | 46 ++++ .../jitsi/jigasi/visitor/WebsocketClient.java | 242 ++++++++++++++++++ 3 files changed, 311 insertions(+) create mode 100644 src/main/java/org/jitsi/jigasi/visitor/WebsocketClient.java diff --git a/pom.xml b/pom.xml index 6948abfe..95140f0c 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ 1.1-140-g8f45a9f 11.0.20 + 6.1.11 @@ -126,6 +127,28 @@ 4.1.0 + + + org.springframework + spring-websocket + ${springboot.version} + + + org.springframework + spring-messaging + ${springboot.version} + + + org.apache.tomcat.embed + tomcat-embed-websocket + 10.1.26 + + + org.eclipse + yasson + 3.0.4 + + ${project.groupId} diff --git a/src/main/java/org/jitsi/jigasi/JvbConference.java b/src/main/java/org/jitsi/jigasi/JvbConference.java index 51a260cf..ff87a37e 100644 --- a/src/main/java/org/jitsi/jigasi/JvbConference.java +++ b/src/main/java/org/jitsi/jigasi/JvbConference.java @@ -32,6 +32,7 @@ import org.jitsi.jigasi.stats.*; import org.jitsi.jigasi.util.*; import org.jitsi.jigasi.version.*; +import org.jitsi.jigasi.visitor.*; import org.jitsi.jigasi.xmpp.extensions.*; import org.jitsi.utils.*; import org.jitsi.utils.logging.Logger; @@ -142,6 +143,32 @@ public class JvbConference */ private static final int JVB_ACTIVITY_CHECK_DELAY = 5000; + /** + * The name of the property which enables visitors queue service. + */ + public static final String P_NAME_VISITORS_QUEUE_SERVICE = "org.jitsi.jigasi.VISITOR_QUEUE_SERVICE"; + + /** + * The visitors queue service url. + */ + private static String visitorsQueueServiceUrl = null; + static + { + visitorsQueueServiceUrl = JigasiBundleActivator.getConfigurationService() + .getString(P_NAME_VISITORS_QUEUE_SERVICE); + } + + /** + * The error code used to indicate that the meeting is not live. + * (number that is not clashing with OperationFailedException error codes) + */ + private static final int NOT_LIVE_ERROR_CODE = 101; + + /** + * The websocket client to connect to visitors queue if configured. + */ + private WebsocketClient websocketClient; + /** * A timer which will be used to schedule a quick non-blocking check whether there is any activity * on the bridge side of the call. @@ -614,6 +641,11 @@ public synchronized void stop() leaveConferenceRoom(); + if (this.websocketClient != null) + { + this.websocketClient.disconnect(); + } + if (jvbCall != null) { CallManager.hangupCall(jvbCall, true); @@ -1129,6 +1161,15 @@ public void joinConferenceRoom() } } } + else if (opex.getErrorCode() == NOT_LIVE_ERROR_CODE) + { + logger.info(this.callContext + " Conference is not live yet."); + + websocketClient = new WebsocketClient(this, visitorsQueueServiceUrl, this.callContext); + websocketClient.connect(); + + return; + } } if (e.getCause() instanceof XMPPException.XMPPErrorException) @@ -1941,6 +1982,7 @@ public void conferenceMemberRemoved(CallPeerConferenceEvent conferenceEvent) * @return Returns vnode if one exist in focus response. */ private String inviteFocus(final EntityBareJid roomIdentifier) + throws OperationFailedException { if (callContext == null || callContext.getRoomJidDomain() == null) { @@ -1983,6 +2025,10 @@ private String inviteFocus(final EntityBareJid roomIdentifier) collector = getConnection().createStanzaCollectorAndSend(focusInviteIQ); ConferenceIq res = collector.nextResultOrThrow(); + if (visitorsQueueServiceUrl != null && !Boolean.parseBoolean(res.getPropertiesMap().get("live"))) + { + throw new OperationFailedException("Not live conference", NOT_LIVE_ERROR_CODE); + } return res.getVnode(); } catch (SmackException diff --git a/src/main/java/org/jitsi/jigasi/visitor/WebsocketClient.java b/src/main/java/org/jitsi/jigasi/visitor/WebsocketClient.java new file mode 100644 index 00000000..c8bc8146 --- /dev/null +++ b/src/main/java/org/jitsi/jigasi/visitor/WebsocketClient.java @@ -0,0 +1,242 @@ +/* + * Jigasi, the JItsi GAteway to SIP. + * + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.jigasi.visitor; + +import org.apache.commons.io.*; +import org.bouncycastle.util.io.pem.*; +import org.jitsi.jigasi.*; +import org.jitsi.jigasi.util.*; +import org.jitsi.utils.logging.*; +import org.json.simple.*; +import org.json.simple.parser.*; +import org.springframework.http.*; +import org.springframework.lang.*; +import org.springframework.messaging.converter.*; +import org.springframework.messaging.simp.stomp.*; +import org.springframework.scheduling.concurrent.*; +import org.springframework.web.socket.*; +import org.springframework.web.socket.client.*; +import org.springframework.web.socket.client.standard.*; +import org.springframework.web.socket.messaging.*; + +import java.io.*; +import java.lang.reflect.*; +import java.nio.charset.*; +import java.util.*; + +/** + * The websocket client to connect to visitors queue. + */ +public class WebsocketClient +{ + /** + * The logger. + */ + private final static Logger logger = Logger.getLogger(WebsocketClient.class); + + /** + * The name of the property of the private key we use for jwt token to connect to visitors queue service. + */ + public static final String P_NAME_VISITORS_QUEUE_SERVICE_PRIVATE_KEY_PATH + = "org.jitsi.jigasi.VISITOR_QUEUE_SERVICE_PRIVATE_KEY_PATH"; + + /** + * The name of the property which holds the key id (kid) we use for jwt to connect to visitors queue service. + */ + public static final String P_NAME_VISITORS_QUEUE_SERVICE_PRIVATE_KEY_ID + = "org.jitsi.jigasi.VISITOR_QUEUE_SERVICE_PRIVATE_KEY_ID"; + + /** + * The private key to use for generating jwt token to access service. + */ + private static final String privateKeyFilePath; + private static final String privateKeyId; + static + { + privateKeyFilePath + = JigasiBundleActivator.getConfigurationService().getString(P_NAME_VISITORS_QUEUE_SERVICE_PRIVATE_KEY_PATH); + privateKeyId + = JigasiBundleActivator.getConfigurationService().getString(P_NAME_VISITORS_QUEUE_SERVICE_PRIVATE_KEY_ID); + } + + /** + * The thread pool for heartbeat of the sockets. + */ + private static final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + static + { + taskScheduler.setPoolSize(1); + taskScheduler.setThreadNamePrefix("stomp-heartbeat-client-thread-"); + taskScheduler.initialize(); + } + + /** + * The call context used to for the current conference. + */ + private final CallContext callContext; + + /** + * The service URL to use to connect to for visitor queue. + */ + private final String serviceUrl; + + /** + * The parent conference. + */ + private final JvbConference conference; + + /** + * The stomp client that is active when connected. + */ + private WebSocketStompClient stompClient; + + /** + * A timer which will be used to schedule connection to conference after going live. + */ + private static final Timer connectTimer = new Timer(); + + public WebsocketClient(JvbConference conference, String serviceUrl, CallContext callContext) + { + this.conference = conference; + this.serviceUrl = serviceUrl; + this.callContext = callContext; + } + + /** + * Connects to the service. + */ + public void connect() + { + String token; + try ( + PemReader pemReader = new PemReader(new InputStreamReader(new FileInputStream(privateKeyFilePath))); + ) + { + PemObject pemObject = pemReader.readPemObject(); + token = Util.generateAsapToken( + Base64.getEncoder().encodeToString(pemObject.getContent()), + privateKeyId, + "jitsi", + "jitsi"); + } + catch (Exception e) + { + logger.error(this.callContext + " Error generating token", e); + return; + } + + WebSocketClient webSocketClient = new StandardWebSocketClient(); + + this.stompClient = new WebSocketStompClient(webSocketClient); + this.stompClient.setMessageConverter(new JsonbMessageConverter()); + this.stompClient.setTaskScheduler(taskScheduler); // for heartbeats + + StompHeaders connectHeaders = new StompHeaders(); + connectHeaders.add(HttpHeaders.AUTHORIZATION, "Bearer " + token); + // to avoid the connection being closed by the server, client sends every 100ms, server sends every 100ms + connectHeaders.add(StompHeaders.HEARTBEAT, "100,100"); + + this.stompClient.connectAsync(this.serviceUrl, new WebSocketHttpHeaders(), connectHeaders, + new StompSessionHandlerAdapter() + { + public void handleException(StompSession session, + @Nullable StompCommand command, + StompHeaders headers, + byte[] payload, + Throwable exception) + { + logger.error(WebsocketClient.this.callContext + " headers:" + headers + + " payload:" + new String(payload), exception); + } + + public void handleTransportError(StompSession session, Throwable exception) + { + logger.error(WebsocketClient.this.callContext + " Transport error.", exception); + } + + @Override + public void afterConnected(StompSession session, StompHeaders connectedHeaders) + { + session.subscribe( + "/secured/conference/visitor/topic." + WebsocketClient.this.callContext.getRoomJid().toString(), + new StompFrameHandler() + { + @Override + public Type getPayloadType(StompHeaders headers) + { + return Object.class; + } + + @Override + public void handleFrame(StompHeaders headers, Object payload) + { + if (payload instanceof byte[]) + { + try + { + Object o = new JSONParser().parse(new String((byte[])payload)); + + if (o instanceof JSONObject) + { + JSONObject obj = (JSONObject)o; + if (obj.get("status").equals("live")) + { + WebsocketClient.this.callContext.setRequestVisitor(true); + + Long delayMs = (Long)obj.get("randomDelayMs"); + // now let's connect as visitor after some random delay. + + disconnect(); + + connectTimer.schedule(new TimerTask() + { + @Override + public void run() + { + WebsocketClient.this.conference.joinConferenceRoom(); + } + }, (long)(Math.random() * delayMs)); + } + } + } + catch (Exception e) + { + logger.error(WebsocketClient.this.callContext + + " Error parsing payload:" + new String((byte[])payload), e); + } + } + else + { + logger.warn(WebsocketClient.this.callContext + " Wrong payload type: " + payload); + } + } + }); + } + }); + + + } + + public void disconnect() + { + if (this.stompClient != null) + { + this.stompClient.stop(); + } + } +} From 686ae485c73b96805afe4e7df71b457c99a87531 Mon Sep 17 00:00:00 2001 From: damencho Date: Tue, 20 Aug 2024 13:53:09 -0500 Subject: [PATCH 5/6] squash: Drops springboot dependency and implements minimal STOMP client. --- pom.xml | 23 - .../java/org/jitsi/jigasi/JvbConference.java | 8 +- .../org/jitsi/jigasi/visitor/StompUtils.java | 75 ++++ .../jitsi/jigasi/visitor/WebsocketClient.java | 396 +++++++++++++----- 4 files changed, 372 insertions(+), 130 deletions(-) create mode 100644 src/main/java/org/jitsi/jigasi/visitor/StompUtils.java diff --git a/pom.xml b/pom.xml index 95140f0c..6948abfe 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,6 @@ 1.1-140-g8f45a9f 11.0.20 - 6.1.11 @@ -127,28 +126,6 @@ 4.1.0 - - - org.springframework - spring-websocket - ${springboot.version} - - - org.springframework - spring-messaging - ${springboot.version} - - - org.apache.tomcat.embed - tomcat-embed-websocket - 10.1.26 - - - org.eclipse - yasson - 3.0.4 - - ${project.groupId} diff --git a/src/main/java/org/jitsi/jigasi/JvbConference.java b/src/main/java/org/jitsi/jigasi/JvbConference.java index ff87a37e..97788451 100644 --- a/src/main/java/org/jitsi/jigasi/JvbConference.java +++ b/src/main/java/org/jitsi/jigasi/JvbConference.java @@ -2025,9 +2025,13 @@ private String inviteFocus(final EntityBareJid roomIdentifier) collector = getConnection().createStanzaCollectorAndSend(focusInviteIQ); ConferenceIq res = collector.nextResultOrThrow(); - if (visitorsQueueServiceUrl != null && !Boolean.parseBoolean(res.getPropertiesMap().get("live"))) + if (visitorsQueueServiceUrl != null) { - throw new OperationFailedException("Not live conference", NOT_LIVE_ERROR_CODE); + String liveValue = res.getPropertiesMap().get("live"); + if (liveValue != null && !Boolean.parseBoolean(liveValue)) + { + throw new OperationFailedException("Not live conference", NOT_LIVE_ERROR_CODE); + } } return res.getVnode(); } diff --git a/src/main/java/org/jitsi/jigasi/visitor/StompUtils.java b/src/main/java/org/jitsi/jigasi/visitor/StompUtils.java new file mode 100644 index 00000000..57720d3e --- /dev/null +++ b/src/main/java/org/jitsi/jigasi/visitor/StompUtils.java @@ -0,0 +1,75 @@ +/* + * Jigasi, the JItsi GAteway to SIP. + * + * Copyright @ 2018 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.jigasi.visitor; + +import java.nio.*; + +/** + * The utils for sending/receiving STOMP messages. + */ +public class StompUtils +{ + final static String NEW_LINE = "\n"; + final static String END = "\u0000"; + final static String EMPTY_LINE = ""; + final static String DELIMITER = ":"; + final static ByteBuffer PING_BODY = ByteBuffer.wrap(new byte[] {'\n'}); + + private static String buildHeader(String key, String value) + { + if (value != null) + { + return key + ':' + value + NEW_LINE; + } + else + { + return key + NEW_LINE; + } + } + + /** + * Builds the connect message to send. + * @param token The token to authenticate. + * @param heartbeatOutgoing The ms to send for outgoing heartbeat interval. + * @param heartbeatIncoming The ms to send for incoming heartbeat interval. + * @return The message. + */ + static String buildConnectMessage(String token, long heartbeatOutgoing, long heartbeatIncoming) + { + String headers = buildHeader("CONNECT", null); + headers += buildHeader("accept-version", "1.2,1.1,1.0"); + headers += buildHeader("heart-beat", heartbeatOutgoing + "," + heartbeatIncoming); + headers += buildHeader("Authorization", "Bearer " + token); + + return headers + NEW_LINE + END; + } + + /** + * Builds a subscribe message. + * @param topic The topic to subscribe to. + * @return The message. + */ + static String buildSubscribeMessage(String topic) + { + String headers = buildHeader("SUBSCRIBE", null); + headers += buildHeader("destination", topic); + headers += buildHeader("id", "1"); // this is the first and only message we send + + return headers + NEW_LINE + END; + } +} diff --git a/src/main/java/org/jitsi/jigasi/visitor/WebsocketClient.java b/src/main/java/org/jitsi/jigasi/visitor/WebsocketClient.java index c8bc8146..36572763 100644 --- a/src/main/java/org/jitsi/jigasi/visitor/WebsocketClient.java +++ b/src/main/java/org/jitsi/jigasi/visitor/WebsocketClient.java @@ -17,32 +17,30 @@ */ package org.jitsi.jigasi.visitor; -import org.apache.commons.io.*; import org.bouncycastle.util.io.pem.*; +import org.eclipse.jetty.websocket.client.*; +import org.eclipse.jetty.websocket.api.*; import org.jitsi.jigasi.*; import org.jitsi.jigasi.util.*; +import org.jitsi.utils.concurrent.*; import org.jitsi.utils.logging.*; import org.json.simple.*; import org.json.simple.parser.*; -import org.springframework.http.*; -import org.springframework.lang.*; -import org.springframework.messaging.converter.*; -import org.springframework.messaging.simp.stomp.*; -import org.springframework.scheduling.concurrent.*; -import org.springframework.web.socket.*; -import org.springframework.web.socket.client.*; -import org.springframework.web.socket.client.standard.*; -import org.springframework.web.socket.messaging.*; import java.io.*; -import java.lang.reflect.*; -import java.nio.charset.*; +import java.net.*; +import java.nio.channels.*; import java.util.*; +import java.util.concurrent.*; + +import static org.jitsi.jigasi.visitor.StompUtils.*; /** * The websocket client to connect to visitors queue. + * It implements the needed parts to use STOMP (https://stomp.github.io/stomp-specification-1.2.html). */ public class WebsocketClient + implements WebSocketListener { /** * The logger. @@ -77,13 +75,8 @@ public class WebsocketClient /** * The thread pool for heartbeat of the sockets. */ - private static final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); - static - { - taskScheduler.setPoolSize(1); - taskScheduler.setThreadNamePrefix("stomp-heartbeat-client-thread-"); - taskScheduler.initialize(); - } + private static final ScheduledExecutorService heartbeatThreadPool = Executors.newScheduledThreadPool( + 1, new CustomizableThreadFactory("stomp-heartbeat-client-thread-", true)); /** * The call context used to for the current conference. @@ -101,9 +94,34 @@ public class WebsocketClient private final JvbConference conference; /** - * The stomp client that is active when connected. + * The websocket session used to communicate with the service. + */ + private Session websocketSession; + + /** + * The last time we saw a message from the server. It should be sending us pings every 15 seconds or so. + */ + private long lastServerActivity; + + /** + * The outgoing heartbeat sent to server interval in ms. + */ + private long heartbeatOutgoing = 15000; + + /** + * The incoming heartbeat sent to server interval in ms. + */ + private long heartbeatIncoming = 15000; + + /** + * The tasks for sending heartbeats, used to cancel the pinger. + */ + private ScheduledFuture pinger; + + /** + * The tasks for checking of received heartbeats, used to cancel the ponger. */ - private WebSocketStompClient stompClient; + private ScheduledFuture ponger; /** * A timer which will be used to schedule connection to conference after going live. @@ -121,122 +139,290 @@ public WebsocketClient(JvbConference conference, String serviceUrl, CallContext * Connects to the service. */ public void connect() + { + try + { + WebSocketClient client = new WebSocketClient(); + client.start(); + + Future fut = client.connect(this, new URI(this.serviceUrl)); + + fut.get(5, TimeUnit.SECONDS); + } + catch (Exception e) + { + logger.error(this.callContext + " Error starting websocket client", e); + } + } + + public void disconnect() + { + if (this.websocketSession != null) + { + this.websocketSession.close(); + } + + if (this.pinger != null) + { + this.pinger.cancel(false); + } + if (this.ponger != null) + { + this.ponger.cancel(false); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + // local close + if (statusCode == 1006) + { + return; + } + + logger.error(this.callContext + " Visitors queue websocket closed: " + statusCode + " " + reason); + } + + @Override + public void onWebSocketConnect(Session session) + { + this.websocketSession = session; + + sendConnect(); + } + + @Override + public void onWebSocketError(Throwable cause) + { + // local close + if (cause instanceof ClosedChannelException) + { + return; + } + + logger.error(this.callContext + " Visitors queue websocket error: " + cause); + + reconnect(); + } + + /** + * Sends Stomp connect message over the websocket. Includes the jwt to authorize us. + */ + private void sendConnect() { String token; try ( - PemReader pemReader = new PemReader(new InputStreamReader(new FileInputStream(privateKeyFilePath))); + PemReader pemReader = new PemReader(new InputStreamReader(new FileInputStream(privateKeyFilePath))) ) { PemObject pemObject = pemReader.readPemObject(); - token = Util.generateAsapToken( - Base64.getEncoder().encodeToString(pemObject.getContent()), - privateKeyId, - "jitsi", - "jitsi"); + token = Util.generateAsapToken(Base64.getEncoder().encodeToString(pemObject.getContent()), + privateKeyId, "jitsi", "jitsi"); } catch (Exception e) { logger.error(this.callContext + " Error generating token", e); + this.disconnect(); + return; } - WebSocketClient webSocketClient = new StandardWebSocketClient(); + this.websocketSession.getRemote().sendString( + buildConnectMessage(token, this.heartbeatOutgoing, this.heartbeatIncoming), WriteCallback.NOOP); + } + + /** + * Receives data over the websocket. + * @param message the message that is received. + */ + @Override + public void onWebSocketText(String message) + { + lastServerActivity = System.currentTimeMillis(); - this.stompClient = new WebSocketStompClient(webSocketClient); - this.stompClient.setMessageConverter(new JsonbMessageConverter()); - this.stompClient.setTaskScheduler(taskScheduler); // for heartbeats + String[] splitMessage = message.split(NEW_LINE); - StompHeaders connectHeaders = new StompHeaders(); - connectHeaders.add(HttpHeaders.AUTHORIZATION, "Bearer " + token); - // to avoid the connection being closed by the server, client sends every 100ms, server sends every 100ms - connectHeaders.add(StompHeaders.HEARTBEAT, "100,100"); + if (splitMessage.length == 0) + { + // this is a ping from server. + return; + } + + String command = splitMessage[0]; - this.stompClient.connectAsync(this.serviceUrl, new WebSocketHttpHeaders(), connectHeaders, - new StompSessionHandlerAdapter() + int cursor = 1; + for (int i = cursor; i < splitMessage.length; i++) + { + // empty line + if (splitMessage[i].equals(EMPTY_LINE)) + { + // this is where the body starts + cursor = i; + break; + } + else { - public void handleException(StompSession session, - @Nullable StompCommand command, - StompHeaders headers, - byte[] payload, - Throwable exception) + String[] header = splitMessage[i].split(DELIMITER); + + if (header[0].equals("heart-beat")) { - logger.error(WebsocketClient.this.callContext + " headers:" + headers - + " payload:" + new String(payload), exception); + processHeartbeat(header[1]); } + } + } + + StringBuilder bodyBuffer = new StringBuilder(); + + for (int i = cursor; i < splitMessage.length; i++) + { + bodyBuffer.append(splitMessage[i]); + } + + handleCommand(command, bodyBuffer.toString()); + } + + /** + * Process the header about heartbeat coming from server on CONNECTED command, gets the max as per spec + * between local default values and remote settings. + * @param value the header value. + */ + private void processHeartbeat(String value) + { + String[] splitMessage = value.trim().split(","); + + if (splitMessage.length > 2) + { + return; + } + + long sx = Long.parseLong(splitMessage[0]); + long sy = Long.parseLong(splitMessage[1]); + + if (sy == 0) + { + this.heartbeatOutgoing = 0; + } + else + { + this.heartbeatOutgoing = Math.max(this.heartbeatOutgoing, sy); + } + + if (sx == 0) + { + this.heartbeatIncoming = 0; + } + else + { + this.heartbeatIncoming = Math.max(this.heartbeatIncoming, sx); + } + } - public void handleTransportError(StompSession session, Throwable exception) + /** + * Starts the executions of sending pings and checking for received pings from server. + */ + private void setupHeartbeat() + { + if (this.heartbeatOutgoing > 0) + { + this.pinger = heartbeatThreadPool.scheduleAtFixedRate(() -> { + if (!this.websocketSession.isOpen()) { - logger.error(WebsocketClient.this.callContext + " Transport error.", exception); + return; } - @Override - public void afterConnected(StompSession session, StompHeaders connectedHeaders) + try { - session.subscribe( - "/secured/conference/visitor/topic." + WebsocketClient.this.callContext.getRoomJid().toString(), - new StompFrameHandler() - { - @Override - public Type getPayloadType(StompHeaders headers) - { - return Object.class; - } + this.websocketSession.getRemote().sendBytes(PING_BODY); + } + catch (IOException e) + { + logger.error(this.callContext + " Error pinging websocket", e); + } + }, this.heartbeatOutgoing, this.heartbeatOutgoing, TimeUnit.MILLISECONDS); + + } + + if (this.heartbeatIncoming > 0) + { + this.ponger = heartbeatThreadPool.scheduleAtFixedRate(() -> { + // wait twice the interval to be tolerant of timing inaccuracies + if (System.currentTimeMillis() - lastServerActivity > this.heartbeatIncoming * 2) + { + logger.error(this.callContext + " Visitors queue websocket heartbeat incoming time out"); + + reconnect(); + } + }, this.heartbeatIncoming, this.heartbeatIncoming, TimeUnit.MILLISECONDS); + } + } + + private void handleCommand(String command, String body) + { + if (command.equals("CONNECTED")) + { + setupHeartbeat(); + + this.websocketSession.getRemote().sendString( + buildSubscribeMessage("/secured/conference/visitor/topic." + + this.callContext.getRoomJid().toString()), WriteCallback.NOOP); + } + else if (command.equals("MESSAGE")) + { + try + { + Object o = new JSONParser().parse(body.replace(END, "")); + + if (o instanceof JSONObject) + { + JSONObject obj = (JSONObject)o; + if (obj.get("status").equals("live")) + { + logger.info(this.callContext + " Conference is live now."); + + WebsocketClient.this.callContext.setRequestVisitor(true); + + Long delayMs = (Long)obj.get("randomDelayMs"); + // now let's connect as visitor after some random delay. + disconnect(); + + connectTimer.schedule(new TimerTask() + { @Override - public void handleFrame(StompHeaders headers, Object payload) + public void run() { - if (payload instanceof byte[]) - { - try - { - Object o = new JSONParser().parse(new String((byte[])payload)); - - if (o instanceof JSONObject) - { - JSONObject obj = (JSONObject)o; - if (obj.get("status").equals("live")) - { - WebsocketClient.this.callContext.setRequestVisitor(true); - - Long delayMs = (Long)obj.get("randomDelayMs"); - // now let's connect as visitor after some random delay. - - disconnect(); - - connectTimer.schedule(new TimerTask() - { - @Override - public void run() - { - WebsocketClient.this.conference.joinConferenceRoom(); - } - }, (long)(Math.random() * delayMs)); - } - } - } - catch (Exception e) - { - logger.error(WebsocketClient.this.callContext - + " Error parsing payload:" + new String((byte[])payload), e); - } - } - else - { - logger.warn(WebsocketClient.this.callContext + " Wrong payload type: " + payload); - } + WebsocketClient.this.conference.joinConferenceRoom(); } - }); + }, (long)(Math.random() * delayMs)); + } } - }); - - + } + catch (Exception e) + { + logger.error(this.callContext + " Error parsing payload:" + body, e); + } + } + else + { + logger.warn(this.callContext + " Unknown command: " + command); + } } - public void disconnect() + private void reconnect() { - if (this.stompClient != null) + this.disconnect(); + long delay = (long)(Math.random() * 5000); + + logger.info(this.callContext + " Reconnecting visitors queue in " + delay + " ms."); + + connectTimer.schedule(new TimerTask() { - this.stompClient.stop(); - } + @Override + public void run() + { + connect(); + } + }, delay); // let's reconnect randomly in the next 5 seconds } } From ad3a8a2e76c397222c773a0e2adc7bbaacc5c3c5 Mon Sep 17 00:00:00 2001 From: damencho Date: Tue, 20 Aug 2024 18:09:03 -0500 Subject: [PATCH 6/6] squash: Fix comment. --- src/main/java/org/jitsi/jigasi/util/Util.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/jitsi/jigasi/util/Util.java b/src/main/java/org/jitsi/jigasi/util/Util.java index e3d1dbcd..ee7aa103 100644 --- a/src/main/java/org/jitsi/jigasi/util/Util.java +++ b/src/main/java/org/jitsi/jigasi/util/Util.java @@ -356,7 +356,7 @@ public static String generateAsapToken( { if (StringUtils.isEmpty(privateKey) || StringUtils.isEmpty(privateKeyId)) { - throw new IOException("Failed generating JWT for Whisper. Missing private key or key name."); + throw new IOException("Failed generating JWT. Missing private key or key name."); } long nowMillis = System.currentTimeMillis();