openvidu-server: Session ReadWriteLock for closing operation

pull/391/head
pabloFuente 2020-01-24 11:16:49 +01:00
parent 4fcb6839d0
commit 46d6199d42
6 changed files with 182 additions and 91 deletions

View File

@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function; import java.util.function.Function;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
@ -51,6 +53,17 @@ public class Session implements SessionInterface {
protected volatile boolean closed = false; protected volatile boolean closed = false;
protected AtomicInteger activePublishers = new AtomicInteger(0); protected AtomicInteger activePublishers = new AtomicInteger(0);
/**
* This lock protects the following operations with read lock: [REST API](POST
* /api/tokens, POST /sessions/{sessionId}/connection), [RPC](joinRoom).
*
* All of them get it with tryLock, immediately failing if written locked
*
* Lock is written-locked upon session close up. That is: everywhere in the code
* calling method SessionManager#closeSessionAndEmptyCollections (5 times)
*/
public ReadWriteLock closingLock = new ReentrantReadWriteLock();
public final AtomicBoolean recordingManuallyStopped = new AtomicBoolean(false); public final AtomicBoolean recordingManuallyStopped = new AtomicBoolean(false);
public Session(Session previousSession) { public Session(Session previousSession) {

View File

@ -489,7 +489,15 @@ public abstract class SessionManager {
// This code should only be executed when there were no participants connected // This code should only be executed when there were no participants connected
// to the session. That is: if the session was in the automatic recording stop // to the session. That is: if the session was in the automatic recording stop
// timeout with INDIVIDUAL recording (no docker participant connected) // timeout with INDIVIDUAL recording (no docker participant connected)
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return;
}
this.closeSessionAndEmptyCollections(session, reason, true); this.closeSessionAndEmptyCollections(session, reason, true);
} finally {
session.closingLock.writeLock().unlock();
}
} }
} }
@ -500,13 +508,6 @@ public abstract class SessionManager {
recordingManager.stopRecording(session, null, RecordingManager.finalReason(reason)); recordingManager.stopRecording(session, null, RecordingManager.finalReason(reason));
} }
if (EndReason.automaticStop.equals(reason) && !session.getParticipants().isEmpty()
&& !session.onlyRecorderParticipant()) {
log.warn(
"Some user connected to the session between automatic recording stop and session close up. Canceling session close up");
return;
}
final String mediaNodeId = session.getMediaNodeId(); final String mediaNodeId = session.getMediaNodeId();
if (session.close(reason)) { if (session.close(reason)) {

View File

@ -89,7 +89,7 @@ public class KurentoSessionManager extends SessionManager {
if (kSession == null) { if (kSession == null) {
// First user connecting to the session // First user connecting to the session
Session sessionNotActive = sessionsNotActive.remove(sessionId); Session sessionNotActive = sessionsNotActive.get(sessionId);
if (sessionNotActive == null && this.isInsecureParticipant(participant.getParticipantPrivateId())) { if (sessionNotActive == null && this.isInsecureParticipant(participant.getParticipantPrivateId())) {
// Insecure user directly call joinRoom RPC method, without REST API use // Insecure user directly call joinRoom RPC method, without REST API use
@ -213,10 +213,19 @@ public class KurentoSessionManager extends SessionManager {
this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId);
recordingManager.initAutomaticRecordingStopThread(session); recordingManager.initAutomaticRecordingStopThread(session);
} else { } else {
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return false;
}
log.info("No more participants in session '{}', removing it and closing it", sessionId); log.info("No more participants in session '{}', removing it and closing it", sessionId);
this.closeSessionAndEmptyCollections(session, reason, true); this.closeSessionAndEmptyCollections(session, reason, true);
sessionClosedByLastParticipant = true; sessionClosedByLastParticipant = true;
showTokens(); showTokens();
} finally {
session.closingLock.writeLock().unlock();
}
} }
} else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled() } else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled()
&& MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode())
@ -514,6 +523,7 @@ public class KurentoSessionManager extends SessionManager {
} }
session = new KurentoSession(sessionNotActive, kms, kurentoSessionEventsHandler, kurentoEndpointConfig); session = new KurentoSession(sessionNotActive, kms, kurentoSessionEventsHandler, kurentoEndpointConfig);
sessionsNotActive.remove(session.getSessionId());
KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(session.getSessionId(), session); KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(session.getSessionId(), session);
if (oldSession != null) { if (oldSession != null) {
log.warn("Session '{}' has just been created by another thread", session.getSessionId()); log.warn("Session '{}' has just been created by another thread", session.getSessionId());

View File

@ -482,6 +482,14 @@ public class RecordingManager {
recordingId, this.openviduConfig.getOpenviduRecordingAutostopTimeout()); recordingId, this.openviduConfig.getOpenviduRecordingAutostopTimeout());
if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) { if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) {
boolean alreadyUnlocked = false;
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (RECORDER does not // Close session if there are no participants connected (RECORDER does not
// count) and publishing // count) and publishing
@ -491,11 +499,18 @@ public class RecordingManager {
sessionManager.showTokens(); sessionManager.showTokens();
} else { } else {
// There are users connected, but no one is publishing // There are users connected, but no one is publishing
session.closingLock.writeLock().unlock(); // We don't need the lock if session's not closing
alreadyUnlocked = true;
log.info( log.info(
"Automatic stopping recording {}. There are users connected to session {}, but no one is publishing", "Automatic stopping recording {}. There are users connected to session {}, but no one is publishing",
recordingId, session.getSessionId()); recordingId, session.getSessionId());
this.stopRecording(session, recordingId, EndReason.automaticStop); this.stopRecording(session, recordingId, EndReason.automaticStop);
} }
} finally {
if (!alreadyUnlocked) {
session.closingLock.writeLock().unlock();
}
}
} else { } else {
// This code shouldn't be reachable // This code shouldn't be reachable
log.warn("Recording {} was already automatically stopped by a previous thread", recordingId); log.warn("Recording {} was already automatically stopped by a previous thread", recordingId);
@ -510,6 +525,11 @@ public class RecordingManager {
ScheduledFuture<?> future = this.automaticRecordingStopThreads.remove(session.getSessionId()); ScheduledFuture<?> future = this.automaticRecordingStopThreads.remove(session.getSessionId());
if (future != null) { if (future != null) {
boolean cancelled = future.cancel(false); boolean cancelled = future.cancel(false);
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return false;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) { if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (except for RECORDER). // Close session if there are no participants connected (except for RECORDER).
// This code will only be executed if recording is manually stopped during the // This code will only be executed if recording is manually stopped during the
@ -521,6 +541,9 @@ public class RecordingManager {
sessionManager.showTokens(); sessionManager.showTokens();
} }
return cancelled; return cancelled;
} finally {
session.closingLock.writeLock().unlock();
}
} else { } else {
return true; return true;
} }

