openvidu-server: Session closingLock from .lock() to .tryLock()

pull/431/head
pabloFuente 2020-04-09 13:20:29 +02:00
parent cd1aad4eff
commit 7636d8a516
5 changed files with 145 additions and 74 deletions

View File

@ -28,6 +28,7 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@ -447,16 +448,27 @@ public abstract class SessionManager {
long sessionExistsSince = currentMillis - sessionNotActive.getStartTime();
if (sessionExistsSince > (openviduConfig.getSessionGarbageThreshold() * 1000)) {
try {
sessionNotActive.closingLock.writeLock().lock();
if (sessions.containsKey(sessionId)) {
// The session passed to active during lock wait
continue;
if (sessionNotActive.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (sessions.containsKey(sessionId)) {
// The session passed to active during lock wait
continue;
}
iter.remove();
cleanCollections(sessionId);
log.info("Non active session {} cleaned up by garbage collector", sessionId);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
}
} else {
log.error(
"Timeout waiting for Session closing lock to be available for garbage collector to clean session {}",
sessionId);
}
iter.remove();
cleanCollections(sessionId);
log.info("Non active session {} cleaned up by garbage collector", sessionId);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
log.error(
"InterruptedException while waiting for Session closing lock to be available for garbage collector to clean session {}",
sessionId);
}
}
}
@ -515,13 +527,20 @@ public abstract class SessionManager {
// to the session. That is: if the session was in the automatic recording stop
// timeout with INDIVIDUAL recording (no docker participant connected)
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return;
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return;
}
this.closeSessionAndEmptyCollections(session, reason, true);
} finally {
session.closingLock.writeLock().unlock();
}
} else {
log.error("Timeout waiting for Session {} closing lock to be available", sessionId);
}
this.closeSessionAndEmptyCollections(session, reason, true);
} finally {
session.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
log.error("InterruptedException while waiting for Session {} closing lock to be available", sessionId);
}
}
}

View File

@ -331,12 +331,18 @@ public class KurentoParticipant extends Participant {
} finally {
pub.closingLock.writeLock().unlock();
}
} else {
log.error(
"Timeout waiting for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia",
senderName, this.getParticipantPublicId());
}
} catch (InterruptedException e) {
subscribers.remove(senderName);
log.error(
"Timeout wating for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia",
"InterruptedException while waiting for PublisherEndpoint closing lock of participant {} to be available for participant {} to call cancelReceveivingMedia",
senderName, this.getParticipantPublicId());
} finally {
// Always clean map
subscribers.remove(senderName);
}
}
}

View File

@ -86,7 +86,7 @@ public class KurentoSessionManager extends SessionManager {
@Override
/* Protected by Session.closingLock.readLock */
public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) {
public void joinRoom(Participant participant, String sessionId, Integer transactionId) {
Set<Participant> existingParticipants = null;
boolean lockAcquired = false;
try {
@ -162,8 +162,7 @@ public class KurentoSessionManager extends SessionManager {
}
@Override
public synchronized boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason,
boolean closeWebSocket) {
public boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason, boolean closeWebSocket) {
log.info("Request [LEAVE_ROOM] for participant {} of session {} with reason {}",
participant.getParticipantPublicId(), participant.getSessionId(),
reason != null ? reason.name() : "NULL");
@ -235,17 +234,27 @@ public class KurentoSessionManager extends SessionManager {
recordingManager.initAutomaticRecordingStopThread(session);
} else {
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return false;
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return false;
}
log.info("No more participants in session '{}', removing it and closing it", sessionId);
this.closeSessionAndEmptyCollections(session, reason, true);
sessionClosedByLastParticipant = true;
} finally {
session.closingLock.writeLock().unlock();
}
} else {
log.error(
"Timeout waiting for Session {} closing lock to be available for closing as last participant left",
sessionId);
}
log.info("No more participants in session '{}', removing it and closing it", sessionId);
this.closeSessionAndEmptyCollections(session, reason, true);
sessionClosedByLastParticipant = true;
} finally {
session.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
log.error(
"InterruptedException while waiting for Session {} closing lock to be available for closing as last participant left",
sessionId);
}
}
} else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled()
&& MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode())

