diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index 628ad618..fa71a5f3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -53,6 +53,11 @@ public class FixedOneKmsManager extends KmsManager { return Arrays.asList(kms); } + @Override + public boolean isMediaNodeAvailableForSession(String mediaNodeId) { + return true; + } + @Override public boolean isMediaNodeAvailableForRecording(String mediaNodeId) { return true; diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 35dc5a84..7be250c8 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -42,6 +42,8 @@ import org.springframework.beans.factory.annotation.Autowired; import com.google.gson.JsonObject; +import io.openvidu.client.OpenViduException; +import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.IdentifierPrefixes; import io.openvidu.server.kurento.core.KurentoSession; @@ -349,9 +351,72 @@ public abstract class KmsManager { return intervals[intervals.length - 1][0]; } + public void incrementActiveRecordings(String mediaNodeId) throws OpenViduException { + try { + if (KmsManager.selectAndRemoveKmsLock.tryLock(KmsManager.MAX_SECONDS_LOCK_WAIT, TimeUnit.SECONDS)) { + try { + final Kms kms = this.getKms(mediaNodeId); + if (kms == null) { + throw new OpenViduException(Code.MEDIA_NODE_NOT_FOUND, + "Media Node " + mediaNodeId + " does not exist"); + } + kms.incrementActiveRecordings(); + log.info("Incremented number of active recordings in Media Node {}. Current number: {}", + mediaNodeId, kms.getActiveRecordings()); + + if (!isMediaNodeAvailableForRecording(mediaNodeId)) { + throw new OpenViduException(Code.MEDIA_NODE_STATUS_WRONG, + "Media Node " + mediaNodeId + " does not allow starting new Recordings"); + } + + } finally { + KmsManager.selectAndRemoveKmsLock.unlock(); + } + } else { + throw new OpenViduException(Code.GENERIC_ERROR_CODE, + "selectAndRemoveKmsLock couldn't be acquired within " + KmsManager.MAX_SECONDS_LOCK_WAIT + + " seconds when incrementing active recordings of Media Node " + mediaNodeId); + } + } catch (InterruptedException e) { + throw new OpenViduException(Code.GENERIC_ERROR_CODE, + "InterruptedException waiting to acquire selectAndRemoveKmsLock when incrementing active recordings of Media Node " + + mediaNodeId + ": " + e.getMessage()); + } + } + + public void decrementActiveRecordings(String mediaNodeId) throws OpenViduException { + try { + if (KmsManager.selectAndRemoveKmsLock.tryLock(KmsManager.MAX_SECONDS_LOCK_WAIT, TimeUnit.SECONDS)) { + try { + final Kms kms = this.getKms(mediaNodeId); + if (kms != null) { + kms.decrementActiveRecordings(); + log.info("Decremented number of active recordings in Media Node {}. Current number: {}", + mediaNodeId, kms.getActiveRecordings()); + } else { + log.warn("Trying to decrement active recordings of Media Node {} but cannot be found", + mediaNodeId); + } + } finally { + KmsManager.selectAndRemoveKmsLock.unlock(); + } + } else { + throw new OpenViduException(Code.GENERIC_ERROR_CODE, + "selectAndRemoveKmsLock couldn't be acquired within " + KmsManager.MAX_SECONDS_LOCK_WAIT + + " seconds when decrementing active recordings of Media Node " + mediaNodeId); + } + } catch (InterruptedException e) { + throw new OpenViduException(Code.GENERIC_ERROR_CODE, + "InterruptedException waiting to acquire selectAndRemoveKmsLock when decrementing active recordings of Media Node " + + mediaNodeId + ": " + e.getMessage()); + } + } + public abstract List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception; + public abstract boolean isMediaNodeAvailableForSession(String mediaNodeId); + public abstract boolean isMediaNodeAvailableForRecording(String mediaNodeId); @PostConstruct diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedQuickStartRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedQuickStartRecordingService.java index 391e29bd..c9dc6e6f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedQuickStartRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedQuickStartRecordingService.java @@ -24,6 +24,7 @@ import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Session; +import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingUploader; @@ -35,9 +36,10 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService private static final Logger log = LoggerFactory.getLogger(ComposedRecordingService.class); public ComposedQuickStartRecordingService(RecordingManager recordingManager, - RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, CustomFileManager fileManager, - OpenviduConfig openviduConfig, CallDetailRecord cdr, DockerManager dockerManager) { - super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr, + RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, KmsManager kmsManager, + CustomFileManager fileManager, OpenviduConfig openviduConfig, CallDetailRecord cdr, + DockerManager dockerManager) { + super(recordingManager, recordingDownloader, recordingUploader, kmsManager, fileManager, openviduConfig, cdr, dockerManager); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index ad654b18..c5c931a0 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -52,6 +52,7 @@ import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoSession; +import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.recording.CompositeWrapper; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; @@ -72,9 +73,9 @@ public class ComposedRecordingService extends RecordingService { protected DockerManager dockerManager; public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig, - CallDetailRecord cdr, DockerManager dockerManager) { - super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr); + RecordingUploader recordingUploader, KmsManager kmsManager, CustomFileManager fileManager, + OpenviduConfig openviduConfig, CallDetailRecord cdr, DockerManager dockerManager) { + super(recordingManager, recordingDownloader, recordingUploader, kmsManager, fileManager, openviduConfig, cdr); this.dockerManager = dockerManager; } @@ -287,12 +288,14 @@ public class ComposedRecordingService extends RecordingService { } } cleanRecordingMaps(recordingAux); + + // Decrement active recordings + kmsManager.decrementActiveRecordings(recordingAux.getRecordingProperties().mediaNode()); + if (i == timeout) { log.error("Container did not launched in {} seconds", timeout / 2); return; } - // Decrement active recordings - ((KurentoSession) session).getKms().decrementActiveRecordings(); }).start(); } } else { @@ -363,7 +366,7 @@ public class ComposedRecordingService extends RecordingService { // Decrement active recordings once it is downloaded. This method will also drop // the Media Node if no more sessions or recordings and status is // waiting-idle-to-terminate - ((KurentoSession) session).getKms().decrementActiveRecordings(); + kmsManager.decrementActiveRecordings(finalRecordingArray[0].getRecordingProperties().mediaNode()); // Upload if necessary this.uploadRecording(finalRecordingArray[0], reason); @@ -592,7 +595,7 @@ public class ComposedRecordingService extends RecordingService { // Decrement active recordings once it is downloaded. This method will also drop // the Media Node if no more sessions or recordings and status is // waiting-idle-to-terminate - ((KurentoSession) session).getKms().decrementActiveRecordings(); + kmsManager.decrementActiveRecordings(recording.getRecordingProperties().mediaNode()); // Upload if necessary this.uploadRecording(recording, reason); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index 59161812..baebae33 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -67,7 +67,6 @@ import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; -import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.recording.Recording; @@ -168,11 +167,11 @@ public class RecordingManager { this.dockerManager.init(); this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, recordingUploader, - fileManager, openviduConfig, cdr, this.dockerManager); + kmsManager, fileManager, openviduConfig, cdr, this.dockerManager); this.composedQuickStartRecordingService = new ComposedQuickStartRecordingService(this, recordingDownloader, - recordingUploader, fileManager, openviduConfig, cdr, this.dockerManager); + recordingUploader, kmsManager, fileManager, openviduConfig, cdr, this.dockerManager); this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader, - recordingUploader, fileManager, openviduConfig, cdr); + recordingUploader, kmsManager, fileManager, openviduConfig, cdr); this.checkRecordingRequirements(this.openviduConfig.getOpenViduRecordingPath(), this.openviduConfig.getOpenviduRecordingCustomLayout()); @@ -264,13 +263,9 @@ public class RecordingManager { public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { try { - // 1. INCREMENT ACTIVE RECORDINGS OF MEDIA NODE HERE - ((KurentoSession) session).getKms().incrementActiveRecordings(); - // 2. CHECK THAT MEDIA NODE HAS RUNNING STATUS. IF NOT THEN FAIL RECORDING START - if (!kmsManager.isMediaNodeAvailableForRecording(properties.mediaNode())) { - throw new OpenViduException(Code.MEDIA_NODE_STATUS_WRONG, - "Media Node " + properties.mediaNode() + " status is not \"running\""); - } + // INCREMENT ACTIVE RECORDINGS OF MEDIA NODE HERE. IF MEDIA NODE IS NOT + // AVAILABLE FOR STARTING NEW RECORDINGS THIS METHOD THROWS AN EXCEPTION + kmsManager.incrementActiveRecordings(properties.mediaNode()); try { if (session.recordingLock.tryLock(15, TimeUnit.SECONDS)) { @@ -328,7 +323,7 @@ public class RecordingManager { } } catch (Exception e) { // DECREMENT ACTIVE RECORDINGS OF MEDIA NODE AND TRY REMOVE MEDIA NODE HERE - ((KurentoSession) session).getKms().decrementActiveRecordings(); + kmsManager.decrementActiveRecordings(properties.mediaNode()); throw e; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java index 13cd4c8c..d705e7a5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java @@ -31,6 +31,7 @@ import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Session; +import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingUploader; @@ -46,6 +47,7 @@ public abstract class RecordingService { protected RecordingManager recordingManager; protected RecordingDownloader recordingDownloader; protected RecordingUploader recordingUploader; + protected KmsManager kmsManager; protected CustomFileManager fileManager; protected CallDetailRecord cdr; @@ -58,11 +60,12 @@ public abstract class RecordingService { public final static String INDIVIDUAL_RECORDING_COMPRESSED_EXTENSION = ".zip"; public RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig, - CallDetailRecord cdr) { + RecordingUploader recordingUploader, KmsManager kmsManager, CustomFileManager fileManager, + OpenviduConfig openviduConfig, CallDetailRecord cdr) { this.recordingManager = recordingManager; this.recordingDownloader = recordingDownloader; this.recordingUploader = recordingUploader; + this.kmsManager = kmsManager; this.fileManager = fileManager; this.openviduConfig = openviduConfig; this.cdr = cdr; diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index dc9a0d05..42b3b33b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -61,8 +61,8 @@ import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.core.KurentoParticipant; -import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; +import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; @@ -79,9 +79,9 @@ public class SingleStreamRecordingService extends RecordingService { private Map>> storedRecorders = new ConcurrentHashMap<>(); public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig, - CallDetailRecord cdr) { - super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr); + RecordingUploader recordingUploader, KmsManager kmsManager, CustomFileManager fileManager, + OpenviduConfig openviduConfig, CallDetailRecord cdr) { + super(recordingManager, recordingDownloader, recordingUploader, kmsManager, fileManager, openviduConfig, cdr); } @Override @@ -187,7 +187,7 @@ public class SingleStreamRecordingService extends RecordingService { // Decrement active recordings once it is downloaded. This method will also drop // the Media Node if no more sessions or recordings and status is // waiting-idle-to-terminate - ((KurentoSession) session).getKms().decrementActiveRecordings(); + kmsManager.decrementActiveRecordings(session.getMediaNodeId()); // Upload if necessary this.uploadRecording(finalRecordingArray[0], reason);