openvidu-server: new Session lock protecting joinRoom/leaveRoom methods

pull/431/head
pabloFuente 2020-04-09 19:42:45 +02:00
parent 8ab37ab97d
commit a08d92e745
3 changed files with 173 additions and 118 deletions

View File

@ -23,7 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function; import java.util.function.Function;
@ -70,6 +72,12 @@ public class Session implements SessionInterface {
*/ */
public ReadWriteLock closingLock = new ReentrantReadWriteLock(); public ReadWriteLock closingLock = new ReentrantReadWriteLock();
/**
* This lock protects the operations of SessionManager#joinRoom and
* SessionManager#leaveRoom
*/
public Lock joinLeaveLock = new ReentrantLock();
public final AtomicBoolean recordingManuallyStopped = new AtomicBoolean(false); public final AtomicBoolean recordingManuallyStopped = new AtomicBoolean(false);
public Session(Session previousSession) { public Session(Session previousSession) {

View File

@ -88,11 +88,9 @@ public class KurentoSessionManager extends SessionManager {
/* Protected by Session.closingLock.readLock */ /* Protected by Session.closingLock.readLock */
public void joinRoom(Participant participant, String sessionId, Integer transactionId) { public void joinRoom(Participant participant, String sessionId, Integer transactionId) {
Set<Participant> existingParticipants = null; Set<Participant> existingParticipants = null;
boolean lockAcquired = false;
try { try {
KurentoSession kSession = (KurentoSession) sessions.get(sessionId); KurentoSession kSession = (KurentoSession) sessions.get(sessionId);
if (kSession == null) { if (kSession == null) {
// First user connecting to the session // First user connecting to the session
Session sessionNotActive = sessionsNotActive.get(sessionId); Session sessionNotActive = sessionsNotActive.get(sessionId);
@ -108,7 +106,11 @@ public class KurentoSessionManager extends SessionManager {
try { try {
if (KmsManager.selectAndRemoveKmsLock.tryLock(MS_MAX_LOCK_WAIT, TimeUnit.SECONDS)) { if (KmsManager.selectAndRemoveKmsLock.tryLock(MS_MAX_LOCK_WAIT, TimeUnit.SECONDS)) {
lockAcquired = true; try {
kSession = (KurentoSession) sessions.get(sessionId);
if (kSession == null) {
// Session still null. It was not created by other thread while waiting for lock
Kms lessLoadedKms = null; Kms lessLoadedKms = null;
try { try {
lessLoadedKms = this.kmsManager.getLessLoadedConnectedAndRunningKms(); lessLoadedKms = this.kmsManager.getLessLoadedConnectedAndRunningKms();
@ -117,11 +119,16 @@ public class KurentoSessionManager extends SessionManager {
this.cleanCollections(sessionId); this.cleanCollections(sessionId);
this.storeSessionNotActive(sessionNotActive); this.storeSessionNotActive(sessionNotActive);
throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE, throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE,
"There is no available Media Node where to initialize session '" + sessionId + "'"); "There is no available Media Node where to initialize session '" + sessionId
+ "'");
} }
log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(),
lessLoadedKms.getLoad()); lessLoadedKms.getLoad());
kSession = createSession(sessionNotActive, lessLoadedKms); kSession = createSession(sessionNotActive, lessLoadedKms);
}
} finally {
KmsManager.selectAndRemoveKmsLock.unlock();
}
} else { } else {
String error = "Timeout of " + MS_MAX_LOCK_WAIT + " seconds waiting to acquire lock"; String error = "Timeout of " + MS_MAX_LOCK_WAIT + " seconds waiting to acquire lock";
@ -145,19 +152,35 @@ public class KurentoSessionManager extends SessionManager {
+ "' is trying to join session '" + sessionId + "' but it is closing"); + "' is trying to join session '" + sessionId + "' but it is closing");
} }
try {
if (kSession.joinLeaveLock.tryLock(15, TimeUnit.SECONDS)) {
try {
existingParticipants = getParticipants(sessionId); existingParticipants = getParticipants(sessionId);
kSession.join(participant); kSession.join(participant);
sessionEventsHandler.onParticipantJoined(participant, sessionId, existingParticipants,
transactionId, null);
} finally {
kSession.joinLeaveLock.unlock();
}
} else {
log.error(
"Timeout waiting for join-leave Session lock to be available for participant {} of session {} in joinRoom",
participant.getParticipantPublicId(), sessionId);
sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId,
new OpenViduException(Code.GENERIC_ERROR_CODE, "Timeout waiting for Session lock"));
}
} catch (InterruptedException e) {
log.error(
"InterruptedException waiting for join-leave Session lock to be available for participant {} of session {} in joinRoom",
participant.getParticipantPublicId(), sessionId);
sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId,
new OpenViduException(Code.GENERIC_ERROR_CODE,
"InterruptedException waiting for Session lock"));
}
} catch (OpenViduException e) { } catch (OpenViduException e) {
log.warn("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(), log.error("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(),
sessionId, e); sessionId, e);
sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId, e); sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId, e);
} finally {
if (lockAcquired) {
KmsManager.selectAndRemoveKmsLock.unlock();
}
}
if (existingParticipants != null) {
sessionEventsHandler.onParticipantJoined(participant, sessionId, existingParticipants, transactionId, null);
} }
} }
@ -179,6 +202,11 @@ public class KurentoSessionManager extends SessionManager {
throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "'" + participant.getParticipantPublicId() throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "'" + participant.getParticipantPublicId()
+ "' is trying to leave from session '" + sessionId + "' but it is closing"); + "' is trying to leave from session '" + sessionId + "' but it is closing");
} }
try {
if (session.joinLeaveLock.tryLock(15, TimeUnit.SECONDS)) {
try {
session.leave(participant.getParticipantPrivateId(), reason); session.leave(participant.getParticipantPrivateId(), reason);
// Update control data structures // Update control data structures
@ -195,7 +223,8 @@ public class KurentoSessionManager extends SessionManager {
if (insecureUsers.containsKey(p.getParticipantPrivateId())) { if (insecureUsers.containsKey(p.getParticipantPrivateId())) {
boolean stillParticipant = false; boolean stillParticipant = false;
for (Session s : sessions.values()) { for (Session s : sessions.values()) {
if (!s.isClosed() && (s.getParticipantByPrivateId(p.getParticipantPrivateId()) != null)) { if (!s.isClosed()
&& (s.getParticipantByPrivateId(p.getParticipantPrivateId()) != null)) {
stillParticipant = true; stillParticipant = true;
break; break;
} }
@ -215,8 +244,8 @@ public class KurentoSessionManager extends SessionManager {
log.info("Possible collision when closing the session '{}' (not found)", sessionId); log.info("Possible collision when closing the session '{}' (not found)", sessionId);
remainingParticipants = Collections.emptySet(); remainingParticipants = Collections.emptySet();
} }
sessionEventsHandler.onParticipantLeft(participant, sessionId, remainingParticipants, transactionId, null, sessionEventsHandler.onParticipantLeft(participant, sessionId, remainingParticipants, transactionId,
reason); null, reason);
if (!EndReason.sessionClosedByServer.equals(reason)) { if (!EndReason.sessionClosedByServer.equals(reason)) {
// If session is closed by a call to "DELETE /api/sessions" do NOT stop the // If session is closed by a call to "DELETE /api/sessions" do NOT stop the
@ -239,7 +268,8 @@ public class KurentoSessionManager extends SessionManager {
if (session.isClosed()) { if (session.isClosed()) {
return false; return false;
} }
log.info("No more participants in session '{}', removing it and closing it", sessionId); log.info("No more participants in session '{}', removing it and closing it",
sessionId);
this.closeSessionAndEmptyCollections(session, reason, true); this.closeSessionAndEmptyCollections(session, reason, true);
sessionClosedByLastParticipant = true; sessionClosedByLastParticipant = true;
} finally { } finally {
@ -262,7 +292,8 @@ public class KurentoSessionManager extends SessionManager {
&& ProtocolElements.RECORDER_PARTICIPANT_PUBLICID && ProtocolElements.RECORDER_PARTICIPANT_PUBLICID
.equals(remainingParticipants.iterator().next().getParticipantPublicId())) { .equals(remainingParticipants.iterator().next().getParticipantPublicId())) {
// RECORDER participant is the last one standing. Start countdown // RECORDER participant is the last one standing. Start countdown
log.info("Last participant left. Starting {} seconds countdown for stopping recording of session {}", log.info(
"Last participant left. Starting {} seconds countdown for stopping recording of session {}",
this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId);
recordingManager.initAutomaticRecordingStopThread(session); recordingManager.initAutomaticRecordingStopThread(session);
} }
@ -274,6 +305,22 @@ public class KurentoSessionManager extends SessionManager {
} }
return sessionClosedByLastParticipant; return sessionClosedByLastParticipant;
} finally {
session.joinLeaveLock.unlock();
}
} else {
log.error(
"Timeout waiting for join-leave Session lock to be available for participant {} of session {} in leaveRoom",
kParticipant.getParticipantPublicId(), session.getSessionId());
return false;
}
} catch (InterruptedException e) {
log.error(
"Timeout waiting for join-leave Session lock to be available for participant {} of session {} in leaveRoom",
kParticipant.getParticipantPublicId(), session.getSessionId());
return false;
}
} }
/** /**

View File

@ -206,13 +206,13 @@ public abstract class KmsManager {
kms.setKurentoClientConnected(false); kms.setKurentoClientConnected(false);
kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis());
if(kms.getKurentoClient().isClosed()) { if (kms.getKurentoClient().isClosed()) {
log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Closed explicitely", kms.getUri(), log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Closed explicitely", kms.getUri(),
kms.getKurentoClient().toString()); kms.getKurentoClient().toString());
return; return;
} else { } else {
log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Waiting reconnection", kms.getUri(), log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Waiting reconnection",
kms.getKurentoClient().toString()); kms.getUri(), kms.getKurentoClient().toString());
} }
// TODO: this is a fix for the lack of reconnected event // TODO: this is a fix for the lack of reconnected event
@ -235,7 +235,7 @@ public abstract class KmsManager {
return; return;
} }
if(kms.getKurentoClient().isClosed()) { if (kms.getKurentoClient().isClosed()) {
log.info( log.info(
"Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer", "Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer",
kms.getUri(), kms.getKurentoClient().toString()); kms.getUri(), kms.getKurentoClient().toString());