View File

@ -223,9 +223,17 @@ public class SessionRestController {
Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId);
if (sessionNotActive != null) { if (sessionNotActive != null) {
try {
sessionNotActive.closingLock.writeLock().lock();
if (sessionNotActive.isClosed()) {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer, this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer,
true); true);
return new ResponseEntity<>(HttpStatus.NO_CONTENT); return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
}
} else { } else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND); return new ResponseEntity<>(HttpStatus.NOT_FOUND);
} }
@ -310,7 +318,8 @@ public class SessionRestController {
HttpStatus.BAD_REQUEST); HttpStatus.BAD_REQUEST);
} }
if (this.sessionManager.getSessionWithNotActive(sessionId) == null) { final Session session = this.sessionManager.getSessionWithNotActive(sessionId);
if (session == null) {
return this.generateErrorResponse("Session " + sessionId + " not found", "/api/tokens", return this.generateErrorResponse("Session " + sessionId + " not found", "/api/tokens",
HttpStatus.NOT_FOUND); HttpStatus.NOT_FOUND);
} }
@ -350,6 +359,9 @@ public class SessionRestController {
metadata = (metadata != null) ? metadata : ""; metadata = (metadata != null) ? metadata : "";
// While closing a session tokens can't be generated
if (session.closingLock.readLock().tryLock()) {
try {
String token = sessionManager.newToken(sessionId, role, metadata, kurentoTokenOptions); String token = sessionManager.newToken(sessionId, role, metadata, kurentoTokenOptions);
JsonObject responseJson = new JsonObject(); JsonObject responseJson = new JsonObject();
@ -387,6 +399,14 @@ public class SessionRestController {
responseJson.add("kurentoOptions", kurentoOptsResponse); responseJson.add("kurentoOptions", kurentoOptsResponse);
} }
return new ResponseEntity<>(responseJson.toString(), getResponseHeaders(), HttpStatus.OK); return new ResponseEntity<>(responseJson.toString(), getResponseHeaders(), HttpStatus.OK);
} finally {
session.closingLock.readLock().unlock();
}
} else {
log.error("Session {} is in the process of closing. Token couldn't be generated", sessionId);
return this.generateErrorResponse("Session " + sessionId + " not found", "/api/tokens",
HttpStatus.NOT_FOUND);
}
} }
@RequestMapping(value = "/recordings/start", method = RequestMethod.POST) @RequestMapping(value = "/recordings/start", method = RequestMethod.POST)
@ -717,7 +737,12 @@ public class SessionRestController {
videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate, videoActive, typeOfVideo, frameRate, videoDimensions, null, false, rtspUri, adaptativeBitrate,
onlyPlayWithSubscribers); onlyPlayWithSubscribers);
// While closing a session IP cameras can't be published
if (session.closingLock.readLock().tryLock()) {
try { try {
if (session.isClosed()) {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
Participant ipcamParticipant = this.sessionManager.publishIpcam(session, mediaOptions, data); Participant ipcamParticipant = this.sessionManager.publishIpcam(session, mediaOptions, data);
return new ResponseEntity<>(ipcamParticipant.toJson().toString(), getResponseHeaders(), HttpStatus.OK); return new ResponseEntity<>(ipcamParticipant.toJson().toString(), getResponseHeaders(), HttpStatus.OK);
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
@ -726,6 +751,11 @@ public class SessionRestController {
} catch (Exception e) { } catch (Exception e) {
return this.generateErrorResponse(e.getMessage(), "/api/sessions/" + sessionId + "/connection", return this.generateErrorResponse(e.getMessage(), "/api/sessions/" + sessionId + "/connection",
HttpStatus.INTERNAL_SERVER_ERROR); HttpStatus.INTERNAL_SERVER_ERROR);
} finally {
session.closingLock.readLock().unlock();
}
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
} }
} }

