diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index dc90fb1d..5f89f474 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -179,7 +179,7 @@ public class SessionEventsHandler { } public void onParticipantLeft(Participant participant, String sessionId, Set remainingParticipants, - Integer transactionId, OpenViduException error, EndReason reason) { + Integer transactionId, OpenViduException error, EndReason reason, boolean scheduleWebsocketClose) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; @@ -205,6 +205,13 @@ public class SessionEventsHandler { rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); } + if (scheduleWebsocketClose) { + // Schedule the close up of this WebSocket connection. This is only as an extra + // guarantee: the client-side should always close it after receiving the + // response to "leaveRoom" method + this.rpcNotificationService.scheduleCloseRpcSession(participant.getParticipantPrivateId(), 10000); + } + if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { CDR.recordParticipantLeft(participant, sessionId, reason); } @@ -483,6 +490,11 @@ public class SessionEventsHandler { ProtocolElements.PARTICIPANTEVICTED_METHOD, params); } } + + // Schedule the close up of this WebSocket connection. This is only as an extra + // guarantee: the client-side should always close it after receiving the + // participantEvicted notification + this.rpcNotificationService.scheduleCloseRpcSession(evictedParticipant.getParticipantPrivateId(), 10000); } public void sendRecordingStartedNotification(Session session, Recording recording) { @@ -624,14 +636,6 @@ public class SessionEventsHandler { rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); } - public void closeRpcSession(String participantPrivateId) { - this.rpcNotificationService.closeRpcSession(participantPrivateId); - } - - public void storeRecordingToSendClientEvent(Recording recording) { - recordingsToSendClientEvents.put(recording.getSessionId(), recording); - } - /** * This handler must be called before cleaning any sessions or recordings hosted * by the crashed Media Node @@ -643,6 +647,10 @@ public class SessionEventsHandler { public void onMasterNodeCrashed() { } + public void storeRecordingToSendClientEvent(Recording recording) { + recordingsToSendClientEvents.put(recording.getSessionId(), recording); + } + protected Set filterParticipantsByRole(OpenViduRole[] roles, Set participants) { return participants.stream().filter(part -> { if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(part.getParticipantPublicId())) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index 2a854b46..0c9d23d7 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -106,7 +106,7 @@ public abstract class SessionManager { public abstract void joinRoom(Participant participant, String sessionId, Integer transactionId); public abstract boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, - boolean closeWebSocket); + boolean scheduleWebocketClose); public abstract void publishVideo(Participant participant, MediaOptions mediaOptions, Integer transactionId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 32dee158..918e85c7 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -192,7 +192,8 @@ public class KurentoSessionManager extends SessionManager { } @Override - public boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, boolean closeWebSocket) { + public boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, + boolean scheduleWebsocketClose) { log.info("Request [LEAVE_ROOM] for participant {} of session {} with reason {}", participant.getParticipantPublicId(), participant.getSessionId(), reason != null ? reason.name() : "NULL"); @@ -252,7 +253,7 @@ public class KurentoSessionManager extends SessionManager { remainingParticipants = Collections.emptySet(); } sessionEventsHandler.onParticipantLeft(participant, sessionId, remainingParticipants, transactionId, - null, reason); + null, reason, scheduleWebsocketClose); if (!EndReason.sessionClosedByServer.equals(reason)) { // If session is closed by a call to "DELETE /api/sessions" do NOT stop the @@ -316,11 +317,6 @@ public class KurentoSessionManager extends SessionManager { } } - // Finally close websocket session if required - if (closeWebSocket) { - sessionEventsHandler.closeRpcSession(participant.getParticipantPrivateId()); - } - return sessionClosedByLastParticipant; } finally { @@ -789,20 +785,20 @@ public class KurentoSessionManager extends SessionManager { boolean sessionClosedByLastParticipant = false; if (evictedParticipant != null) { + KurentoParticipant kParticipant = (KurentoParticipant) evictedParticipant; Set participants = kParticipant.getSession().getParticipants(); sessionClosedByLastParticipant = this.leaveRoom(kParticipant, null, reason, false); - this.sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, participants, transactionId, - null, reason); - sessionEventsHandler.closeRpcSession(evictedParticipant.getParticipantPrivateId()); - } else { - if (moderator != null && transactionId != null) { - this.sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, - new HashSet<>(Arrays.asList(moderator)), transactionId, - new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, - "Connection not found when calling 'forceDisconnect'"), - null); - } + sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, participants, transactionId, null, + reason); + + } else if (moderator != null && transactionId != null) { + + this.sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, + new HashSet<>(Arrays.asList(moderator)), transactionId, + new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, + "Connection not found when calling 'forceDisconnect'"), + null); } return sessionClosedByLastParticipant; diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index ab956dac..21598e3c 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -767,7 +767,7 @@ public class RpcHandler extends DefaultJsonRpcHandler { } if (!message.isEmpty()) { - RpcConnection rpc = this.notificationService.closeRpcSession(rpcSessionId); + RpcConnection rpc = this.notificationService.immediatelyCloseRpcSession(rpcSessionId); if (rpc != null && rpc.getSessionId() != null) { io.openvidu.server.core.Session session = this.sessionManager.getSession(rpc.getSessionId()); if (session != null && session.getParticipantByPrivateId(rpc.getParticipantPrivateId()) != null) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java index 75d31ad9..07b88a66 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcNotificationService.java @@ -20,6 +20,9 @@ package io.openvidu.server.rpc; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.kurento.jsonrpc.Session; import org.kurento.jsonrpc.Transaction; @@ -38,6 +41,9 @@ public class RpcNotificationService { private ConcurrentMap rpcConnections = new ConcurrentHashMap<>(); + private ScheduledExecutorService closeWsScheduler = Executors + .newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); + public RpcConnection newRpcConnection(Transaction t, Request request) { String participantPrivateId = t.getSession().getSessionId(); RpcConnection connection = new RpcConnection(t.getSession()); @@ -109,26 +115,32 @@ public class RpcNotificationService { } } - public RpcConnection closeRpcSession(String participantPrivateId) { + public RpcConnection immediatelyCloseRpcSession(String participantPrivateId) { RpcConnection rpcSession = rpcConnections.remove(participantPrivateId); if (rpcSession == null || rpcSession.getSession() == null) { if (!isIpcamParticipant(participantPrivateId)) { - log.error("No session found for private id {}, unable to cleanup", participantPrivateId); + log.error("No rpc session found for private id {}, unable to cleanup", participantPrivateId); } return null; } Session s = rpcSession.getSession(); try { s.close(); - log.info("Closed session for participant with private id {}", participantPrivateId); + log.info("Closed rpc session for participant with private id {}", participantPrivateId); this.showRpcConnections(); return rpcSession; } catch (IOException e) { - log.error("Error closing session for participant with private id {}", participantPrivateId, e); + log.error("Error closing rpc session for participant with private id {}: {}", participantPrivateId, + e.getMessage()); } return null; } + public void scheduleCloseRpcSession(String participantPrivateId, int timeoutMs) { + closeWsScheduler.schedule(() -> immediatelyCloseRpcSession(participantPrivateId), timeoutMs, + TimeUnit.MILLISECONDS); + } + private Transaction getAndRemoveTransaction(String participantPrivateId, Integer transactionId) { RpcConnection rpcSession = rpcConnections.get(participantPrivateId); if (rpcSession == null) {