From 46d6199d427298ea0f1b6c64d055771f0bbb0da9 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Fri, 24 Jan 2020 11:16:49 +0100 Subject: [PATCH] openvidu-server: Session ReadWriteLock for closing operation --- .../java/io/openvidu/server/core/Session.java | 13 ++ .../openvidu/server/core/SessionManager.java | 17 +-- .../kurento/core/KurentoSessionManager.java | 20 ++- .../recording/service/RecordingManager.java | 69 ++++++---- .../server/rest/SessionRestController.java | 120 +++++++++++------- .../io/openvidu/server/rpc/RpcHandler.java | 34 +++-- 6 files changed, 182 insertions(+), 91 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/Session.java b/openvidu-server/src/main/java/io/openvidu/server/core/Session.java index 9e402202..310feb0f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/Session.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/Session.java @@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import com.google.gson.JsonArray; @@ -51,6 +53,17 @@ public class Session implements SessionInterface { protected volatile boolean closed = false; protected AtomicInteger activePublishers = new AtomicInteger(0); + /** + * This lock protects the following operations with read lock: [REST API](POST + * /api/tokens, POST /sessions/{sessionId}/connection), [RPC](joinRoom). + * + * All of them get it with tryLock, immediately failing if written locked + * + * Lock is written-locked upon session close up. That is: everywhere in the code + * calling method SessionManager#closeSessionAndEmptyCollections (5 times) + */ + public ReadWriteLock closingLock = new ReentrantReadWriteLock(); + public final AtomicBoolean recordingManuallyStopped = new AtomicBoolean(false); public Session(Session previousSession) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index d4214b43..f276a0d0 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -489,7 +489,15 @@ public abstract class SessionManager { // This code should only be executed when there were no participants connected // to the session. That is: if the session was in the automatic recording stop // timeout with INDIVIDUAL recording (no docker participant connected) - this.closeSessionAndEmptyCollections(session, reason, true); + try { + session.closingLock.writeLock().lock(); + if (session.isClosed()) { + return; + } + this.closeSessionAndEmptyCollections(session, reason, true); + } finally { + session.closingLock.writeLock().unlock(); + } } } @@ -500,13 +508,6 @@ public abstract class SessionManager { recordingManager.stopRecording(session, null, RecordingManager.finalReason(reason)); } - if (EndReason.automaticStop.equals(reason) && !session.getParticipants().isEmpty() - && !session.onlyRecorderParticipant()) { - log.warn( - "Some user connected to the session between automatic recording stop and session close up. Canceling session close up"); - return; - } - final String mediaNodeId = session.getMediaNodeId(); if (session.close(reason)) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 76f35817..4ec3a164 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -89,7 +89,7 @@ public class KurentoSessionManager extends SessionManager { if (kSession == null) { // First user connecting to the session - Session sessionNotActive = sessionsNotActive.remove(sessionId); + Session sessionNotActive = sessionsNotActive.get(sessionId); if (sessionNotActive == null && this.isInsecureParticipant(participant.getParticipantPrivateId())) { // Insecure user directly call joinRoom RPC method, without REST API use @@ -213,10 +213,19 @@ public class KurentoSessionManager extends SessionManager { this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); recordingManager.initAutomaticRecordingStopThread(session); } else { - log.info("No more participants in session '{}', removing it and closing it", sessionId); - this.closeSessionAndEmptyCollections(session, reason, true); - sessionClosedByLastParticipant = true; - showTokens(); + try { + session.closingLock.writeLock().lock(); + if (session.isClosed()) { + return false; + } + log.info("No more participants in session '{}', removing it and closing it", sessionId); + this.closeSessionAndEmptyCollections(session, reason, true); + sessionClosedByLastParticipant = true; + showTokens(); + } finally { + session.closingLock.writeLock().unlock(); + } + } } else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled() && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) @@ -514,6 +523,7 @@ public class KurentoSessionManager extends SessionManager { } session = new KurentoSession(sessionNotActive, kms, kurentoSessionEventsHandler, kurentoEndpointConfig); + sessionsNotActive.remove(session.getSessionId()); KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(session.getSessionId(), session); if (oldSession != null) { log.warn("Session '{}' has just been created by another thread", session.getSessionId()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index 6f46c657..6a9f9cad 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -482,19 +482,34 @@ public class RecordingManager { recordingId, this.openviduConfig.getOpenviduRecordingAutostopTimeout()); if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) { - if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { - // Close session if there are no participants connected (RECORDER does not - // count) and publishing - log.info("Closing session {} after automatic stop of recording {}", session.getSessionId(), - recordingId); - sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop, true); - sessionManager.showTokens(); - } else { - // There are users connected, but no one is publishing - log.info( - "Automatic stopping recording {}. There are users connected to session {}, but no one is publishing", - recordingId, session.getSessionId()); - this.stopRecording(session, recordingId, EndReason.automaticStop); + + boolean alreadyUnlocked = false; + try { + session.closingLock.writeLock().lock(); + if (session.isClosed()) { + return; + } + + if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { + // Close session if there are no participants connected (RECORDER does not + // count) and publishing + log.info("Closing session {} after automatic stop of recording {}", session.getSessionId(), + recordingId); + sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop, true); + sessionManager.showTokens(); + } else { + // There are users connected, but no one is publishing + session.closingLock.writeLock().unlock(); // We don't need the lock if session's not closing + alreadyUnlocked = true; + log.info( + "Automatic stopping recording {}. There are users connected to session {}, but no one is publishing", + recordingId, session.getSessionId()); + this.stopRecording(session, recordingId, EndReason.automaticStop); + } + } finally { + if (!alreadyUnlocked) { + session.closingLock.writeLock().unlock(); + } } } else { // This code shouldn't be reachable @@ -510,17 +525,25 @@ public class RecordingManager { ScheduledFuture future = this.automaticRecordingStopThreads.remove(session.getSessionId()); if (future != null) { boolean cancelled = future.cancel(false); - if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { - // Close session if there are no participants connected (except for RECORDER). - // This code will only be executed if recording is manually stopped during the - // automatic stop timeout, so the session must be also closed - log.info( - "Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session", - session.getSessionId()); - sessionManager.closeSessionAndEmptyCollections(session, reason, false); - sessionManager.showTokens(); + try { + session.closingLock.writeLock().lock(); + if (session.isClosed()) { + return false; + } + if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { + // Close session if there are no participants connected (except for RECORDER). + // This code will only be executed if recording is manually stopped during the + // automatic stop timeout, so the session must be also closed + log.info( + "Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session", + session.getSessionId()); + sessionManager.closeSessionAndEmptyCollections(session, reason, false); + sessionManager.showTokens(); + } + return cancelled; + } finally { + session.closingLock.writeLock().unlock(); } - return cancelled; } else { return true; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java index fefdbaa5..789c168d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java @@ -223,9 +223,17 @@ public class SessionRestController { Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); if (sessionNotActive != null) { - this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer, - true); - return new ResponseEntity<>(HttpStatus.NO_CONTENT); + try { + sessionNotActive.closingLock.writeLock().lock(); + if (sessionNotActive.isClosed()) { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer, + true); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); + } finally { + sessionNotActive.closingLock.writeLock().unlock(); + } } else { return new ResponseEntity<>(HttpStatus.NOT_FOUND); } @@ -310,7 +318,8 @@ public class SessionRestController { HttpStatus.BAD_REQUEST); } - if (this.sessionManager.getSessionWithNotActive(sessionId) == null) { + final Session session = this.sessionManager.getSessionWithNotActive(sessionId); + if (session == null) { return this.generateErrorResponse("Session " + sessionId + " not found", "/api/tokens", HttpStatus.NOT_FOUND); } @@ -350,43 +359,54 @@ public class SessionRestController { metadata = (metadata != null) ? metadata : ""; - String token = sessionManager.newToken(sessionId, role, metadata, kurentoTokenOptions); + // While closing a session tokens can't be generated + if (session.closingLock.readLock().tryLock()) { + try { + String token = sessionManager.newToken(sessionId, role, metadata, kurentoTokenOptions); - JsonObject responseJson = new JsonObject(); - responseJson.addProperty("id", token); - responseJson.addProperty("session", sessionId); - responseJson.addProperty("role", role.toString()); - responseJson.addProperty("data", metadata); - responseJson.addProperty("token", token); + JsonObject responseJson = new JsonObject(); + responseJson.addProperty("id", token); + responseJson.addProperty("session", sessionId); + responseJson.addProperty("role", role.toString()); + responseJson.addProperty("data", metadata); + responseJson.addProperty("token", token); - if (kurentoOptions != null) { - JsonObject kurentoOptsResponse = new JsonObject(); - if (kurentoTokenOptions.getVideoMaxRecvBandwidth() != null) { - kurentoOptsResponse.addProperty("videoMaxRecvBandwidth", - kurentoTokenOptions.getVideoMaxRecvBandwidth()); - } - if (kurentoTokenOptions.getVideoMinRecvBandwidth() != null) { - kurentoOptsResponse.addProperty("videoMinRecvBandwidth", - kurentoTokenOptions.getVideoMinRecvBandwidth()); - } - if (kurentoTokenOptions.getVideoMaxSendBandwidth() != null) { - kurentoOptsResponse.addProperty("videoMaxSendBandwidth", - kurentoTokenOptions.getVideoMaxSendBandwidth()); - } - if (kurentoTokenOptions.getVideoMinSendBandwidth() != null) { - kurentoOptsResponse.addProperty("videoMinSendBandwidth", - kurentoTokenOptions.getVideoMinSendBandwidth()); - } - if (kurentoTokenOptions.getAllowedFilters().length > 0) { - JsonArray filters = new JsonArray(); - for (String filter : kurentoTokenOptions.getAllowedFilters()) { - filters.add(filter); + if (kurentoOptions != null) { + JsonObject kurentoOptsResponse = new JsonObject(); + if (kurentoTokenOptions.getVideoMaxRecvBandwidth() != null) { + kurentoOptsResponse.addProperty("videoMaxRecvBandwidth", + kurentoTokenOptions.getVideoMaxRecvBandwidth()); + } + if (kurentoTokenOptions.getVideoMinRecvBandwidth() != null) { + kurentoOptsResponse.addProperty("videoMinRecvBandwidth", + kurentoTokenOptions.getVideoMinRecvBandwidth()); + } + if (kurentoTokenOptions.getVideoMaxSendBandwidth() != null) { + kurentoOptsResponse.addProperty("videoMaxSendBandwidth", + kurentoTokenOptions.getVideoMaxSendBandwidth()); + } + if (kurentoTokenOptions.getVideoMinSendBandwidth() != null) { + kurentoOptsResponse.addProperty("videoMinSendBandwidth", + kurentoTokenOptions.getVideoMinSendBandwidth()); + } + if (kurentoTokenOptions.getAllowedFilters().length > 0) { + JsonArray filters = new JsonArray(); + for (String filter : kurentoTokenOptions.getAllowedFilters()) { + filters.add(filter); + } + kurentoOptsResponse.add("allowedFilters", filters); + } + responseJson.add("kurentoOptions", kurentoOptsResponse); } - kurentoOptsResponse.add("allowedFilters", filters); + return new ResponseEntity<>(responseJson.toString(), getResponseHeaders(), HttpStatus.OK); + } finally { + session.closingLock.readLock().unlock(); } - responseJson.add("kurentoOptions", kurentoOptsResponse); + } else { + log.error("Session {} is in the process of closing. Token couldn't be generated", sessionId); + return this.generateErrorResponse("Session " + sessionId + " not found", "/api/tokens", + HttpStatus.NOT_FOUND); } - return new ResponseEntity<>(responseJson.toString(), getResponseHeaders(), HttpStatus.OK); } @RequestMapping(value = "/recordings/start", method = RequestMethod.POST) @@ -717,15 +737,25 @@ public class SessionRestController { videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate, onlyPlayWithSubscribers); - try { - Participant ipcamParticipant = this.sessionManager.publishIpcam(session, mediaOptions, data); - return new ResponseEntity<>(ipcamParticipant.toJson().toString(), getResponseHeaders(), HttpStatus.OK); - } catch (MalformedURLException e) { - return this.generateErrorResponse("\"rtspUri\" parameter is not a valid rtsp uri", - "/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST); - } catch (Exception e) { - return this.generateErrorResponse(e.getMessage(), "/api/sessions/" + sessionId + "/connection", - HttpStatus.INTERNAL_SERVER_ERROR); + // While closing a session IP cameras can't be published + if (session.closingLock.readLock().tryLock()) { + try { + if (session.isClosed()) { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + Participant ipcamParticipant = this.sessionManager.publishIpcam(session, mediaOptions, data); + return new ResponseEntity<>(ipcamParticipant.toJson().toString(), getResponseHeaders(), HttpStatus.OK); + } catch (MalformedURLException e) { + return this.generateErrorResponse("\"rtspUri\" parameter is not a valid rtsp uri", + "/api/sessions/" + sessionId + "/connection", HttpStatus.BAD_REQUEST); + } catch (Exception e) { + return this.generateErrorResponse(e.getMessage(), "/api/sessions/" + sessionId + "/connection", + HttpStatus.INTERNAL_SERVER_ERROR); + } finally { + session.closingLock.readLock().unlock(); + } + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java index 25debc26..522a54b1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rpc/RpcHandler.java @@ -245,19 +245,33 @@ public class RpcHandler extends DefaultJsonRpcHandler { Token tokenObj = sessionManager.consumeToken(sessionId, participantPrivatetId, token); Participant participant; + io.openvidu.server.core.Session session = sessionManager.getSessionWithNotActive(sessionId); - if (generateRecorderParticipant) { - participant = sessionManager.newRecorderParticipant(sessionId, participantPrivatetId, tokenObj, - clientMetadata); + // While closing a session users can't join + if (session.closingLock.readLock().tryLock()) { + try { + if (generateRecorderParticipant) { + participant = sessionManager.newRecorderParticipant(sessionId, participantPrivatetId, + tokenObj, clientMetadata); + } else { + participant = sessionManager.newParticipant(sessionId, participantPrivatetId, tokenObj, + clientMetadata, location, platform, + httpSession.getId().substring(0, Math.min(16, httpSession.getId().length()))); + } + + rpcConnection.setSessionId(sessionId); + sessionManager.joinRoom(participant, sessionId, request.getId()); + + } finally { + session.closingLock.readLock().unlock(); + } } else { - participant = sessionManager.newParticipant(sessionId, participantPrivatetId, tokenObj, - clientMetadata, location, platform, - httpSession.getId().substring(0, Math.min(16, httpSession.getId().length()))); + log.error( + "ERROR: The session {} is in the process of closing while participant {} (privateId) was joining", + sessionId, participantPrivatetId); + throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, + "Unable to join the session. Session " + sessionId + " was in the process of closing"); } - - rpcConnection.setSessionId(sessionId); - sessionManager.joinRoom(participant, sessionId, request.getId()); - } else { log.error("ERROR: Metadata format set in client-side is incorrect"); throw new OpenViduException(Code.USER_METADATA_FORMAT_INVALID_ERROR_CODE,