openvidu-server: selectAndRemoveKmsLock lock() to tryLock()

pull/419/head
pabloFuente 2020-04-01 18:35:14 +02:00
parent 9dcd8d73ae
commit e647b286c6
1 changed files with 48 additions and 27 deletions

View File

@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -81,11 +82,12 @@ public class KurentoSessionManager extends SessionManager {
@Autowired @Autowired
private KurentoParticipantEndpointConfig kurentoEndpointConfig; private KurentoParticipantEndpointConfig kurentoEndpointConfig;
private final int MS_MAX_LOCK_WAIT = 15;
@Override @Override
/* Protected by Session.closingLock.readLock */ /* Protected by Session.closingLock.readLock */
public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) { public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) {
Set<Participant> existingParticipants = null; Set<Participant> existingParticipants = null;
boolean lockAcquired = false;
try { try {
KurentoSession kSession = (KurentoSession) sessions.get(sessionId); KurentoSession kSession = (KurentoSession) sessions.get(sessionId);
@ -103,21 +105,39 @@ public class KurentoSessionManager extends SessionManager {
openviduConfig, recordingManager); openviduConfig, recordingManager);
} }
lockAcquired = true;
KmsManager.selectAndRemoveKmsLock.lock();
Kms lessLoadedKms = null;
try { try {
lessLoadedKms = this.kmsManager.getLessLoadedAndRunningKms(); if (KmsManager.selectAndRemoveKmsLock.tryLock(MS_MAX_LOCK_WAIT, TimeUnit.SECONDS)) {
} catch (NoSuchElementException e) { try {
// Restore session not active Kms lessLoadedKms = null;
this.cleanCollections(sessionId); try {
this.storeSessionNotActive(sessionNotActive); lessLoadedKms = this.kmsManager.getLessLoadedAndRunningKms();
throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE, } catch (NoSuchElementException e) {
"There is no available Media Node where to initialize session '" + sessionId + "'"); // Restore session not active
this.cleanCollections(sessionId);
this.storeSessionNotActive(sessionNotActive);
throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE,
"There is no available Media Node where to initialize session '" + sessionId
+ "'");
}
log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(),
lessLoadedKms.getLoad());
kSession = createSession(sessionNotActive, lessLoadedKms);
} finally {
KmsManager.selectAndRemoveKmsLock.unlock();
}
} else {
String error = "Timeout of " + MS_MAX_LOCK_WAIT + " seconds waiting to acquire lock";
log.error(error);
sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId,
new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE, error));
return;
}
} catch (InterruptedException e) {
String error = "'" + participant.getParticipantPublicId() + "' is trying to join session '"
+ sessionId + "' but was interrupted while waiting to acquire lock: " + e.getMessage();
log.error(error);
throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, error);
} }
log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), lessLoadedKms.getLoad());
kSession = createSession(sessionNotActive, lessLoadedKms);
} }
if (kSession.isClosed()) { if (kSession.isClosed()) {
@ -133,10 +153,6 @@ public class KurentoSessionManager extends SessionManager {
log.warn("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(), log.warn("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(),
sessionId, e); sessionId, e);
sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId, e); sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId, e);
} finally {
if (lockAcquired) {
KmsManager.selectAndRemoveKmsLock.unlock();
}
} }
if (existingParticipants != null) { if (existingParticipants != null) {
sessionEventsHandler.onParticipantJoined(participant, sessionId, existingParticipants, transactionId, null); sessionEventsHandler.onParticipantJoined(participant, sessionId, existingParticipants, transactionId, null);
@ -146,7 +162,9 @@ public class KurentoSessionManager extends SessionManager {
@Override @Override
public synchronized boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, public synchronized boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason,
boolean closeWebSocket) { boolean closeWebSocket) {
log.debug("Request [LEAVE_ROOM] ({})", participant.getParticipantPublicId()); log.info("Request [LEAVE_ROOM] for participant {} of session {} with reason {}",
participant.getParticipantPublicId(), participant.getSessionId(),
reason != null ? reason.name() : "NULL");
boolean sessionClosedByLastParticipant = false; boolean sessionClosedByLastParticipant = false;
@ -172,15 +190,18 @@ public class KurentoSessionManager extends SessionManager {
this.coturnCredentialsService.deleteUser(p.getToken().getTurnCredentials().getUsername()); this.coturnCredentialsService.deleteUser(p.getToken().getTurnCredentials().getUsername());
} }
boolean stillParticipant = false; // TODO: why is this necessary??
for (Session s : sessions.values()) { if (insecureUsers.containsKey(p.getParticipantPrivateId())) {
if (s.getParticipantByPrivateId(p.getParticipantPrivateId()) != null) { boolean stillParticipant = false;
stillParticipant = true; for (Session s : sessions.values()) {
break; if (!s.isClosed() && (s.getParticipantByPrivateId(p.getParticipantPrivateId()) != null)) {
stillParticipant = true;
break;
}
}
if (!stillParticipant) {
insecureUsers.remove(p.getParticipantPrivateId());
} }
}
if (!stillParticipant) {
insecureUsers.remove(p.getParticipantPrivateId());
} }
} }