From a08d92e745787752c53d849c5970b3c23608293b Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Thu, 9 Apr 2020 19:42:45 +0200 Subject: [PATCH] openvidu-server: new Session lock protecting joinRoom/leaveRoom methods --- .../java/io/openvidu/server/core/Session.java | 8 + .../kurento/core/KurentoSessionManager.java | 269 ++++++++++-------- .../server/kurento/kms/KmsManager.java | 14 +- 3 files changed, 173 insertions(+), 118 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/Session.java b/openvidu-server/src/main/java/io/openvidu/server/core/Session.java index 15a6f704..88e98b1d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/Session.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/Session.java @@ -23,7 +23,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; @@ -70,6 +72,12 @@ public class Session implements SessionInterface { */ public ReadWriteLock closingLock = new ReentrantReadWriteLock(); + /** + * This lock protects the operations of SessionManager#joinRoom and + * SessionManager#leaveRoom + */ + public Lock joinLeaveLock = new ReentrantLock(); + public final AtomicBoolean recordingManuallyStopped = new AtomicBoolean(false); public Session(Session previousSession) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 165ca537..2e32fd5f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -88,11 +88,9 @@ public class KurentoSessionManager extends SessionManager { /* Protected by Session.closingLock.readLock */ public void joinRoom(Participant participant, String sessionId, Integer transactionId) { Set existingParticipants = null; - boolean lockAcquired = false; try { KurentoSession kSession = (KurentoSession) sessions.get(sessionId); - if (kSession == null) { // First user connecting to the session Session sessionNotActive = sessionsNotActive.get(sessionId); @@ -108,20 +106,29 @@ public class KurentoSessionManager extends SessionManager { try { if (KmsManager.selectAndRemoveKmsLock.tryLock(MS_MAX_LOCK_WAIT, TimeUnit.SECONDS)) { - lockAcquired = true; - Kms lessLoadedKms = null; try { - lessLoadedKms = this.kmsManager.getLessLoadedConnectedAndRunningKms(); - } catch (NoSuchElementException e) { - // Restore session not active - this.cleanCollections(sessionId); - this.storeSessionNotActive(sessionNotActive); - throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE, - "There is no available Media Node where to initialize session '" + sessionId + "'"); + kSession = (KurentoSession) sessions.get(sessionId); + + if (kSession == null) { + // Session still null. It was not created by other thread while waiting for lock + Kms lessLoadedKms = null; + try { + lessLoadedKms = this.kmsManager.getLessLoadedConnectedAndRunningKms(); + } catch (NoSuchElementException e) { + // Restore session not active + this.cleanCollections(sessionId); + this.storeSessionNotActive(sessionNotActive); + throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE, + "There is no available Media Node where to initialize session '" + sessionId + + "'"); + } + log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), + lessLoadedKms.getLoad()); + kSession = createSession(sessionNotActive, lessLoadedKms); + } + } finally { + KmsManager.selectAndRemoveKmsLock.unlock(); } - log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), - lessLoadedKms.getLoad()); - kSession = createSession(sessionNotActive, lessLoadedKms); } else { String error = "Timeout of " + MS_MAX_LOCK_WAIT + " seconds waiting to acquire lock"; @@ -145,19 +152,35 @@ public class KurentoSessionManager extends SessionManager { + "' is trying to join session '" + sessionId + "' but it is closing"); } - existingParticipants = getParticipants(sessionId); - kSession.join(participant); + try { + if (kSession.joinLeaveLock.tryLock(15, TimeUnit.SECONDS)) { + try { + existingParticipants = getParticipants(sessionId); + kSession.join(participant); + sessionEventsHandler.onParticipantJoined(participant, sessionId, existingParticipants, + transactionId, null); + } finally { + kSession.joinLeaveLock.unlock(); + } + } else { + log.error( + "Timeout waiting for join-leave Session lock to be available for participant {} of session {} in joinRoom", + participant.getParticipantPublicId(), sessionId); + sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId, + new OpenViduException(Code.GENERIC_ERROR_CODE, "Timeout waiting for Session lock")); + } + } catch (InterruptedException e) { + log.error( + "InterruptedException waiting for join-leave Session lock to be available for participant {} of session {} in joinRoom", + participant.getParticipantPublicId(), sessionId); + sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId, + new OpenViduException(Code.GENERIC_ERROR_CODE, + "InterruptedException waiting for Session lock")); + } } catch (OpenViduException e) { - log.warn("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(), + log.error("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(), sessionId, e); sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId, e); - } finally { - if (lockAcquired) { - KmsManager.selectAndRemoveKmsLock.unlock(); - } - } - if (existingParticipants != null) { - sessionEventsHandler.onParticipantJoined(participant, sessionId, existingParticipants, transactionId, null); } } @@ -179,101 +202,125 @@ public class KurentoSessionManager extends SessionManager { throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "'" + participant.getParticipantPublicId() + "' is trying to leave from session '" + sessionId + "' but it is closing"); } - session.leave(participant.getParticipantPrivateId(), reason); - // Update control data structures - - if (sessionidParticipantpublicidParticipant.get(sessionId) != null) { - Participant p = sessionidParticipantpublicidParticipant.get(sessionId) - .remove(participant.getParticipantPublicId()); - - if (this.coturnCredentialsService.isCoturnAvailable()) { - this.coturnCredentialsService.deleteUser(p.getToken().getTurnCredentials().getUsername()); - } - - // TODO: why is this necessary?? - if (insecureUsers.containsKey(p.getParticipantPrivateId())) { - boolean stillParticipant = false; - for (Session s : sessions.values()) { - if (!s.isClosed() && (s.getParticipantByPrivateId(p.getParticipantPrivateId()) != null)) { - stillParticipant = true; - break; - } - } - if (!stillParticipant) { - insecureUsers.remove(p.getParticipantPrivateId()); - } - } - } - - // Close Session if no more participants - - Set remainingParticipants = null; try { - remainingParticipants = getParticipants(sessionId); - } catch (OpenViduException e) { - log.info("Possible collision when closing the session '{}' (not found)", sessionId); - remainingParticipants = Collections.emptySet(); - } - sessionEventsHandler.onParticipantLeft(participant, sessionId, remainingParticipants, transactionId, null, - reason); + if (session.joinLeaveLock.tryLock(15, TimeUnit.SECONDS)) { + try { - if (!EndReason.sessionClosedByServer.equals(reason)) { - // If session is closed by a call to "DELETE /api/sessions" do NOT stop the - // recording. Will be stopped after in method - // "SessionManager.closeSessionAndEmptyCollections" - if (remainingParticipants.isEmpty()) { - if (openviduConfig.isRecordingModuleEnabled() - && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) - && (this.recordingManager.sessionIsBeingRecorded(sessionId))) { - // Start countdown to stop recording. Will be aborted if a Publisher starts - // before timeout - log.info( - "Last participant left. Starting {} seconds countdown for stopping recording of session {}", - this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); - recordingManager.initAutomaticRecordingStopThread(session); - } else { - try { - if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { - try { - if (session.isClosed()) { - return false; - } - log.info("No more participants in session '{}', removing it and closing it", sessionId); - this.closeSessionAndEmptyCollections(session, reason, true); - sessionClosedByLastParticipant = true; - } finally { - session.closingLock.writeLock().unlock(); - } - } else { - log.error( - "Timeout waiting for Session {} closing lock to be available for closing as last participant left", - sessionId); + session.leave(participant.getParticipantPrivateId(), reason); + + // Update control data structures + + if (sessionidParticipantpublicidParticipant.get(sessionId) != null) { + Participant p = sessionidParticipantpublicidParticipant.get(sessionId) + .remove(participant.getParticipantPublicId()); + + if (this.coturnCredentialsService.isCoturnAvailable()) { + this.coturnCredentialsService.deleteUser(p.getToken().getTurnCredentials().getUsername()); + } + + // TODO: why is this necessary?? + if (insecureUsers.containsKey(p.getParticipantPrivateId())) { + boolean stillParticipant = false; + for (Session s : sessions.values()) { + if (!s.isClosed() + && (s.getParticipantByPrivateId(p.getParticipantPrivateId()) != null)) { + stillParticipant = true; + break; + } + } + if (!stillParticipant) { + insecureUsers.remove(p.getParticipantPrivateId()); + } } - } catch (InterruptedException e) { - log.error( - "InterruptedException while waiting for Session {} closing lock to be available for closing as last participant left", - sessionId); } + + // Close Session if no more participants + + Set remainingParticipants = null; + try { + remainingParticipants = getParticipants(sessionId); + } catch (OpenViduException e) { + log.info("Possible collision when closing the session '{}' (not found)", sessionId); + remainingParticipants = Collections.emptySet(); + } + sessionEventsHandler.onParticipantLeft(participant, sessionId, remainingParticipants, transactionId, + null, reason); + + if (!EndReason.sessionClosedByServer.equals(reason)) { + // If session is closed by a call to "DELETE /api/sessions" do NOT stop the + // recording. Will be stopped after in method + // "SessionManager.closeSessionAndEmptyCollections" + if (remainingParticipants.isEmpty()) { + if (openviduConfig.isRecordingModuleEnabled() + && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) + && (this.recordingManager.sessionIsBeingRecorded(sessionId))) { + // Start countdown to stop recording. Will be aborted if a Publisher starts + // before timeout + log.info( + "Last participant left. Starting {} seconds countdown for stopping recording of session {}", + this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); + recordingManager.initAutomaticRecordingStopThread(session); + } else { + try { + if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + if (session.isClosed()) { + return false; + } + log.info("No more participants in session '{}', removing it and closing it", + sessionId); + this.closeSessionAndEmptyCollections(session, reason, true); + sessionClosedByLastParticipant = true; + } finally { + session.closingLock.writeLock().unlock(); + } + } else { + log.error( + "Timeout waiting for Session {} closing lock to be available for closing as last participant left", + sessionId); + } + } catch (InterruptedException e) { + log.error( + "InterruptedException while waiting for Session {} closing lock to be available for closing as last participant left", + sessionId); + } + } + } else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled() + && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) + && this.recordingManager.sessionIsBeingRecorded(sessionId) + && ProtocolElements.RECORDER_PARTICIPANT_PUBLICID + .equals(remainingParticipants.iterator().next().getParticipantPublicId())) { + // RECORDER participant is the last one standing. Start countdown + log.info( + "Last participant left. Starting {} seconds countdown for stopping recording of session {}", + this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); + recordingManager.initAutomaticRecordingStopThread(session); + } + } + + // Finally close websocket session if required + if (closeWebSocket) { + sessionEventsHandler.closeRpcSession(participant.getParticipantPrivateId()); + } + + return sessionClosedByLastParticipant; + + } finally { + session.joinLeaveLock.unlock(); } - } else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled() - && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) - && this.recordingManager.sessionIsBeingRecorded(sessionId) - && ProtocolElements.RECORDER_PARTICIPANT_PUBLICID - .equals(remainingParticipants.iterator().next().getParticipantPublicId())) { - // RECORDER participant is the last one standing. Start countdown - log.info("Last participant left. Starting {} seconds countdown for stopping recording of session {}", - this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); - recordingManager.initAutomaticRecordingStopThread(session); + } else { + log.error( + "Timeout waiting for join-leave Session lock to be available for participant {} of session {} in leaveRoom", + kParticipant.getParticipantPublicId(), session.getSessionId()); + return false; } + } catch (InterruptedException e) { + log.error( + "Timeout waiting for join-leave Session lock to be available for participant {} of session {} in leaveRoom", + kParticipant.getParticipantPublicId(), session.getSessionId()); + return false; } - - // Finally close websocket session if required - if (closeWebSocket) { - sessionEventsHandler.closeRpcSession(participant.getParticipantPrivateId()); - } - - return sessionClosedByLastParticipant; } /** diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 299e6ff9..ea2e92f8 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -202,17 +202,17 @@ public abstract class KmsManager { @Override public void disconnected() { final Kms kms = kmss.get(kmsId); - + kms.setKurentoClientConnected(false); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); - if(kms.getKurentoClient().isClosed()) { + if (kms.getKurentoClient().isClosed()) { log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Closed explicitely", kms.getUri(), kms.getKurentoClient().toString()); return; } else { - log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Waiting reconnection", kms.getUri(), - kms.getKurentoClient().toString()); + log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Waiting reconnection", + kms.getUri(), kms.getKurentoClient().toString()); } // TODO: this is a fix for the lack of reconnected event @@ -225,7 +225,7 @@ public abstract class KmsManager { try { if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { lockAcquired = true; - + if (kms.isKurentoClientConnected()) { // reconnected listener already executed log.info( @@ -234,8 +234,8 @@ public abstract class KmsManager { timer.cancel(); return; } - - if(kms.getKurentoClient().isClosed()) { + + if (kms.getKurentoClient().isClosed()) { log.info( "Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer", kms.getUri(), kms.getKurentoClient().toString());