diff --git a/openvidu-client/src/main/java/io/openvidu/client/OpenViduException.java b/openvidu-client/src/main/java/io/openvidu/client/OpenViduException.java index 74c7c295..c39ece51 100644 --- a/openvidu-client/src/main/java/io/openvidu/client/OpenViduException.java +++ b/openvidu-client/src/main/java/io/openvidu/client/OpenViduException.java @@ -51,7 +51,9 @@ public class OpenViduException extends JsonRpcErrorException { RECORDING_DELETE_ERROR_CODE(706), RECORDING_LIST_ERROR_CODE(705), RECORDING_STOP_ERROR_CODE(704), RECORDING_START_ERROR_CODE(703), RECORDING_REPORT_ERROR_CODE(702), RECORDING_COMPLETION_ERROR_CODE(701), - FORCED_CODEC_NOT_FOUND_IN_SDPOFFER(800); + FORCED_CODEC_NOT_FOUND_IN_SDPOFFER(800), + + MEDIA_NODE_NOT_FOUND(900), MEDIA_NODE_STATUS_WRONG(901); private int value; diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index 2bd34593..a8be0981 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -75,6 +75,8 @@ import io.openvidu.server.rpc.RpcNotificationService; import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.GeoLocationByIpDummy; +import io.openvidu.server.utils.LocalCustomFileManager; +import io.openvidu.server.utils.LocalDockerManager; import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.MediaNodeStatusManagerDummy; import io.openvidu.server.utils.QuarantineKiller; @@ -170,7 +172,7 @@ public class OpenViduServer implements JsonRpcConfigurer { @ConditionalOnMissingBean @DependsOn("openviduConfig") public RecordingManager recordingManager() { - return new RecordingManager(); + return new RecordingManager(new LocalDockerManager(false), new LocalCustomFileManager()); } @Bean diff --git a/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java b/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java index 53b7a437..1e103b02 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java +++ b/openvidu-server/src/main/java/io/openvidu/server/config/OpenviduConfig.java @@ -37,7 +37,6 @@ import java.util.Map; import javax.annotation.PostConstruct; -import io.openvidu.java.client.VideoCodec; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; @@ -55,6 +54,7 @@ import com.google.gson.JsonArray; import com.google.gson.JsonSyntaxException; import io.openvidu.java.client.OpenViduRole; +import io.openvidu.java.client.VideoCodec; import io.openvidu.server.OpenViduServer; import io.openvidu.server.cdr.CDREventName; import io.openvidu.server.config.Dotenv.DotenvFormatException; @@ -239,6 +239,10 @@ public class OpenviduConfig { return openViduRecordingDebug; } + public boolean isRecordingComposedExternal() { + return false; + } + public String getOpenViduRecordingPath() { return this.openviduRecordingPath; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index f095946f..1b4ea327 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; -import io.openvidu.server.utils.SDPMunging; import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.GenericMediaElement; import org.kurento.client.IceCandidate; @@ -74,6 +73,7 @@ import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.utils.GeoLocation; import io.openvidu.server.utils.JsonUtils; +import io.openvidu.server.utils.SDPMunging; public class KurentoSessionManager extends SessionManager { @@ -370,8 +370,8 @@ public class KurentoSessionManager extends SessionManager { // Modify sdp if forced codec is defined if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { - kurentoOptions.sdpOffer = sdpMunging.forceCodec(participant, kurentoOptions.sdpOffer, kurentoOptions.isOffer, - kSession, true, false, isTranscodingAllowed, forcedVideoCodec); + kurentoOptions.sdpOffer = sdpMunging.forceCodec(participant, kurentoOptions.sdpOffer, + kurentoOptions.isOffer, kSession, true, false, isTranscodingAllowed, forcedVideoCodec); } log.debug( "Request [PUBLISH_MEDIA] isOffer={} sdp={} " @@ -435,7 +435,7 @@ public class KurentoSessionManager extends SessionManager { .customLayout(kSession.getSessionProperties().defaultCustomLayout()) .resolution(/* * kSession.getSessionProperties().defaultRecordingResolution() - */"1920x1080").build()); + */"1920x1080").mediaNode(kSession.getMediaNodeId()).build()); }).start(); } else if (recordingManager.sessionIsBeingRecorded(kSession.getSessionId())) { // Abort automatic recording stop thread for any recorded session in which a @@ -1079,8 +1079,8 @@ public class KurentoSessionManager extends SessionManager { // Modify sdp if forced codec is defined if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { - sdpOffer = sdpMunging.forceCodec(participant, sdpOffer, true, kSession, isPublisher, - true, isTranscodingAllowed, forcedVideoCodec); + sdpOffer = sdpMunging.forceCodec(participant, sdpOffer, true, kSession, isPublisher, true, + isTranscodingAllowed, forcedVideoCodec); } if (isPublisher) { @@ -1102,7 +1102,8 @@ public class KurentoSessionManager extends SessionManager { kParticipant.createPublishingEndpoint(kurentoOptions, streamId); SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; String sdpAnswer = kParticipant.publishToRoom(sdpType, sdpOffer, kurentoOptions.doLoopback, true); - log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(), sdpAnswer); + log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(), + sdpAnswer); sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer, new HashSet(), transactionId, null); @@ -1120,7 +1121,8 @@ public class KurentoSessionManager extends SessionManager { "Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'"); } - log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(), sdpAnswer); + log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}", + participant.getParticipantPublicId(), sdpAnswer); sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null); } else { throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index 192859a1..324b903d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -23,19 +23,17 @@ import java.util.List; import javax.annotation.PostConstruct; -import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.KurentoClient; import org.kurento.commons.exception.KurentoException; -import io.openvidu.server.core.IdentifierPrefixes; - public class FixedOneKmsManager extends KmsManager { @Override - public List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception { + public List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) + throws Exception { KmsProperties firstProps = kmsProperties.get(0); KurentoClient kClient = null; - Kms kms = new Kms(firstProps, loadManager); + Kms kms = new Kms(firstProps, loadManager, quarantineKiller); try { kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId())); this.addKms(kms); @@ -55,6 +53,11 @@ public class FixedOneKmsManager extends KmsManager { return Arrays.asList(kms); } + @Override + public boolean isMediaNodeRunning(String mediaNodeId) { + return true; + } + @Override @PostConstruct protected void postConstructInitKurentoClients() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index d63ec9d9..0957c3cc 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -36,6 +36,7 @@ import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.openvidu.server.kurento.core.KurentoSession; +import io.openvidu.server.utils.QuarantineKiller; /** * Abstraction of a KMS instance: an object of this class corresponds to a KMS @@ -57,6 +58,7 @@ public class Kms { private String ip; private KurentoClient client; private LoadManager loadManager; + private QuarantineKiller quarantineKiller; private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0); @@ -65,7 +67,7 @@ public class Kms { private Map kurentoSessions = new ConcurrentHashMap<>(); private AtomicInteger activeRecordings = new AtomicInteger(0); - public Kms(KmsProperties props, LoadManager loadManager) { + public Kms(KmsProperties props, LoadManager loadManager, QuarantineKiller quarantineKiller) { this.id = props.getId(); this.uri = props.getUri(); @@ -79,6 +81,7 @@ public class Kms { this.ip = url.getHost(); this.loadManager = loadManager; + this.quarantineKiller = quarantineKiller; } public void setKurentoClient(KurentoClient client) { @@ -145,8 +148,17 @@ public class Kms { this.kurentoSessions.remove(sessionId); } - public AtomicInteger getActiveRecordings() { - return this.activeRecordings; + public synchronized int getActiveRecordings() { + return this.activeRecordings.get(); + } + + public synchronized int incrementActiveRecordings() { + return this.activeRecordings.incrementAndGet(); + } + + public synchronized void decrementActiveRecordings() { + this.activeRecordings.updateAndGet(i -> i > 0 ? i - 1 : i); + this.quarantineKiller.dropMediaNode(this.id); } public JsonObject toJson() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index e93581df..d9460dde 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -46,6 +46,7 @@ import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.IdentifierPrefixes; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.utils.MediaNodeStatusManager; +import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.UpdatableTimerTask; public abstract class KmsManager { @@ -101,6 +102,9 @@ public abstract class KmsManager { @Autowired protected LoadManager loadManager; + @Autowired + protected QuarantineKiller quarantineKiller; + @Autowired protected MediaNodeStatusManager mediaNodeStatusManager; @@ -353,6 +357,8 @@ public abstract class KmsManager { public abstract List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception; + public abstract boolean isMediaNodeRunning(String mediaNodeId); + @PostConstruct protected abstract void postConstructInitKurentoClients(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/RecorderEndpointWrapper.java b/openvidu-server/src/main/java/io/openvidu/server/recording/RecorderEndpointWrapper.java index fe552fad..f6b7dc29 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/RecorderEndpointWrapper.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/RecorderEndpointWrapper.java @@ -23,7 +23,7 @@ import org.kurento.client.RecorderEndpoint; import com.google.gson.JsonObject; import io.openvidu.server.kurento.core.KurentoParticipant; -import io.openvidu.server.recording.service.SingleStreamRecordingService; +import io.openvidu.server.recording.service.RecordingService; public class RecorderEndpointWrapper { @@ -61,7 +61,7 @@ public class RecorderEndpointWrapper { public RecorderEndpointWrapper(JsonObject json) { String nameAux = json.get("name").getAsString(); // If the name includes the extension, remove it - this.name = StringUtils.removeEnd(nameAux, SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION); + this.name = StringUtils.removeEnd(nameAux, RecordingService.INDIVIDUAL_RECORDING_EXTENSION); this.connectionId = json.get("connectionId").getAsString(); this.streamId = json.get("streamId").getAsString(); this.clientData = (json.has("clientData") && !json.get("clientData").isJsonNull()) @@ -91,7 +91,7 @@ public class RecorderEndpointWrapper { } public String getNameWithExtension() { - return this.name + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION; + return this.name + RecordingService.INDIVIDUAL_RECORDING_EXTENSION; } public String getConnectionId() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/Recording.java b/openvidu-server/src/main/java/io/openvidu/server/recording/Recording.java index 326f468a..e35805ee 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/Recording.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/Recording.java @@ -61,7 +61,7 @@ public class Recording { try { this.duration = json.get("duration").getAsDouble(); } catch (Exception e) { - this.duration = new Long((long) json.get("duration").getAsLong()).doubleValue(); + this.duration = Long.valueOf((long) json.get("duration").getAsLong()).doubleValue(); } if (json.get("url").isJsonNull()) { this.url = null; diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedQuickStartRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedQuickStartRecordingService.java index 60e32e0a..758282f3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedQuickStartRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedQuickStartRecordingService.java @@ -1,7 +1,15 @@ package io.openvidu.server.recording.service; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,6 +18,7 @@ import com.github.dockerjava.api.model.Bind; import com.github.dockerjava.api.model.Volume; import io.openvidu.client.OpenViduException; +import io.openvidu.client.OpenViduException.Code; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.config.OpenviduConfig; @@ -18,16 +27,18 @@ import io.openvidu.server.core.Session; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingUploader; -import io.openvidu.server.utils.QuarantineKiller; +import io.openvidu.server.utils.CustomFileManager; +import io.openvidu.server.utils.DockerManager; public class ComposedQuickStartRecordingService extends ComposedRecordingService { private static final Logger log = LoggerFactory.getLogger(ComposedRecordingService.class); public ComposedQuickStartRecordingService(RecordingManager recordingManager, - RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, OpenviduConfig openviduConfig, - CallDetailRecord cdr, QuarantineKiller quarantineKiller) { - super(recordingManager, recordingDownloader, recordingUploader, openviduConfig, cdr, quarantineKiller); + RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, CustomFileManager fileManager, + OpenviduConfig openviduConfig, CallDetailRecord cdr, DockerManager dockerManager) { + super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr, + dockerManager); } public void stopRecordingContainer(Session session, EndReason reason) { @@ -38,7 +49,7 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService if (containerId != null) { try { - dockerManager.removeDockerContainer(containerId, true); + dockerManager.removeContainer(session.getMediaNodeId(), containerId, true); } catch (Exception e) { log.error("Can't remove COMPOSED_QUICK_START recording container from session {}", session.getSessionId()); @@ -78,7 +89,8 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService recordExecCommand += "export " + envs.get(i) + " "; } recordExecCommand += "&& ./composed_quick_start.sh --start-recording > /var/log/ffmpeg.log 2>&1 &"; - dockerManager.runCommandInContainer(containerId, recordExecCommand); + dockerManager.runCommandInContainerAsync(recording.getRecordingProperties().mediaNode(), containerId, + recordExecCommand); } catch (Exception e) { this.cleanRecordingMaps(recording); throw this.failStartRecording(session, recording, @@ -89,12 +101,16 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService try { this.waitForVideoFileNotEmpty(recording); - } catch (OpenViduException e) { + } catch (Exception e) { this.cleanRecordingMaps(recording); throw this.failStartRecording(session, recording, "Couldn't initialize recording container. Error: " + e.getMessage()); } + if (this.openviduConfig.isRecordingComposedExternal()) { + this.generateRecordingMetadataFile(recording); + } + return recording; } @@ -103,7 +119,7 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService log.info("Stopping COMPOSED_QUICK_START ({}) recording {} of session {}. Reason: {}", recording.hasAudio() ? "video + audio" : "audio-only", recording.getId(), recording.getSessionId(), RecordingManager.finalReason(reason)); - log.info("Container for session {} still being ready for new recordings", recording.getSessionId()); + log.info("Container for session {} still ready for new recordings", recording.getSessionId()); String containerId = this.sessionsContainers.get(recording.getSessionId()); @@ -116,29 +132,28 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService } try { - dockerManager.runCommandInContainerSync(containerId, "./composed_quick_start.sh --stop-recording", 10); - } catch (InterruptedException e1) { - cleanRecordingMaps(recording); - log.error("Error stopping recording for session id: {}", session.getSessionId()); - e1.printStackTrace(); + dockerManager.runCommandInContainerSync(recording.getRecordingProperties().mediaNode(), containerId, + "./composed_quick_start.sh --stop-recording", 10); + } catch (IOException e1) { + log.error("Error stopping COMPOSED_QUICK_START recording {}: {}", recording.getId(), e1.getMessage()); + failRecordingCompletion(recording, containerId, true, + new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE, e1.getMessage())); } - updateRecordingAttributes(recording); - - this.sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(), - getMetadataFilePath(recording)); - cleanRecordingMaps(recording); + if (this.openviduConfig.isRecordingComposedExternal()) { + try { + waitForComposedQuickStartFiles(recording); + } catch (Exception e) { + failRecordingCompletion(recording, containerId, false, + new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE, e.getMessage())); + } + } if (session != null && reason != null) { this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); } - final Recording[] finalRecordingArray = new Recording[1]; - finalRecordingArray[0] = recording; - this.uploadRecording(finalRecordingArray[0], reason); - - // Decrement active recordings - // ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); + downloadComposedRecording(recording, reason); return recording; } @@ -166,7 +181,7 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService .customLayout(recorderSession.getSessionProperties().defaultCustomLayout()) .resolution( /* recorderSession.getSessionProperties().defaultRecordingResolution() */"1920x1080") - .build()); + .mediaNode(recorderSession.getMediaNodeId()).build()); log.info("COMPOSED_QUICK_START recording container launched for session: {}", recorderSession.getSessionId()); launched = true; @@ -213,13 +228,13 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService Bind bind1 = new Bind(openviduConfig.getOpenViduRecordingPath(), volume1); List binds = new ArrayList<>(); binds.add(bind1); - containerId = dockerManager.runContainer(container, containerName, null, volumes, binds, "host", envs, null, - properties.shmSize(), false, null); + containerId = dockerManager.runContainer(properties.mediaNode(), container, containerName, null, volumes, + binds, "host", envs, null, properties.shmSize(), false, null); containers.put(containerId, containerName); this.sessionsContainers.put(session.getSessionId(), containerId); } catch (Exception e) { if (containerId != null) { - dockerManager.removeDockerContainer(containerId, true); + dockerManager.removeContainer(properties.mediaNode(), containerId, true); containers.remove(containerId); sessionsContainers.remove(session.getSessionId()); } @@ -229,4 +244,65 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService return containerId; } + private void waitForComposedQuickStartFiles(Recording recording) throws Exception { + + final int SECONDS_MAX_WAIT = 30; + final String PATH = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; + + // Waiting for the files generated at the end of the stopping process: the + // ffprobe info and the thumbnail + final String INFO_FILE = PATH + recording.getId() + RecordingService.COMPOSED_INFO_FILE_EXTENSION; + final String THUMBNAIL_FILE = PATH + recording.getId() + RecordingService.COMPOSED_THUMBNAIL_EXTENSION; + + Set filesToWaitFor = Stream.of(INFO_FILE, THUMBNAIL_FILE).collect(Collectors.toSet()); + + Collection waitForFileThreads = new HashSet<>(); + CountDownLatch latch = new CountDownLatch(filesToWaitFor.size()); + + for (final String file : filesToWaitFor) { + Thread downloadThread = new Thread() { + @Override + public void run() { + try { + fileManager.waitForFileToExistAndNotEmpty(recording.getRecordingProperties().mediaNode(), file, + SECONDS_MAX_WAIT); + } catch (Exception e) { + log.error(e.getMessage()); + recording.setStatus(io.openvidu.java.client.Recording.Status.failed); + } finally { + latch.countDown(); + } + } + }; + waitForFileThreads.add(downloadThread); + } + waitForFileThreads.forEach(t -> t.start()); + + try { + if (!latch.await(SECONDS_MAX_WAIT, TimeUnit.SECONDS)) { + recording.setStatus(io.openvidu.java.client.Recording.Status.failed); + String msg = "The wait for files of COMPOSED_QUICK_START recording " + recording.getId() + + " didn't complete in " + SECONDS_MAX_WAIT + " seconds"; + log.error(msg); + throw new Exception(msg); + } else { + if (io.openvidu.java.client.Recording.Status.failed.equals(recording.getStatus())) { + String msg = "Error waiting for some COMPOSED_QUICK_START recording file in recording " + + recording.getId(); + log.error(msg); + throw new Exception(msg); + } else { + log.info("All files of COMPOSED_QUICK_START recording {} are available to download", + recording.getId()); + } + } + } catch (InterruptedException e) { + recording.setStatus(io.openvidu.java.client.Recording.Status.failed); + String msg = "Error waiting for COMPOSED_QUICK_START recording files of recording " + recording.getId() + + ": " + e.getMessage(); + log.error(msg); + throw new Exception(msg); + } + } + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index 09350bf9..bbf1d4de 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -58,8 +58,8 @@ import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingInfoUtils; import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.rest.RequestMappings; -import io.openvidu.server.utils.LocalDockerManager; -import io.openvidu.server.utils.QuarantineKiller; +import io.openvidu.server.utils.CustomFileManager; +import io.openvidu.server.utils.DockerManager; public class ComposedRecordingService extends RecordingService { @@ -69,13 +69,13 @@ public class ComposedRecordingService extends RecordingService { protected Map sessionsContainers = new ConcurrentHashMap<>(); private Map composites = new ConcurrentHashMap<>(); - protected LocalDockerManager dockerManager; + protected DockerManager dockerManager; public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr, - QuarantineKiller quarantineKiller) { - super(recordingManager, recordingDownloader, recordingUploader, openviduConfig, cdr, quarantineKiller); - this.dockerManager = new LocalDockerManager(); + RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig, + CallDetailRecord cdr, DockerManager dockerManager) { + super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr); + this.dockerManager = dockerManager; } @Override @@ -98,9 +98,6 @@ public class ComposedRecordingService extends RecordingService { recording = this.startRecordingAudioOnly(session, recording, properties); } - // Increment active recordings - // ((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet(); - return recording; } @@ -177,8 +174,8 @@ public class ComposedRecordingService extends RecordingService { Bind bind1 = new Bind(openviduConfig.getOpenViduRecordingPath(), volume1); List binds = new ArrayList<>(); binds.add(bind1); - containerId = dockerManager.runContainer(container, containerName, null, volumes, binds, "host", envs, null, - properties.shmSize(), false, null); + containerId = dockerManager.runContainer(properties.mediaNode(), container, containerName, null, volumes, + binds, "host", envs, null, properties.shmSize(), false, null); containers.put(containerId, containerName); } catch (Exception e) { this.cleanRecordingMaps(recording); @@ -190,12 +187,16 @@ public class ComposedRecordingService extends RecordingService { try { this.waitForVideoFileNotEmpty(recording); - } catch (OpenViduException e) { + } catch (Exception e) { this.cleanRecordingMaps(recording); throw this.failStartRecording(session, recording, "Couldn't initialize recording container. Error: " + e.getMessage()); } + if (this.openviduConfig.isRecordingComposedExternal()) { + this.generateRecordingMetadataFile(recording); + } + return recording; } @@ -224,9 +225,6 @@ public class ComposedRecordingService extends RecordingService { this.generateRecordingMetadataFile(recording); - // Increment active recordings - ((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet(); - return recording; } @@ -273,7 +271,8 @@ public class ComposedRecordingService extends RecordingService { } else { log.warn("Removing container {} for closed session {}...", containerIdAux, session.getSessionId()); - dockerManager.removeDockerContainer(containerIdAux, true); + dockerManager.removeContainer(recordingAux.getRecordingProperties().mediaNode(), + containerIdAux, true); containers.remove(containerId); containerClosed = true; log.warn("Container {} for closed session {} succesfully stopped and removed", @@ -293,29 +292,18 @@ public class ComposedRecordingService extends RecordingService { return; } // Decrement active recordings - // ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); + ((KurentoSession) session).getKms().decrementActiveRecordings(); }).start(); } } else { stopAndRemoveRecordingContainer(recording, containerId, 30); - updateRecordingAttributes(recording); - - this.sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(), - getMetadataFilePath(recording)); - cleanRecordingMaps(recording); if (session != null && reason != null) { this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); } - // Upload if necessary - final Recording[] finalRecordingArray = new Recording[1]; - finalRecordingArray[0] = recording; - this.uploadRecording(finalRecordingArray[0], reason); - - // Decrement active recordings - // ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); + downloadComposedRecording(session, recording, reason); } return recording; @@ -370,13 +358,12 @@ public class ComposedRecordingService extends RecordingService { this.updateFilePermissions(filesPath); finalRecordingArray[0] = this.sealRecordingMetadataFileAsReady(finalRecordingArray[0], finalSize, finalDuration, - filesPath + RecordingManager.RECORDING_ENTITY_FILE + finalRecordingArray[0].getId()); + filesPath + RecordingService.RECORDING_ENTITY_FILE + finalRecordingArray[0].getId()); - // Decrement active recordings once it is downloaded - ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); - - // Now we can drop Media Node if waiting-idle-to-terminate - this.quarantineKiller.dropMediaNode(session.getMediaNodeId()); + // Decrement active recordings once it is downloaded. This method will also drop + // the Media Node if no more sessions or recordings and status is + // waiting-idle-to-terminate + ((KurentoSession) session).getKms().decrementActiveRecordings(); // Upload if necessary this.uploadRecording(finalRecordingArray[0], reason); @@ -397,29 +384,32 @@ public class ComposedRecordingService extends RecordingService { private void stopAndRemoveRecordingContainer(Recording recording, String containerId, int secondsOfWait) { // Gracefully stop ffmpeg process try { - dockerManager.runCommandInContainer(containerId, "echo 'q' > stop"); - } catch (InterruptedException e1) { + dockerManager.runCommandInContainerAsync(recording.getRecordingProperties().mediaNode(), containerId, + "echo 'q' > stop"); + } catch (IOException e1) { e1.printStackTrace(); } // Wait for the container to be gracefully self-stopped final int timeOfWait = 30; try { - dockerManager.waitForContainerStopped(containerId, timeOfWait); + dockerManager.waitForContainerStopped(recording.getRecordingProperties().mediaNode(), containerId, + timeOfWait); } catch (Exception e) { - failRecordingCompletion(recording, containerId, new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE, - "The recording completion process couldn't finish in " + timeOfWait + " seconds")); + failRecordingCompletion(recording, containerId, true, + new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE, + "The recording completion process couldn't finish in " + timeOfWait + " seconds")); } // Remove container - dockerManager.removeDockerContainer(containerId, false); + dockerManager.removeContainer(recording.getRecordingProperties().mediaNode(), containerId, false); containers.remove(containerId); } protected void updateRecordingAttributes(Recording recording) { try { RecordingInfoUtils infoUtils = new RecordingInfoUtils(this.openviduConfig.getOpenViduRecordingPath() - + recording.getId() + "/" + recording.getId() + ".info"); + + recording.getId() + "/" + recording.getId() + RecordingService.COMPOSED_INFO_FILE_EXTENSION); if (!infoUtils.hasVideo()) { log.error("COMPOSED recording {} with hasVideo=true has not video track", recording.getId()); @@ -440,44 +430,24 @@ public class ComposedRecordingService extends RecordingService { } } - protected void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException { - + protected void waitForVideoFileNotEmpty(Recording recording) throws Exception { final String VIDEO_FILE = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" - + recording.getName() + ".mp4"; - - int SECONDS_MAX_WAIT = 15; - int MILLISECONDS_INTERVAL_WAIT = 100; - int LIMIT = SECONDS_MAX_WAIT * 1000 / MILLISECONDS_INTERVAL_WAIT; - - int i = 0; - boolean arePresent = fileExistsAndHasBytes(VIDEO_FILE); - while (!arePresent && i < LIMIT) { - try { - Thread.sleep(MILLISECONDS_INTERVAL_WAIT); - arePresent = fileExistsAndHasBytes(VIDEO_FILE); - i++; - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - if (!arePresent) { - log.error("Recorder container failed generating video file (is empty) for session {}", - recording.getSessionId()); - throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, - "Recorder container failed generating video file (is empty)"); - } + + recording.getName() + RecordingService.COMPOSED_RECORDING_EXTENSION; + int SECONDS_MAX_WAIT = 20; + this.fileManager.waitForFileToExistAndNotEmpty(recording.getRecordingProperties().mediaNode(), VIDEO_FILE, + SECONDS_MAX_WAIT); } - private boolean fileExistsAndHasBytes(String fileName) { - File f = new File(fileName); - return (f.exists() && f.isFile() && f.length() > 0); - } - - private void failRecordingCompletion(Recording recording, String containerId, OpenViduException e) - throws OpenViduException { + protected void failRecordingCompletion(Recording recording, String containerId, boolean removeContainer, + OpenViduException e) throws OpenViduException { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); - dockerManager.removeDockerContainer(containerId, true); - containers.remove(containerId); + if (removeContainer) { + dockerManager.removeContainer(recording.getRecordingProperties().mediaNode(), containerId, true); + containers.remove(containerId); + } + sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(), + getMetadataFilePath(recording)); + cleanRecordingMaps(recording); throw e; } @@ -611,4 +581,27 @@ public class ComposedRecordingService extends RecordingService { return finalUrl; } + protected void downloadComposedRecording(final Session session, final Recording recording, final EndReason reason) { + try { + this.recordingDownloader.downloadRecording(recording, null, () -> { + + updateRecordingAttributes(recording); + + this.sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(), + getMetadataFilePath(recording)); + cleanRecordingMaps(recording); + + // Decrement active recordings once it is downloaded. This method will also drop + // the Media Node if no more sessions or recordings and status is + // waiting-idle-to-terminate + ((KurentoSession) session).getKms().decrementActiveRecordings(); + + // Upload if necessary + this.uploadRecording(recording, reason); + }); + } catch (IOException e) { + log.error("Error while downloading recording {}: {}", recording.getName(), e.getMessage()); + } + } + } \ No newline at end of file diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index 07374487..2c5092ff 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -67,15 +67,17 @@ import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; +import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.utils.CustomFileManager; -import io.openvidu.server.utils.LocalDockerManager; +import io.openvidu.server.utils.DockerManager; import io.openvidu.server.utils.JsonUtils; -import io.openvidu.server.utils.QuarantineKiller; +import io.openvidu.server.utils.LocalCustomFileManager; +import io.openvidu.server.utils.LocalDockerManager; import io.openvidu.server.utils.RecordingUtils; public class RecordingManager { @@ -85,7 +87,8 @@ public class RecordingManager { private ComposedRecordingService composedRecordingService; private ComposedQuickStartRecordingService composedQuickStartRecordingService; private SingleStreamRecordingService singleStreamRecordingService; - private LocalDockerManager dockerManager; + private DockerManager dockerManager; + private CustomFileManager fileManager; @Autowired protected SessionEventsHandler sessionHandler; @@ -108,9 +111,6 @@ public class RecordingManager { @Autowired private KmsManager kmsManager; - @Autowired - protected QuarantineKiller quarantineKiller; - @Autowired private CallDetailRecord cdr; @@ -125,14 +125,18 @@ public class RecordingManager { private ScheduledThreadPoolExecutor automaticRecordingStopExecutor = new ScheduledThreadPoolExecutor( Runtime.getRuntime().availableProcessors()); - public static final String RECORDING_ENTITY_FILE = ".recording."; public static final String IMAGE_NAME = "openvidu/openvidu-recording"; - static String IMAGE_TAG; + public static String IMAGE_TAG; private static final List LAST_PARTICIPANT_LEFT_REASONS = Arrays .asList(new EndReason[] { EndReason.disconnect, EndReason.forceDisconnectByUser, EndReason.forceDisconnectByServer, EndReason.networkDisconnect }); + public RecordingManager(DockerManager dockerManager, CustomFileManager fileManager) { + this.dockerManager = dockerManager; + this.fileManager = fileManager; + } + @PostConstruct public void init() { if (this.openviduConfig.isRecordingModuleEnabled()) { @@ -164,60 +168,33 @@ public class RecordingManager { RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion(); - this.dockerManager = new LocalDockerManager(); - this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, recordingUploader, - openviduConfig, cdr, quarantineKiller); - this.composedQuickStartRecordingService = new ComposedQuickStartRecordingService(this, recordingDownloader, - recordingUploader, openviduConfig, cdr, quarantineKiller); - this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader, - recordingUploader, openviduConfig, cdr, quarantineKiller); + this.dockerManager.init(); - log.info("Recording module required: Downloading openvidu/openvidu-recording:" - + openviduConfig.getOpenViduRecordingVersion() + " Docker image (350MB aprox)"); + this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, recordingUploader, + fileManager, openviduConfig, cdr, this.dockerManager); + this.composedQuickStartRecordingService = new ComposedQuickStartRecordingService(this, recordingDownloader, + recordingUploader, fileManager, openviduConfig, cdr, this.dockerManager); + this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader, + recordingUploader, fileManager, openviduConfig, cdr); this.checkRecordingRequirements(this.openviduConfig.getOpenViduRecordingPath(), this.openviduConfig.getOpenviduRecordingCustomLayout()); - if (dockerManager.dockerImageExistsLocally(IMAGE_NAME + ":" + IMAGE_TAG)) { - log.info("Docker image already exists locally"); - } else { - Thread t = new Thread(() -> { - boolean keep = true; - log.info("Downloading "); - while (keep) { - System.out.print("."); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - keep = false; - log.info("\nDownload complete"); - } - } - }); - t.start(); - try { - dockerManager.downloadDockerImage(IMAGE_NAME + ":" + IMAGE_TAG, 600); - } catch (Exception e) { - log.error("Error downloading docker image {}:{}", IMAGE_NAME, IMAGE_TAG); - } - t.interrupt(); - try { - t.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - log.info("Docker image available"); + LocalDockerManager dockMng = new LocalDockerManager(true); + + if (!openviduConfig.isRecordingComposedExternal()) { + downloadRecordingImageToLocal(dockMng); } // Clean any stranded openvidu/openvidu-recording container on startup - dockerManager.cleanStrandedContainers(RecordingManager.IMAGE_NAME); + dockMng.cleanStrandedContainers(RecordingManager.IMAGE_NAME); } public void checkRecordingRequirements(String openviduRecordingPath, String openviduRecordingCustomLayout) throws OpenViduException { LocalDockerManager dockerManager = null; try { - dockerManager = new LocalDockerManager(); + dockerManager = new LocalDockerManager(true); dockerManager.checkDockerEnabled(); } catch (OpenViduException e) { String message = e.getMessage(); @@ -242,6 +219,42 @@ public class RecordingManager { this.checkRecordingPaths(openviduRecordingPath, openviduRecordingCustomLayout); } + private void downloadRecordingImageToLocal(LocalDockerManager dockMng) { + log.info("Recording module required: Downloading openvidu/openvidu-recording:" + + openviduConfig.getOpenViduRecordingVersion() + " Docker image (350MB aprox)"); + + if (dockMng.dockerImageExistsLocally(IMAGE_NAME + ":" + IMAGE_TAG)) { + log.info("Docker image already exists locally"); + } else { + Thread t = new Thread(() -> { + boolean keep = true; + log.info("Downloading "); + while (keep) { + System.out.print("."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + keep = false; + log.info("\nDownload complete"); + } + } + }); + t.start(); + try { + dockMng.downloadDockerImage(IMAGE_NAME + ":" + IMAGE_TAG, 600); + } catch (Exception e) { + log.error("Error downloading docker image {}:{}", IMAGE_NAME, IMAGE_TAG); + } + t.interrupt(); + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + log.info("Docker image available"); + } + } + public void startComposedQuickStartContainer(Session session) { this.composedQuickStartRecordingService.runComposedQuickStartContainer(session); } @@ -252,14 +265,23 @@ public class RecordingManager { public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { try { - if (session.recordingLock.tryLock(15, TimeUnit.SECONDS)) { - try { - if (sessionIsBeingRecorded(session.getSessionId())) { - throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, - "Concurrent start of recording for session " + session.getSessionId()); - } else { - Recording recording = null; - try { + + // 1. INCREMENT ACTIVE RECORDINGS OF MEDIA NODE HERE + ((KurentoSession) session).getKms().incrementActiveRecordings(); + // 2. CHECK THAT MEDIA NODE HAS RUNNING STATUS. IF NOT THEN FAIL RECORDING START + if (!kmsManager.isMediaNodeRunning(properties.mediaNode())) { + throw new OpenViduException(Code.MEDIA_NODE_STATUS_WRONG, + "Media Node " + properties.mediaNode() + " status is not \"running\""); + } + + try { + if (session.recordingLock.tryLock(15, TimeUnit.SECONDS)) { + try { + if (sessionIsBeingRecorded(session.getSessionId())) { + throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, + "Concurrent start of recording for session " + session.getSessionId()); + } else { + Recording recording = null; switch (properties.outputMode()) { case COMPOSED: recording = this.composedRecordingService.startRecording(session, properties); @@ -271,41 +293,45 @@ public class RecordingManager { recording = this.singleStreamRecordingService.startRecording(session, properties); break; } - } catch (Exception e) { - throw e; - } - this.recordingFromStartingToStarted(recording); + this.recordingFromStartingToStarted(recording); - this.cdr.recordRecordingStarted(recording); - this.cdr.recordRecordingStatusChanged(recording, null, recording.getCreatedAt(), - Status.started); + this.cdr.recordRecordingStarted(recording); + this.cdr.recordRecordingStatusChanged(recording, null, recording.getCreatedAt(), + Status.started); - if (!(OutputMode.COMPOSED.equals(properties.outputMode()) && properties.hasVideo())) { - // Directly send recording started notification for all cases except for - // COMPOSED recordings with video (will be sent on first RECORDER subscriber) - // Both INDIVIDUAL and COMPOSED_QUICK_START should notify immediately - this.sessionHandler.sendRecordingStartedNotification(session, recording); + if (!(OutputMode.COMPOSED.equals(properties.outputMode()) && properties.hasVideo())) { + // Directly send recording started notification for all cases except for + // COMPOSED recordings with video (will be sent on first RECORDER subscriber) + // Both INDIVIDUAL and COMPOSED_QUICK_START should notify immediately + this.sessionHandler.sendRecordingStartedNotification(session, recording); + } + if (session.getActivePublishers() == 0) { + // Init automatic recording stop if no publishers when starting the recording + log.info( + "No publisher in session {}. Starting {} seconds countdown for stopping recording", + session.getSessionId(), + this.openviduConfig.getOpenviduRecordingAutostopTimeout()); + this.initAutomaticRecordingStopThread(session); + } + return recording; } - if (session.getActivePublishers() == 0) { - // Init automatic recording stop if no publishers when starting the recording - log.info("No publisher in session {}. Starting {} seconds countdown for stopping recording", - session.getSessionId(), this.openviduConfig.getOpenviduRecordingAutostopTimeout()); - this.initAutomaticRecordingStopThread(session); - } - return recording; + } finally { + session.recordingLock.unlock(); } - } finally { - session.recordingLock.unlock(); + } else { + throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, + "Timeout waiting for recording Session lock to be available for session " + + session.getSessionId()); } - } else { + } catch (InterruptedException e) { throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, - "Timeout waiting for recording Session lock to be available for session " + "InterruptedException waiting for recording Session lock to be available for session " + session.getSessionId()); } - } catch (InterruptedException e) { - throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, - "InterruptedException waiting for recording Session lock to be available for session " - + session.getSessionId()); + } catch (Exception e) { + // DECREMENT ACTIVE RECORDINGS OF MEDIA NODE AND TRY REMOVE MEDIA NODE HERE + ((KurentoSession) session).getKms().decrementActiveRecordings(); + throw e; } } @@ -517,9 +543,9 @@ public class RecordingManager { File[] innerFiles = files[i].listFiles(); for (int j = 0; j < innerFiles.length; j++) { if (innerFiles[j].isFile() - && innerFiles[j].getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { + && innerFiles[j].getName().startsWith(RecordingService.RECORDING_ENTITY_FILE)) { fileNamesNoExtension - .add(innerFiles[j].getName().replaceFirst(RecordingManager.RECORDING_ENTITY_FILE, "")); + .add(innerFiles[j].getName().replaceFirst(RecordingService.RECORDING_ENTITY_FILE, "")); break; } } @@ -548,7 +574,7 @@ public class RecordingManager { public File getRecordingEntityFileFromLocalStorage(String recordingId) { String metadataFilePath = openviduConfig.getOpenViduRecordingPath() + recordingId + "/" - + RecordingManager.RECORDING_ENTITY_FILE + recordingId; + + RecordingService.RECORDING_ENTITY_FILE + recordingId; return new File(metadataFilePath); } @@ -571,7 +597,7 @@ public class RecordingManager { } public Recording getRecordingFromEntityFile(File file) { - if (file.isFile() && file.getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { + if (file.isFile() && file.getName().startsWith(RecordingService.RECORDING_ENTITY_FILE)) { JsonObject json; try { json = jsonUtils.fromFileToJsonObject(file.getAbsolutePath()); @@ -726,7 +752,8 @@ public class RecordingManager { } final String testFolderPath = openviduRecordingPath + "/TEST_RECORDING_PATH_" + System.currentTimeMillis(); - final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH" + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION; + final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH" + + RecordingService.INDIVIDUAL_RECORDING_EXTENSION; // Check Kurento Media Server write permissions in recording path if (this.kmsManager.getKmss().isEmpty()) { @@ -780,7 +807,7 @@ public class RecordingManager { log.info("Kurento Media Server has write permissions on recording path: {}", openviduRecordingPath); try { - new CustomFileManager().deleteFolder(testFolderPath); + new LocalCustomFileManager().deleteFolder(testFolderPath); log.info("OpenVidu Server has write permissions over files created by Kurento Media Server"); } catch (IOException e) { String errorMessage = "The recording path \"" + openviduRecordingPath diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManagerUtilsLocalStorage.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManagerUtilsLocalStorage.java index 4c0fe3a0..66130a87 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManagerUtilsLocalStorage.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManagerUtilsLocalStorage.java @@ -50,9 +50,9 @@ public class RecordingManagerUtilsLocalStorage extends RecordingManagerUtils { File[] innerFiles = files[i].listFiles(); for (int j = 0; j < innerFiles.length; j++) { if (innerFiles[j].isFile() - && innerFiles[j].getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { + && innerFiles[j].getName().startsWith(RecordingService.RECORDING_ENTITY_FILE)) { fileNamesNoExtension - .add(innerFiles[j].getName().replaceFirst(RecordingManager.RECORDING_ENTITY_FILE, "")); + .add(innerFiles[j].getName().replaceFirst(RecordingService.RECORDING_ENTITY_FILE, "")); break; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java index f47aba83..13cd4c8c 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java @@ -36,7 +36,6 @@ import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.CustomFileManager; -import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.RecordingUtils; public abstract class RecordingService { @@ -47,19 +46,26 @@ public abstract class RecordingService { protected RecordingManager recordingManager; protected RecordingDownloader recordingDownloader; protected RecordingUploader recordingUploader; + protected CustomFileManager fileManager; protected CallDetailRecord cdr; - protected QuarantineKiller quarantineKiller; - protected CustomFileManager fileWriter = new CustomFileManager(); - RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr, - QuarantineKiller quarantineKiller) { + public final static String RECORDING_ENTITY_FILE = ".recording."; + public final static String COMPOSED_RECORDING_EXTENSION = ".mp4"; + public final static String COMPOSED_THUMBNAIL_EXTENSION = ".jpg"; + public final static String COMPOSED_INFO_FILE_EXTENSION = ".info"; + public final static String INDIVIDUAL_RECORDING_EXTENSION = ".webm"; + public final static String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; + public final static String INDIVIDUAL_RECORDING_COMPRESSED_EXTENSION = ".zip"; + + public RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, + RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig, + CallDetailRecord cdr) { this.recordingManager = recordingManager; this.recordingDownloader = recordingDownloader; this.recordingUploader = recordingUploader; + this.fileManager = fileManager; this.openviduConfig = openviduConfig; this.cdr = cdr; - this.quarantineKiller = quarantineKiller; } public abstract Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException; @@ -72,20 +78,16 @@ public abstract class RecordingService { */ protected void generateRecordingMetadataFile(Recording recording) { String folder = this.openviduConfig.getOpenViduRecordingPath() + recording.getId(); - boolean newFolderCreated = this.fileWriter.createFolderIfNotExists(folder); + boolean newFolderCreated = this.fileManager.createFolderIfNotExists(folder); if (newFolderCreated) { - log.warn( - "New folder {} created. This means A) Cluster mode is enabled B) The recording started for a session with no publishers or C) No media type compatible publishers", - folder); - } else { - log.info("Folder {} already existed. Some publisher is already being recorded", folder); + log.info("New folder {} created for recording {}", folder, recording.getId()); } String filePath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" - + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + + RecordingService.RECORDING_ENTITY_FILE + recording.getId(); String text = recording.toJson().toString(); - this.fileWriter.createAndWriteFile(filePath, text); + this.fileManager.createAndWriteFile(filePath, text); log.info("Generated recording metadata file at {}", filePath); } @@ -97,7 +99,7 @@ public abstract class RecordingService { */ protected Recording sealRecordingMetadataFileAsStopped(Recording recording) { final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" - + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + + RecordingService.RECORDING_ENTITY_FILE + recording.getId(); return this.sealRecordingMetadataFile(recording, 0, 0, io.openvidu.java.client.Recording.Status.stopped, entityFile); } @@ -123,7 +125,7 @@ public abstract class RecordingService { recording.setUrl(recordingManager.getRecordingUrl(recording)); final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" - + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + + RecordingService.RECORDING_ENTITY_FILE + recording.getId(); return this.sealRecordingMetadataFile(recording, size, duration, status, entityFile); } @@ -133,7 +135,7 @@ public abstract class RecordingService { recording.setSize(size); // Size in bytes recording.setDuration(duration > 0 ? duration : 0); // Duration in seconds - if (this.fileWriter.overwriteFile(metadataFilePath, recording.toJson().toString())) { + if (this.fileManager.overwriteFile(metadataFilePath, recording.toJson().toString())) { log.info("Sealed recording metadata file at {} with status [{}]", metadataFilePath, status.name()); } @@ -151,8 +153,8 @@ public abstract class RecordingService { if (properties.name() == null || properties.name().isEmpty()) { // No name provided for the recording file. Use recordingId RecordingProperties.Builder builder = new RecordingProperties.Builder().name(recordingId) - .outputMode(properties.outputMode()).hasAudio(properties.hasAudio()) - .hasVideo(properties.hasVideo()); + .outputMode(properties.outputMode()).hasAudio(properties.hasAudio()).hasVideo(properties.hasVideo()) + .mediaNode(properties.mediaNode()); if (RecordingUtils.IS_COMPOSED(properties.outputMode()) && properties.hasVideo()) { builder.resolution(properties.resolution()); builder.recordingLayout(properties.recordingLayout()); @@ -202,7 +204,7 @@ public abstract class RecordingService { protected String getMetadataFilePath(Recording recording) { final String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; - return folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + return folderPath + RecordingService.RECORDING_ENTITY_FILE + recording.getId(); } protected void uploadRecording(final Recording recording, EndReason reason) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index b395dc70..dc9a0d05 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -47,7 +47,6 @@ import org.kurento.client.StoppedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonObject; @@ -68,7 +67,7 @@ import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingUploader; -import io.openvidu.server.utils.QuarantineKiller; +import io.openvidu.server.utils.CustomFileManager; public class SingleStreamRecordingService extends RecordingService { @@ -79,13 +78,10 @@ public class SingleStreamRecordingService extends RecordingService { // Multiple recorder endpoints per stream during a recording private Map>> storedRecorders = new ConcurrentHashMap<>(); - private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; - public static final String INDIVIDUAL_RECORDING_EXTENSION = ".webm"; - public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, - RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr, - QuarantineKiller quarantineKiller) { - super(recordingManager, recordingDownloader, recordingUploader, openviduConfig, cdr, quarantineKiller); + RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig, + CallDetailRecord cdr) { + super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr); } @Override @@ -139,9 +135,6 @@ public class SingleStreamRecordingService extends RecordingService { this.generateRecordingMetadataFile(recording); - // Increment active recordings - ((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet(); - return recording; } @@ -191,11 +184,10 @@ public class SingleStreamRecordingService extends RecordingService { cleanRecordingWrappers(finalRecordingArray[0]); - // Decrement active recordings once it is downloaded - ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); - - // Now we can drop Media Node if waiting-idle-to-terminate - this.quarantineKiller.dropMediaNode(session.getMediaNodeId()); + // Decrement active recordings once it is downloaded. This method will also drop + // the Media Node if no more sessions or recordings and status is + // waiting-idle-to-terminate + ((KurentoSession) session).getKms().decrementActiveRecordings(); // Upload if necessary this.uploadRecording(finalRecordingArray[0], reason); @@ -241,8 +233,8 @@ public class SingleStreamRecordingService extends RecordingService { RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, "file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" + fileName - + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION) - .withMediaProfile(profile).build(); + + RecordingService.INDIVIDUAL_RECORDING_EXTENSION).withMediaProfile(profile) + .build(); recorder.addRecordingListener(new EventListener() { @Override @@ -417,11 +409,11 @@ public class SingleStreamRecordingService extends RecordingService { } private void generateIndividualMetadataFile(RecorderEndpointWrapper wrapper) { - this.commonWriteIndividualMetadataFile(wrapper, this.fileWriter::createAndWriteFile); + this.commonWriteIndividualMetadataFile(wrapper, this.fileManager::createAndWriteFile); } private void updateIndividualMetadataFile(RecorderEndpointWrapper wrapper) { - this.commonWriteIndividualMetadataFile(wrapper, this.fileWriter::overwriteFile); + this.commonWriteIndividualMetadataFile(wrapper, this.fileManager::overwriteFile); } private void commonWriteIndividualMetadataFile(RecorderEndpointWrapper wrapper, @@ -439,7 +431,7 @@ public class SingleStreamRecordingService extends RecordingService { // individual recordings) and "size" (sum of all individual recordings size) String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; - String metadataFilePath = folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + String metadataFilePath = folderPath + RecordingService.RECORDING_ENTITY_FILE + recording.getId(); String syncFilePath = folderPath + recording.getName() + ".json"; recording = this.recordingManager.getRecordingFromEntityFile(new File(metadataFilePath)); @@ -452,7 +444,6 @@ public class SingleStreamRecordingService extends RecordingService { File[] files = folder.listFiles(); Reader reader = null; - Gson gson = new Gson(); // Sync metadata json object to store in "RECORDING_NAME.json" JsonObject json = new JsonObject(); @@ -495,8 +486,9 @@ public class SingleStreamRecordingService extends RecordingService { } json.add("files", jsonArrayFiles); - this.fileWriter.createAndWriteFile(syncFilePath, new GsonBuilder().setPrettyPrinting().create().toJson(json)); - this.generateZipFileAndCleanFolder(folderPath, recording.getName() + ".zip"); + this.fileManager.createAndWriteFile(syncFilePath, new GsonBuilder().setPrettyPrinting().create().toJson(json)); + this.generateZipFileAndCleanFolder(folderPath, + recording.getName() + RecordingService.INDIVIDUAL_RECORDING_COMPRESSED_EXTENSION); double duration = (double) (maxEndTime - minStartTime) / 1000; duration = duration > 0 ? duration : 0; @@ -520,7 +512,7 @@ public class SingleStreamRecordingService extends RecordingService { String fileExtension = FilenameUtils.getExtension(files[i].getName()); if (files[i].isFile() && (fileExtension.equals("json") - || SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION.equals("." + fileExtension))) { + || RecordingService.INDIVIDUAL_RECORDING_EXTENSION.equals("." + fileExtension))) { // Zip video files and json sync metadata file FileInputStream fis = new FileInputStream(files[i]); @@ -534,7 +526,7 @@ public class SingleStreamRecordingService extends RecordingService { fis.close(); } - if (!files[i].getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { + if (!files[i].getName().startsWith(RecordingService.RECORDING_ENTITY_FILE)) { // Clean inspected file if it is not files[i].delete(); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java index aec9c8c3..ddbb53f9 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java +++ b/openvidu-server/src/main/java/io/openvidu/server/rest/SessionRestController.java @@ -367,7 +367,7 @@ public class SessionRestController { RecordingProperties recordingProperties; try { - recordingProperties = getRecordingPropertiesFromParams(params, session.getSessionProperties()).build(); + recordingProperties = getRecordingPropertiesFromParams(params, session).build(); } catch (RuntimeException e) { return this.generateErrorResponse(e.getMessage(), "/sessions", HttpStatus.UNPROCESSABLE_ENTITY); } catch (Exception e) { @@ -411,8 +411,13 @@ public class SessionRestController { Session session = sessionManager.getSession(recording.getSessionId()); - Recording stoppedRecording = this.recordingManager.stopRecording(session, recording.getId(), - EndReason.recordingStoppedByServer); + Recording stoppedRecording; + try { + stoppedRecording = this.recordingManager.stopRecording(session, recording.getId(), + EndReason.recordingStoppedByServer); + } catch (Exception e) { + return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR); + } session.recordingManuallyStopped.set(true); @@ -906,8 +911,8 @@ public class SessionRestController { return builder; } - protected RecordingProperties.Builder getRecordingPropertiesFromParams(Map params, - SessionProperties sessionProperties) throws Exception { + protected RecordingProperties.Builder getRecordingPropertiesFromParams(Map params, Session session) + throws Exception { RecordingProperties.Builder builder = new RecordingProperties.Builder(); @@ -976,7 +981,7 @@ public class SessionRestController { // If outputMode is COMPOSED when defaultOutputMode is COMPOSED_QUICK_START, // change outputMode to COMPOSED_QUICK_START (and vice versa) - OutputMode defaultOutputMode = sessionProperties.defaultOutputMode(); + OutputMode defaultOutputMode = session.getSessionProperties().defaultOutputMode(); if (OutputMode.COMPOSED_QUICK_START.equals(defaultOutputMode) && OutputMode.COMPOSED.equals(finalOutputMode)) { finalOutputMode = OutputMode.COMPOSED_QUICK_START; } else if (OutputMode.COMPOSED.equals(defaultOutputMode) @@ -984,15 +989,17 @@ public class SessionRestController { finalOutputMode = OutputMode.COMPOSED; } - builder.outputMode(finalOutputMode == null ? sessionProperties.defaultOutputMode() : finalOutputMode); + builder.outputMode( + finalOutputMode == null ? session.getSessionProperties().defaultOutputMode() : finalOutputMode); if (RecordingUtils.IS_COMPOSED(finalOutputMode)) { builder.resolution(resolution != null ? resolution : "1920x1080"); // resolution == null ? // sessionProperties.defaultRecordingResolution) // : resolution)); - builder.recordingLayout( - recordingLayout == null ? sessionProperties.defaultRecordingLayout() : recordingLayout); + builder.recordingLayout(recordingLayout == null ? session.getSessionProperties().defaultRecordingLayout() + : recordingLayout); if (RecordingLayout.CUSTOM.equals(recordingLayout)) { - builder.customLayout(customLayout == null ? sessionProperties.defaultCustomLayout() : customLayout); + builder.customLayout( + customLayout == null ? session.getSessionProperties().defaultCustomLayout() : customLayout); } if (shmSize != null) { if (shmSize < 134217728L) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/CustomFileManager.java b/openvidu-server/src/main/java/io/openvidu/server/utils/CustomFileManager.java index 3c0e0546..11f6da46 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/CustomFileManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/CustomFileManager.java @@ -28,7 +28,7 @@ import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class CustomFileManager { +public abstract class CustomFileManager { private static final Logger log = LoggerFactory.getLogger(CustomFileManager.class); @@ -121,4 +121,7 @@ public class CustomFileManager { } } + public abstract void waitForFileToExistAndNotEmpty(String mediaNodeId, String absolutePathToFile, + int maxSecondsWait) throws Exception; + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/DockerManager.java b/openvidu-server/src/main/java/io/openvidu/server/utils/DockerManager.java index 6cbed886..74857a43 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/DockerManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/DockerManager.java @@ -11,16 +11,17 @@ public interface DockerManager { public DockerManager init(); - public String runContainer(String image, String containerName, String user, List volumes, List binds, - String networkMode, List envs, List command, Long shmSize, boolean privileged, - Map labels) throws Exception; + public String runContainer(String mediaNodeId, String image, String containerName, String user, + List volumes, List binds, String networkMode, List envs, List command, + Long shmSize, boolean privileged, Map labels) throws Exception; - public void removeContainer(String containerId, boolean force); + public void removeContainer(String mediaNodeId, String containerId, boolean force); - public void runCommandInContainerSync(String containerId, String command, int secondsOfWait) throws IOException; + public void runCommandInContainerSync(String mediaNodeId, String containerId, String command, int secondsOfWait) + throws IOException; - public void runCommandInContainerAsync(String containerId, String command) throws IOException; + public void runCommandInContainerAsync(String mediaNodeId, String containerId, String command) throws IOException; - public void waitForContainerStopped(String containerId, int secondsOfWait) throws Exception; + public void waitForContainerStopped(String mediaNodeId, String containerId, int secondsOfWait) throws Exception; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/LocalCustomFileManager.java b/openvidu-server/src/main/java/io/openvidu/server/utils/LocalCustomFileManager.java new file mode 100644 index 00000000..b2568767 --- /dev/null +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/LocalCustomFileManager.java @@ -0,0 +1,37 @@ +package io.openvidu.server.utils; + +import java.io.File; + +public class LocalCustomFileManager extends CustomFileManager { + + @Override + public void waitForFileToExistAndNotEmpty(String mediaNodeId, String absolutePathToFile, int maxSeconsWait) + throws Exception { + + // Check 10 times per seconds + int MILLISECONDS_INTERVAL_WAIT = 100; + int LIMIT = maxSeconsWait * 1000 / MILLISECONDS_INTERVAL_WAIT; + int i = 0; + + boolean arePresent = fileExistsAndHasBytes(absolutePathToFile); + while (!arePresent && i < LIMIT) { + try { + Thread.sleep(MILLISECONDS_INTERVAL_WAIT); + arePresent = fileExistsAndHasBytes(absolutePathToFile); + i++; + } catch (InterruptedException e) { + throw new Exception("Interrupted exception while waiting for file " + absolutePathToFile + " to exist"); + } + } + if (!arePresent) { + throw new Exception("File " + absolutePathToFile + " does not exist and hasn't been created in " + + maxSeconsWait + " seconds"); + } + } + + private boolean fileExistsAndHasBytes(String fileName) { + File f = new File(fileName); + return (f.exists() && f.isFile() && f.length() > 0); + } + +} diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/LocalDockerManager.java b/openvidu-server/src/main/java/io/openvidu/server/utils/LocalDockerManager.java index de6f72ed..0b697b51 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/LocalDockerManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/LocalDockerManager.java @@ -53,15 +53,23 @@ import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.recording.service.WaitForContainerStoppedCallback; -public class LocalDockerManager { +public class LocalDockerManager implements DockerManager { - private static final Logger log = LoggerFactory.getLogger(LocalDockerManager.class); + private static final Logger log = LoggerFactory.getLogger(DockerManager.class); private DockerClient dockerClient; - public LocalDockerManager() { + public LocalDockerManager(boolean init) { + if (init) { + this.init(); + } + } + + @Override + public DockerManager init() { DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); this.dockerClient = DockerClientBuilder.getInstance(config).build(); + return this; } public void downloadDockerImage(String image, int secondsOfWait) throws Exception { @@ -107,11 +115,12 @@ public class LocalDockerManager { } } - public String runContainer(String container, String containerName, String user, List volumes, - List binds, String networkMode, List envs, List command, Long shmSize, - boolean privileged, Map labels) throws Exception { + @Override + public String runContainer(String mediaNodeId, String image, String containerName, String user, + List volumes, List binds, String networkMode, List envs, List command, + Long shmSize, boolean privileged, Map labels) throws Exception { - CreateContainerCmd cmd = dockerClient.createContainerCmd(container).withEnv(envs); + CreateContainerCmd cmd = dockerClient.createContainerCmd(image).withEnv(envs); if (containerName != null) { cmd.withName(containerName); } @@ -153,12 +162,13 @@ public class LocalDockerManager { containerName); throw e; } catch (NotFoundException e) { - log.error("Docker image {} couldn't be found in docker host", container); + log.error("Docker image {} couldn't be found in docker host", image); throw e; } } - public void removeDockerContainer(String containerId, boolean force) { + @Override + public void removeContainer(String mediaNodeId, String containerId, boolean force) { dockerClient.removeContainerCmd(containerId).withForce(force).exec(); } @@ -172,15 +182,9 @@ public class LocalDockerManager { } } - public void runCommandInContainer(String containerId, String command) throws InterruptedException { - ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true) - .withAttachStderr(true).withCmd("bash", "-c", command).exec(); - dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(new ExecStartResultCallback() { - }); - } - - public void runCommandInContainerSync(String containerId, String command, int secondsOfWait) - throws InterruptedException { + @Override + public void runCommandInContainerSync(String mediaNodeId, String containerId, String command, int secondsOfWait) + throws IOException { ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true) .withAttachStderr(true).withCmd("bash", "-c", command).exec(); CountDownLatch latch = new CountDownLatch(1); @@ -193,12 +197,21 @@ public class LocalDockerManager { try { latch.await(secondsOfWait, TimeUnit.SECONDS); } catch (InterruptedException e) { - throw new InterruptedException("Container " + containerId + " did not return from executing command \"" - + command + "\" in " + secondsOfWait + " seconds"); + throw new IOException("Container " + containerId + " did not return from executing command \"" + command + + "\" in " + secondsOfWait + " seconds"); } } - public void waitForContainerStopped(String containerId, int secondsOfWait) throws Exception { + @Override + public void runCommandInContainerAsync(String mediaNodeId, String containerId, String command) throws IOException { + ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true) + .withAttachStderr(true).withCmd("bash", "-c", command).exec(); + dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(new ExecStartResultCallback() { + }); + } + + @Override + public void waitForContainerStopped(String mediaNodeId, String containerId, int secondsOfWait) throws Exception { CountDownLatch latch = new CountDownLatch(1); WaitForContainerStoppedCallback callback = new WaitForContainerStoppedCallback(latch); dockerClient.waitContainerCmd(containerId).exec(callback); diff --git a/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java b/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java index c98b1f14..56b022b2 100644 --- a/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java +++ b/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java @@ -38,7 +38,8 @@ public class IntegrationTestConfiguration { List successfullyConnectedKmss = new ArrayList<>(); List kmsProperties = invocation.getArgument(0); for (KmsProperties kmsProp : kmsProperties) { - Kms kms = new Kms(kmsProp, Whitebox.getInternalState(spy, "loadManager")); + Kms kms = new Kms(kmsProp, Whitebox.getInternalState(spy, "loadManager"), + Whitebox.getInternalState(spy, "quarantineKiller")); KurentoClient kClient = mock(KurentoClient.class); doAnswer(i -> {