View File

@ -245,10 +245,14 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
Token tokenObj = sessionManager.consumeToken(sessionId, participantPrivatetId, token); Token tokenObj = sessionManager.consumeToken(sessionId, participantPrivatetId, token);
Participant participant; Participant participant;
io.openvidu.server.core.Session session = sessionManager.getSessionWithNotActive(sessionId);
// While closing a session users can't join
if (session.closingLock.readLock().tryLock()) {
try {
if (generateRecorderParticipant) { if (generateRecorderParticipant) {
participant = sessionManager.newRecorderParticipant(sessionId, participantPrivatetId, tokenObj, participant = sessionManager.newRecorderParticipant(sessionId, participantPrivatetId,
clientMetadata); tokenObj, clientMetadata);
} else { } else {
participant = sessionManager.newParticipant(sessionId, participantPrivatetId, tokenObj, participant = sessionManager.newParticipant(sessionId, participantPrivatetId, tokenObj,
clientMetadata, location, platform, clientMetadata, location, platform,
@ -258,6 +262,16 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
rpcConnection.setSessionId(sessionId); rpcConnection.setSessionId(sessionId);
sessionManager.joinRoom(participant, sessionId, request.getId()); sessionManager.joinRoom(participant, sessionId, request.getId());
} finally {
session.closingLock.readLock().unlock();
}
} else {
log.error(
"ERROR: The session {} is in the process of closing while participant {} (privateId) was joining",
sessionId, participantPrivatetId);
throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE,
"Unable to join the session. Session " + sessionId + " was in the process of closing");
}
} else { } else {
log.error("ERROR: Metadata format set in client-side is incorrect"); log.error("ERROR: Metadata format set in client-side is incorrect");
throw new OpenViduException(Code.USER_METADATA_FORMAT_INVALID_ERROR_CODE, throw new OpenViduException(Code.USER_METADATA_FORMAT_INVALID_ERROR_CODE,