View File

@ -480,34 +480,46 @@ public class RecordingManager {
recordingId, this.openviduConfig.getOpenviduRecordingAutostopTimeout());
if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) {
boolean alreadyUnlocked = false;
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (RECORDER does not
// count) and publishing
log.info("Closing session {} after automatic stop of recording {}", session.getSessionId(),
recordingId);
sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop, true);
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (RECORDER does not
// count) and publishing
log.info("Closing session {} after automatic stop of recording {}",
session.getSessionId(), recordingId);
sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop,
true);
} else {
// There are users connected, but no one is publishing
// We don't need the lock if session is not closing
session.closingLock.writeLock().unlock();
alreadyUnlocked = true;
log.info(
"Automatic stopping recording {}. There are users connected to session {}, but no one is publishing",
recordingId, session.getSessionId());
this.stopRecording(session, recordingId, EndReason.automaticStop);
}
} finally {
if (!alreadyUnlocked) {
session.closingLock.writeLock().unlock();
}
}
} else {
// There are users connected, but no one is publishing
session.closingLock.writeLock().unlock(); // We don't need the lock if session's not closing
alreadyUnlocked = true;
log.info(
"Automatic stopping recording {}. There are users connected to session {}, but no one is publishing",
recordingId, session.getSessionId());
this.stopRecording(session, recordingId, EndReason.automaticStop);
}
} finally {
if (!alreadyUnlocked) {
session.closingLock.writeLock().unlock();
log.error(
"Timeout waiting for Session {} closing lock to be available for automatic recording stop thred",
session.getSessionId());
}
} catch (InterruptedException e) {
log.error(
"InterruptedException while waiting for Session {} closing lock to be available for automatic recording stop thred",
session.getSessionId());
}
} else {
// This code shouldn't be reachable
log.warn("Recording {} was already automatically stopped by a previous thread", recordingId);
@ -523,23 +535,34 @@ public class RecordingManager {
if (future != null) {
boolean cancelled = future.cancel(false);
try {
session.closingLock.writeLock().lock();
if (session.isClosed()) {
return false;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (except for RECORDER).
// This code will only be executed if recording is manually stopped during the
// automatic stop timeout, so the session must be also closed
log.info(
"Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session",
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return false;
}
if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
// Close session if there are no participants connected (except for RECORDER).
// This code will only be executed if recording is manually stopped during the
// automatic stop timeout, so the session must be also closed
log.info(
"Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session",
session.getSessionId());
sessionManager.closeSessionAndEmptyCollections(session, reason, false);
}
} finally {
session.closingLock.writeLock().unlock();
}
} else {
log.error(
"Timeout waiting for Session {} closing lock to be available for aborting automatic recording stop thred",
session.getSessionId());
sessionManager.closeSessionAndEmptyCollections(session, reason, false);
}
return cancelled;
} finally {
session.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
log.error(
"InterruptedException while waiting for Session {} closing lock to be available for aborting automatic recording stop thred",
session.getSessionId());
}
return cancelled;
} else {
return true;
}

View File

@ -21,6 +21,7 @@ import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
@ -234,15 +235,28 @@ public class SessionRestController {
Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId);
if (sessionNotActive != null) {
try {
sessionNotActive.closingLock.writeLock().lock();
if (sessionNotActive.isClosed()) {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
if (sessionNotActive.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (sessionNotActive.isClosed()) {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive,
EndReason.sessionClosedByServer, true);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
}
} else {
String errorMsg = "Timeout waiting for Session " + sessionId
+ " closing lock to be available for closing from DELETE /api/sessions";
log.error(errorMsg);
return this.generateErrorResponse(errorMsg, "/api/sessions", HttpStatus.BAD_REQUEST);
}
this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer,
true);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} finally {
sessionNotActive.closingLock.writeLock().unlock();
} catch (InterruptedException e) {
String errorMsg = "InterruptedException while waiting for Session " + sessionId
+ " closing lock to be available for closing from DELETE /api/sessions";
log.error(errorMsg);
return this.generateErrorResponse(errorMsg, "/api/sessions", HttpStatus.BAD_REQUEST);
}
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);