Skip to content

Commit

Permalink
feat: Offload xmpp/smack processing threads. (#500)
Browse files Browse the repository at this point in the history
* feat: Offload xmpp/smack processing threads.

Make sure we do not send and/or wait for responses in the smack threads.

* squash: Fixes possible NPE.

* squash: Updates the NPE check for JvbConference.

* squash: Updates code to use PacketQueue from jitsi-utils.
  • Loading branch information
damencho authored Sep 27, 2023
1 parent 7ae8def commit b9e9572
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jitsi-utils</artifactId>
<version>1.0-114-g43815ed</version>
<version>1.0-126-g02b0c86</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
129 changes: 107 additions & 22 deletions src/main/java/org/jitsi/jigasi/JvbConference.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.jitsi.jigasi.version.*;
import org.jitsi.utils.*;
import org.jitsi.utils.logging.Logger;
import org.jitsi.utils.queue.*;
import org.jitsi.xmpp.extensions.*;
import org.jitsi.xmpp.extensions.colibri.*;
import org.jitsi.xmpp.extensions.jitsimeet.*;
Expand All @@ -48,6 +49,7 @@
import org.jivesoftware.smackx.nick.packet.*;
import org.jivesoftware.smackx.xdata.packet.*;
import org.jivesoftware.smackx.xdata.*;
import org.json.simple.*;
import org.jxmpp.jid.*;
import org.jxmpp.jid.impl.*;
import org.jxmpp.jid.parts.*;
Expand All @@ -57,6 +59,7 @@
import java.beans.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import static net.java.sip.communicator.service.protocol.event.LocalUserChatRoomPresenceChangeEvent.*;
import static org.jivesoftware.smack.packet.StanzaError.Condition.*;
Expand All @@ -77,7 +80,8 @@ public class JvbConference
ChatRoomMemberPresenceListener,
LocalUserChatRoomPresenceListener,
CallPeerConferenceListener,
PropertyChangeListener
PropertyChangeListener,
OperationSetJitsiMeetTools.JitsiMeetRequestListener
{
/**
* The logger.
Expand Down Expand Up @@ -150,6 +154,32 @@ public class JvbConference
*/
private String meetingId;

/**
* A queue used to offload xmpp execution in a new thread to avoid blocking xmpp threads,
* by executing the tasks in new thread
*/
public static final PacketQueue<Runnable> xmppInvokeQueue = new PacketQueue<>(
Integer.MAX_VALUE,
false,
"xmpp-invoke-queue",
r -> {
// do process and try
try
{
r.run();

return true;
}
catch (Exception e)
{
logger.error("Error processing xmpp queue item", e);

return false;
}
},
Util.createNewThreadPool("xmpp-executor-pool")
);

/**
* Adds the features supported by jigasi to a specific
* <tt>OperationSetJitsiMeetTools</tt> instance.
Expand Down Expand Up @@ -603,7 +633,7 @@ private synchronized void setXmppProvider(

if (xmppProvider.isRegistered())
{
joinConferenceRoom();
xmppInvokeQueue.add(this::joinConferenceRoom);
}
else
{
Expand All @@ -617,8 +647,12 @@ public ProtocolProviderService getXmppProvider()
}

@Override
public synchronized void registrationStateChanged(
RegistrationStateChangeEvent evt)
public void registrationStateChanged(RegistrationStateChangeEvent evt)
{
xmppInvokeQueue.add(() -> registrationStateChangedInternal(evt));
}

private synchronized void registrationStateChangedInternal(RegistrationStateChangeEvent evt)
{
if (started
&& mucRoom == null
Expand Down Expand Up @@ -707,20 +741,26 @@ public boolean isStarted()
return started;
}

/**
* When calling this method, make sure it is not executed in any of the Smack threads.
*/
public void joinConferenceRoom()
{
OperationSetMultiUserChat muc = xmppProvider.getOperationSet(OperationSetMultiUserChat.class);
muc.addPresenceListener(this);

OperationSetIncomingDTMF opSet = this.xmppProvider.getOperationSet(OperationSetIncomingDTMF.class);
if (opSet != null)
{
// this executes only sip provider logic
opSet.addDTMFListener(gatewaySession);
}

this.jitsiMeetTools = xmppProvider.getOperationSet(OperationSetJitsiMeetToolsJabber.class);

if (this.jitsiMeetTools != null)
{
this.jitsiMeetTools.addRequestListener(this.gatewaySession);
this.jitsiMeetTools.addRequestListener(this);
}

Localpart lobbyLocalpart = null;
Expand Down Expand Up @@ -784,7 +824,7 @@ public void joinConferenceRoom()

initiator.addChildExtension(he);
});
if (initiator.getChildExtensions().size() > 0)
if (!initiator.getChildExtensions().isEmpty())
{
((ChatRoomJabberImpl)mucRoom).addPresencePacketExtensions(initiator);
}
Expand Down Expand Up @@ -886,7 +926,7 @@ public void joinConferenceRoom()

if (this.jitsiMeetTools != null)
{
this.jitsiMeetTools.removeRequestListener(this.gatewaySession);
this.jitsiMeetTools.removeRequestListener(this);
}

DataObject dataObject = opex.getDataObject();
Expand Down Expand Up @@ -1081,6 +1121,11 @@ public void serviceChanged(ServiceEvent serviceEvent)

@Override
public void memberPresenceChanged(ChatRoomMemberPresenceChangeEvent evt)
{
xmppInvokeQueue.add(() -> memberPresenceChangedInternal(evt));
}

private void memberPresenceChangedInternal(ChatRoomMemberPresenceChangeEvent evt)
{
if (logger.isTraceEnabled())
{
Expand Down Expand Up @@ -1234,6 +1279,11 @@ private void processChatRoomMemberLeft(ChatRoomMember member)
*/
@Override
public void localUserPresenceChanged(LocalUserChatRoomPresenceChangeEvent evt)
{
xmppInvokeQueue.add(() -> localUserPresenceChangedLocal(evt));
}

private void localUserPresenceChangedLocal(LocalUserChatRoomPresenceChangeEvent evt)
{
try
{
Expand Down Expand Up @@ -1308,21 +1358,47 @@ public String getMeetingUrl()
@Override
public void propertyChange(PropertyChangeEvent evt)
{
if (evt.getPropertyName().equals(CallPeerJabberImpl.TRANSPORT_REPLACE_PROPERTY_NAME))
xmppInvokeQueue.add(() ->
{
Statistics.incrementTotalCallsWithJvbMigrate();
if (evt.getPropertyName().equals(CallPeerJabberImpl.TRANSPORT_REPLACE_PROPERTY_NAME))
{
Statistics.incrementTotalCallsWithJvbMigrate();

leaveConferenceRoom();
leaveConferenceRoom();

joinConferenceRoom();
}
joinConferenceRoom();
}
});
}

@Override
public void onJoinJitsiMeetRequest(Call call, String room, Map<String, String> data)
{
xmppInvokeQueue.add(() -> this.gatewaySession.onJoinJitsiMeetRequest(call, room, data));
}

@Override
public void onSessionStartMuted(boolean[] startMutedFlags)
{
xmppInvokeQueue.add(() -> this.gatewaySession.onSessionStartMuted(startMutedFlags));
}

@Override
public void onJSONReceived(CallPeer callPeer, JSONObject jsonObject, Map<String, Object> parameters)
{
xmppInvokeQueue.add(() -> this.gatewaySession.onJSONReceived(callPeer, jsonObject, parameters));
}

private class JvbCallListener
implements CallListener
{
@Override
public void incomingCallReceived(CallEvent event)
{
xmppInvokeQueue.add(() -> incomingCallReceivedInternal(event));
}

private void incomingCallReceivedInternal(CallEvent event)
{
CallPeer peer = event.getSourceCall().getCallPeers().next();
String peerAddress;
Expand Down Expand Up @@ -1424,7 +1500,12 @@ private class JvbCallChangeListener
extends CallChangeAdapter
{
@Override
public synchronized void callStateChanged(CallChangeEvent evt)
public void callStateChanged(CallChangeEvent evt)
{
xmppInvokeQueue.add(() -> callStateChangedInternal(evt));
}

private synchronized void callStateChangedInternal(CallChangeEvent evt)
{
if (jvbCall != evt.getSourceCall())
{
Expand Down Expand Up @@ -1749,6 +1830,7 @@ public XMPPConnection getConnection()
* Called whenever password is known. In case of lobby, while waiting in the lobby, the user can enter the password
* and that can be signalled through SIP Info messages, and we can leve the lobby and enter the room with the
* password received, if the password is wrong we will fail joining and the call will be dropped.
* This is executed in sip thread.
*
* @param pwd <tt>String</tt> room password.
*/
Expand Down Expand Up @@ -1992,17 +2074,20 @@ private class RoomConfigurationChangeListener
@Override
public void processStanza(Stanza stanza)
{
MUCUser mucUser = stanza.getExtension(MUCUser.class);

if (mucUser == null)
xmppInvokeQueue.add(() ->
{
return;
}
MUCUser mucUser = stanza.getExtension(MUCUser.class);

if (mucUser.getStatus().contains(MUCUser.Status.create(104)))
{
updateFromRoomConfiguration();
}
if (mucUser == null)
{
return;
}

if (mucUser.getStatus().contains(MUCUser.Status.create(104)))
{
updateFromRoomConfiguration();
}
});
}
}

Expand Down
14 changes: 12 additions & 2 deletions src/main/java/org/jitsi/jigasi/lobby/Lobby.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,15 @@ protected void leaveRoom()
* Used to get <tt>ChatRoomInvitationListener</tt> events. After participant is allowed to join this method will
* be called.
*
* @param chatRoomInvitationReceivedEvent <tt>ChatRoomInvitationReceivedEvent</tt> contains invitation info.
* @param evt <tt>ChatRoomInvitationReceivedEvent</tt> contains invitation info.
*/
@Override
public void invitationReceived(ChatRoomInvitationReceivedEvent chatRoomInvitationReceivedEvent)
public void invitationReceived(ChatRoomInvitationReceivedEvent evt)
{
JvbConference.xmppInvokeQueue.add(() -> invitationReceivedInternal(evt));
}

private void invitationReceivedInternal(ChatRoomInvitationReceivedEvent chatRoomInvitationReceivedEvent)
{
try
{
Expand Down Expand Up @@ -249,6 +254,11 @@ private void notifyAccessGranted()
*/
@Override
public void localUserPresenceChanged(LocalUserChatRoomPresenceChangeEvent evt)
{
JvbConference.xmppInvokeQueue.add(() -> localUserPresenceChangedInternal(evt));
}

private void localUserPresenceChangedInternal(LocalUserChatRoomPresenceChangeEvent evt)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,10 @@ public void notifyChatRoomMemberJoined(ChatRoomMember member)
*/
public void notifyChatRoomMemberLeft(ChatRoomMember member)
{
JvbConference jvbConference = gatewaySession.getJvbConference();

// if this is the sip hanging up (stopping) skip playing
if (gatewaySession.getJvbConference().isStarted()
&& gatewaySession.getSipCall() != null)
if (jvbConference != null && jvbConference.isStarted() && gatewaySession.getSipCall() != null)
{
playParticipantLeftNotification();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ private void initializeNewProvider(ProtocolProviderService pps)
*/
@Override
public void registrationStateChanged(RegistrationStateChangeEvent evt)
{
threadPool.execute(() -> registrationStateChangedInternal(evt));
}

private void registrationStateChangedInternal(RegistrationStateChangeEvent evt)
{
ProtocolProviderService provider = evt.getProvider();

Expand Down

0 comments on commit b9e9572

Please sign in to comment.