From 7636d8a5169bc2e565cc2c8b8a8231f77d6b5f52 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Thu, 9 Apr 2020 13:20:29 +0200 Subject: [PATCH] openvidu-server: Session closingLock from .lock() to .tryLock() --- .../openvidu/server/core/SessionManager.java | 49 +++++++--- .../kurento/core/KurentoParticipant.java | 10 +- .../kurento/core/KurentoSessionManager.java | 33 ++++--- .../recording/service/RecordingManager.java | 97 ++++++++++++------- .../server/rest/SessionRestController.java | 30 ++++-- 5 files changed, 145 insertions(+), 74 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index 5f4e60fc..559ded71 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -28,6 +28,7 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -447,16 +448,27 @@ public abstract class SessionManager { long sessionExistsSince = currentMillis - sessionNotActive.getStartTime(); if (sessionExistsSince > (openviduConfig.getSessionGarbageThreshold() * 1000)) { try { - sessionNotActive.closingLock.writeLock().lock(); - if (sessions.containsKey(sessionId)) { - // The session passed to active during lock wait - continue; + if (sessionNotActive.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + if (sessions.containsKey(sessionId)) { + // The session passed to active during lock wait + continue; + } + iter.remove(); + cleanCollections(sessionId); + log.info("Non active session {} cleaned up by garbage collector", sessionId); + } finally { + sessionNotActive.closingLock.writeLock().unlock(); + } + } else { + log.error( + "Timeout waiting for Session closing lock to be available for garbage collector to clean session {}", + sessionId); } - iter.remove(); - cleanCollections(sessionId); - log.info("Non active session {} cleaned up by garbage collector", sessionId); - } finally { - sessionNotActive.closingLock.writeLock().unlock(); + } catch (InterruptedException e) { + log.error( + "InterruptedException while waiting for Session closing lock to be available for garbage collector to clean session {}", + sessionId); } } } @@ -515,13 +527,20 @@ public abstract class SessionManager { // to the session. That is: if the session was in the automatic recording stop // timeout with INDIVIDUAL recording (no docker participant connected) try { - session.closingLock.writeLock().lock(); - if (session.isClosed()) { - return; + if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + if (session.isClosed()) { + return; + } + this.closeSessionAndEmptyCollections(session, reason, true); + } finally { + session.closingLock.writeLock().unlock(); + } + } else { + log.error("Timeout waiting for Session {} closing lock to be available", sessionId); } - this.closeSessionAndEmptyCollections(session, reason, true); - } finally { - session.closingLock.writeLock().unlock(); + } catch (InterruptedException e) { + log.error("InterruptedException while waiting for Session {} closing lock to be available", sessionId); } } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 7fedf892..c470b996 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -331,12 +331,18 @@ public class KurentoParticipant extends Participant { } finally { pub.closingLock.writeLock().unlock(); } + } else { + log.error( + "Timeout waiting for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia", + senderName, this.getParticipantPublicId()); } } catch (InterruptedException e) { - subscribers.remove(senderName); log.error( - "Timeout wating for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia", + "InterruptedException while waiting for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia", senderName, this.getParticipantPublicId()); + } finally { + // Always clean map + subscribers.remove(senderName); } } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 288bc496..165ca537 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -86,7 +86,7 @@ public class KurentoSessionManager extends SessionManager { @Override /* Protected by Session.closingLock.readLock */ - public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) { + public void joinRoom(Participant participant, String sessionId, Integer transactionId) { Set existingParticipants = null; boolean lockAcquired = false; try { @@ -162,8 +162,7 @@ public class KurentoSessionManager extends SessionManager { } @Override - public synchronized boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, - boolean closeWebSocket) { + public boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, boolean closeWebSocket) { log.info("Request [LEAVE_ROOM] for participant {} of session {} with reason {}", participant.getParticipantPublicId(), participant.getSessionId(), reason != null ? reason.name() : "NULL"); @@ -235,17 +234,27 @@ public class KurentoSessionManager extends SessionManager { recordingManager.initAutomaticRecordingStopThread(session); } else { try { - session.closingLock.writeLock().lock(); - if (session.isClosed()) { - return false; + if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + if (session.isClosed()) { + return false; + } + log.info("No more participants in session '{}', removing it and closing it", sessionId); + this.closeSessionAndEmptyCollections(session, reason, true); + sessionClosedByLastParticipant = true; + } finally { + session.closingLock.writeLock().unlock(); + } + } else { + log.error( + "Timeout waiting for Session {} closing lock to be available for closing as last participant left", + sessionId); } - log.info("No more participants in session '{}', removing it and closing it", sessionId); - this.closeSessionAndEmptyCollections(session, reason, true); - sessionClosedByLastParticipant = true; - } finally { - session.closingLock.writeLock().unlock(); + } catch (InterruptedException e) { + log.error( + "InterruptedException while waiting for Session {} closing lock to be available for closing as last participant left", + sessionId); } - } } else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled() && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index b2117810..72736971 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -480,34 +480,46 @@ public class RecordingManager { recordingId, this.openviduConfig.getOpenviduRecordingAutostopTimeout()); if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) { - boolean alreadyUnlocked = false; try { - session.closingLock.writeLock().lock(); - if (session.isClosed()) { - return; - } - - if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { - // Close session if there are no participants connected (RECORDER does not - // count) and publishing - log.info("Closing session {} after automatic stop of recording {}", session.getSessionId(), - recordingId); - sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop, true); + if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + if (session.isClosed()) { + return; + } + if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { + // Close session if there are no participants connected (RECORDER does not + // count) and publishing + log.info("Closing session {} after automatic stop of recording {}", + session.getSessionId(), recordingId); + sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop, + true); + } else { + // There are users connected, but no one is publishing + // We don't need the lock if session is not closing + session.closingLock.writeLock().unlock(); + alreadyUnlocked = true; + log.info( + "Automatic stopping recording {}. There are users connected to session {}, but no one is publishing", + recordingId, session.getSessionId()); + this.stopRecording(session, recordingId, EndReason.automaticStop); + } + } finally { + if (!alreadyUnlocked) { + session.closingLock.writeLock().unlock(); + } + } } else { - // There are users connected, but no one is publishing - session.closingLock.writeLock().unlock(); // We don't need the lock if session's not closing - alreadyUnlocked = true; - log.info( - "Automatic stopping recording {}. There are users connected to session {}, but no one is publishing", - recordingId, session.getSessionId()); - this.stopRecording(session, recordingId, EndReason.automaticStop); - } - } finally { - if (!alreadyUnlocked) { - session.closingLock.writeLock().unlock(); + log.error( + "Timeout waiting for Session {} closing lock to be available for automatic recording stop thred", + session.getSessionId()); } + } catch (InterruptedException e) { + log.error( + "InterruptedException while waiting for Session {} closing lock to be available for automatic recording stop thred", + session.getSessionId()); } + } else { // This code shouldn't be reachable log.warn("Recording {} was already automatically stopped by a previous thread", recordingId); @@ -523,23 +535,34 @@ public class RecordingManager { if (future != null) { boolean cancelled = future.cancel(false); try { - session.closingLock.writeLock().lock(); - if (session.isClosed()) { - return false; - } - if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { - // Close session if there are no participants connected (except for RECORDER). - // This code will only be executed if recording is manually stopped during the - // automatic stop timeout, so the session must be also closed - log.info( - "Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session", + if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + if (session.isClosed()) { + return false; + } + if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { + // Close session if there are no participants connected (except for RECORDER). + // This code will only be executed if recording is manually stopped during the + // automatic stop timeout, so the session must be also closed + log.info( + "Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session", + session.getSessionId()); + sessionManager.closeSessionAndEmptyCollections(session, reason, false); + } + } finally { + session.closingLock.writeLock().unlock(); + } + } else { + log.error( + "Timeout waiting for Session {} closing lock to be available for aborting automatic recording stop thred", session.getSessionId()); - sessionManager.closeSessionAndEmptyCollections(session, reason, false); } - return cancelled; - } finally { - session.closingLock.writeLock().unlock(); + } catch (InterruptedException e) { + log.error( + "InterruptedException while waiting for Session {} closing lock to be available for aborting automatic recording stop thred", + session.getSessionId()); } + return cancelled; } else { return true; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java index c9f02273..6b1897fd 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java @@ -21,6 +21,7 @@ import java.net.MalformedURLException; import java.util.ArrayList; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; @@ -234,15 +235,28 @@ public class SessionRestController { Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); if (sessionNotActive != null) { try { - sessionNotActive.closingLock.writeLock().lock(); - if (sessionNotActive.isClosed()) { - return new ResponseEntity<>(HttpStatus.NOT_FOUND); + if (sessionNotActive.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) { + try { + if (sessionNotActive.isClosed()) { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, + EndReason.sessionClosedByServer, true); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); + } finally { + sessionNotActive.closingLock.writeLock().unlock(); + } + } else { + String errorMsg = "Timeout waiting for Session " + sessionId + + " closing lock to be available for closing from DELETE /api/sessions"; + log.error(errorMsg); + return this.generateErrorResponse(errorMsg, "/api/sessions", HttpStatus.BAD_REQUEST); } - this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer, - true); - return new ResponseEntity<>(HttpStatus.NO_CONTENT); - } finally { - sessionNotActive.closingLock.writeLock().unlock(); + } catch (InterruptedException e) { + String errorMsg = "InterruptedException while waiting for Session " + sessionId + + " closing lock to be available for closing from DELETE /api/sessions"; + log.error(errorMsg); + return this.generateErrorResponse(errorMsg, "/api/sessions", HttpStatus.BAD_REQUEST); } } else { return new ResponseEntity<>(HttpStatus.NOT_FOUND);