diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index fe2dab24..f11db556 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.kurento.client.KurentoClient; @@ -62,6 +63,7 @@ public class Kms { private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); private Map kurentoSessions = new ConcurrentHashMap<>(); + private AtomicInteger activeRecordings = new AtomicInteger(0); public Kms(KmsProperties props, LoadManager loadManager) { this.id = props.getId(); @@ -143,6 +145,10 @@ public class Kms { this.kurentoSessions.remove(sessionId); } + public AtomicInteger getActiveRecordings() { + return this.activeRecordings; + } + public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("id", this.id); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java b/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java index d47518c9..f557c14a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java @@ -10,7 +10,6 @@ public class DummyRecordingDownloader implements RecordingDownloader { throws IOException { // Just immediately run callback function callback.run(); - return; } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index 59cbc8f0..a991e738 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -58,6 +58,7 @@ import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingInfoUtils; import io.openvidu.server.utils.DockerManager; +import io.openvidu.server.utils.QuarantineKiller; public class ComposedRecordingService extends RecordingService { @@ -70,8 +71,8 @@ public class ComposedRecordingService extends RecordingService { private DockerManager dockerManager; public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - OpenviduConfig openviduConfig, CallDetailRecord cdr) { - super(recordingManager, recordingDownloader, openviduConfig, cdr); + OpenviduConfig openviduConfig, CallDetailRecord cdr, QuarantineKiller quarantineKiller) { + super(recordingManager, recordingDownloader, openviduConfig, cdr, quarantineKiller); this.dockerManager = new DockerManager(); } @@ -95,6 +96,9 @@ public class ComposedRecordingService extends RecordingService { recording = this.startRecordingAudioOnly(session, recording, properties); } + // Increment active recordings + ((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet(); + return recording; } @@ -341,6 +345,10 @@ public class ComposedRecordingService extends RecordingService { this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); } } + + // Decrement active recordings + ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); + return recording; } @@ -380,7 +388,6 @@ public class ComposedRecordingService extends RecordingService { this.cleanRecordingMaps(recording); - // TODO: DOWNLOAD FILE IF SCALABILITY MODE final Recording[] finalRecordingArray = new Recording[1]; finalRecordingArray[0] = recording; try { @@ -398,6 +405,13 @@ public class ComposedRecordingService extends RecordingService { final long timestamp = System.currentTimeMillis(); cdr.recordRecordingStatusChanged(finalRecordingArray[0], reason, timestamp, finalRecordingArray[0].getStatus()); + + // Decrement active recordings once it is downloaded + ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); + + // Now we can drop Media Node if waiting-idle-to-terminate + this.quarantineKiller.dropMediaNode(session.getMediaNodeId()); + }); } catch (IOException e) { log.error("Error while downloading recording {}: {}", finalRecordingArray[0].getName(), e.getMessage()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index bccd1996..dabb5502 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -73,6 +73,7 @@ import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.utils.CustomFileManager; import io.openvidu.server.utils.DockerManager; import io.openvidu.server.utils.JsonUtils; +import io.openvidu.server.utils.QuarantineKiller; public class RecordingManager { @@ -97,6 +98,9 @@ public class RecordingManager { @Autowired private KmsManager kmsManager; + @Autowired + protected QuarantineKiller quarantineKiller; + @Autowired private CallDetailRecord cdr; @@ -150,9 +154,10 @@ public class RecordingManager { RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion(); this.dockerManager = new DockerManager(); - this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, openviduConfig, cdr); + this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, openviduConfig, cdr, + quarantineKiller); this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader, openviduConfig, - cdr); + cdr, quarantineKiller); log.info("Recording module required: Downloading openvidu/openvidu-recording:" + openviduConfig.getOpenViduRecordingVersion() + " Docker image (350MB aprox)"); @@ -603,7 +608,8 @@ public class RecordingManager { log.warn("No KMSs were defined in kms.uris array. Recording path check aborted"); } else { - MediaPipeline pipeline = this.kmsManager.getLessLoadedAndRunningKms().getKurentoClient().createMediaPipeline(); + MediaPipeline pipeline = this.kmsManager.getLessLoadedAndRunningKms().getKurentoClient() + .createMediaPipeline(); RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, "file://" + testFilePath).build(); final AtomicBoolean kurentoRecorderError = new AtomicBoolean(false); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java index 011f28ed..92f6f249 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java @@ -34,6 +34,7 @@ import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.CustomFileManager; +import io.openvidu.server.utils.QuarantineKiller; public abstract class RecordingService { @@ -43,14 +44,16 @@ public abstract class RecordingService { protected RecordingManager recordingManager; protected RecordingDownloader recordingDownloader; protected CallDetailRecord cdr; + protected QuarantineKiller quarantineKiller; protected CustomFileManager fileWriter = new CustomFileManager(); RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - OpenviduConfig openviduConfig, CallDetailRecord cdr) { + OpenviduConfig openviduConfig, CallDetailRecord cdr, QuarantineKiller quarantineKiller) { this.recordingManager = recordingManager; this.recordingDownloader = recordingDownloader; this.openviduConfig = openviduConfig; this.cdr = cdr; + this.quarantineKiller = quarantineKiller; } public abstract Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException; @@ -104,7 +107,7 @@ public abstract class RecordingService { io.openvidu.java.client.Recording.Status status = io.openvidu.java.client.Recording.Status.failed .equals(recording.getStatus()) ? io.openvidu.java.client.Recording.Status.failed : io.openvidu.java.client.Recording.Status.ready; - + // Status is now failed or ready. Url property must be defined recording.setUrl(recordingManager.getRecordingUrl(recording)); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 0a973dc9..162fa261 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -59,10 +59,12 @@ import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.core.KurentoParticipant; +import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; +import io.openvidu.server.utils.QuarantineKiller; public class SingleStreamRecordingService extends RecordingService { @@ -74,8 +76,8 @@ public class SingleStreamRecordingService extends RecordingService { private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - OpenviduConfig openviduConfig, CallDetailRecord cdr) { - super(recordingManager, recordingDownloader, openviduConfig, cdr); + OpenviduConfig openviduConfig, CallDetailRecord cdr, QuarantineKiller quarantineKiller) { + super(recordingManager, recordingDownloader, openviduConfig, cdr, quarantineKiller); } @Override @@ -129,6 +131,9 @@ public class SingleStreamRecordingService extends RecordingService { this.generateRecordingMetadataFile(recording); + // Increment active recordings + ((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet(); + return recording; } @@ -163,7 +168,6 @@ public class SingleStreamRecordingService extends RecordingService { this.cleanRecordingMaps(recording); - // TODO: DOWNLOAD FILES IF SCALABILITY MODE final Recording[] finalRecordingArray = new Recording[1]; finalRecordingArray[0] = recording; try { @@ -181,6 +185,13 @@ public class SingleStreamRecordingService extends RecordingService { finalRecordingArray[0].getStatus()); storedRecorders.remove(finalRecordingArray[0].getSessionId()); + + // Decrement active recordings once it is downloaded + ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); + + // Now we can drop Media Node if waiting-idle-to-terminate + this.quarantineKiller.dropMediaNode(session.getMediaNodeId()); + }); } catch (IOException e) { log.error("Error while downloading recording {}", finalRecordingArray[0].getName());