openvidu-server: foundation for external composed recording

pull/570/head
pabloFuente 2020-11-25 21:02:29 +01:00
parent d3c75a455f
commit 58a67da150
21 changed files with 489 additions and 306 deletions

View File

@ -51,7 +51,9 @@ public class OpenViduException extends JsonRpcErrorException {
RECORDING_DELETE_ERROR_CODE(706), RECORDING_LIST_ERROR_CODE(705), RECORDING_STOP_ERROR_CODE(704), RECORDING_DELETE_ERROR_CODE(706), RECORDING_LIST_ERROR_CODE(705), RECORDING_STOP_ERROR_CODE(704),
RECORDING_START_ERROR_CODE(703), RECORDING_REPORT_ERROR_CODE(702), RECORDING_COMPLETION_ERROR_CODE(701), RECORDING_START_ERROR_CODE(703), RECORDING_REPORT_ERROR_CODE(702), RECORDING_COMPLETION_ERROR_CODE(701),
FORCED_CODEC_NOT_FOUND_IN_SDPOFFER(800); FORCED_CODEC_NOT_FOUND_IN_SDPOFFER(800),
MEDIA_NODE_NOT_FOUND(900), MEDIA_NODE_STATUS_WRONG(901);
private int value; private int value;

View File

@ -75,6 +75,8 @@ import io.openvidu.server.rpc.RpcNotificationService;
import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.CommandExecutor;
import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.GeoLocationByIp;
import io.openvidu.server.utils.GeoLocationByIpDummy; import io.openvidu.server.utils.GeoLocationByIpDummy;
import io.openvidu.server.utils.LocalCustomFileManager;
import io.openvidu.server.utils.LocalDockerManager;
import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.MediaNodeStatusManager;
import io.openvidu.server.utils.MediaNodeStatusManagerDummy; import io.openvidu.server.utils.MediaNodeStatusManagerDummy;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.QuarantineKiller;
@ -170,7 +172,7 @@ public class OpenViduServer implements JsonRpcConfigurer {
@ConditionalOnMissingBean @ConditionalOnMissingBean
@DependsOn("openviduConfig") @DependsOn("openviduConfig")
public RecordingManager recordingManager() { public RecordingManager recordingManager() {
return new RecordingManager(); return new RecordingManager(new LocalDockerManager(false), new LocalCustomFileManager());
} }
@Bean @Bean

View File

@ -37,7 +37,6 @@ import java.util.Map;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import io.openvidu.java.client.VideoCodec;
import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header; import org.apache.http.Header;
@ -55,6 +54,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonSyntaxException; import com.google.gson.JsonSyntaxException;
import io.openvidu.java.client.OpenViduRole; import io.openvidu.java.client.OpenViduRole;
import io.openvidu.java.client.VideoCodec;
import io.openvidu.server.OpenViduServer; import io.openvidu.server.OpenViduServer;
import io.openvidu.server.cdr.CDREventName; import io.openvidu.server.cdr.CDREventName;
import io.openvidu.server.config.Dotenv.DotenvFormatException; import io.openvidu.server.config.Dotenv.DotenvFormatException;
@ -239,6 +239,10 @@ public class OpenviduConfig {
return openViduRecordingDebug; return openViduRecordingDebug;
} }
public boolean isRecordingComposedExternal() {
return false;
}
public String getOpenViduRecordingPath() { public String getOpenViduRecordingPath() {
return this.openviduRecordingPath; return this.openviduRecordingPath;
} }

View File

@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import io.openvidu.server.utils.SDPMunging;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.GenericMediaElement; import org.kurento.client.GenericMediaElement;
import org.kurento.client.IceCandidate; import org.kurento.client.IceCandidate;
@ -74,6 +73,7 @@ import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.rpc.RpcHandler;
import io.openvidu.server.utils.GeoLocation; import io.openvidu.server.utils.GeoLocation;
import io.openvidu.server.utils.JsonUtils; import io.openvidu.server.utils.JsonUtils;
import io.openvidu.server.utils.SDPMunging;
public class KurentoSessionManager extends SessionManager { public class KurentoSessionManager extends SessionManager {
@ -370,8 +370,8 @@ public class KurentoSessionManager extends SessionManager {
// Modify sdp if forced codec is defined // Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
kurentoOptions.sdpOffer = sdpMunging.forceCodec(participant, kurentoOptions.sdpOffer, kurentoOptions.isOffer, kurentoOptions.sdpOffer = sdpMunging.forceCodec(participant, kurentoOptions.sdpOffer,
kSession, true, false, isTranscodingAllowed, forcedVideoCodec); kurentoOptions.isOffer, kSession, true, false, isTranscodingAllowed, forcedVideoCodec);
} }
log.debug( log.debug(
"Request [PUBLISH_MEDIA] isOffer={} sdp={} " "Request [PUBLISH_MEDIA] isOffer={} sdp={} "
@ -435,7 +435,7 @@ public class KurentoSessionManager extends SessionManager {
.customLayout(kSession.getSessionProperties().defaultCustomLayout()) .customLayout(kSession.getSessionProperties().defaultCustomLayout())
.resolution(/* .resolution(/*
* kSession.getSessionProperties().defaultRecordingResolution() * kSession.getSessionProperties().defaultRecordingResolution()
*/"1920x1080").build()); */"1920x1080").mediaNode(kSession.getMediaNodeId()).build());
}).start(); }).start();
} else if (recordingManager.sessionIsBeingRecorded(kSession.getSessionId())) { } else if (recordingManager.sessionIsBeingRecorded(kSession.getSessionId())) {
// Abort automatic recording stop thread for any recorded session in which a // Abort automatic recording stop thread for any recorded session in which a
@ -1079,8 +1079,8 @@ public class KurentoSessionManager extends SessionManager {
// Modify sdp if forced codec is defined // Modify sdp if forced codec is defined
if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) { if (forcedVideoCodec != VideoCodec.NONE && !participant.isIpcam()) {
sdpOffer = sdpMunging.forceCodec(participant, sdpOffer, true, kSession, isPublisher, sdpOffer = sdpMunging.forceCodec(participant, sdpOffer, true, kSession, isPublisher, true,
true, isTranscodingAllowed, forcedVideoCodec); isTranscodingAllowed, forcedVideoCodec);
} }
if (isPublisher) { if (isPublisher) {
@ -1102,7 +1102,8 @@ public class KurentoSessionManager extends SessionManager {
kParticipant.createPublishingEndpoint(kurentoOptions, streamId); kParticipant.createPublishingEndpoint(kurentoOptions, streamId);
SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER;
String sdpAnswer = kParticipant.publishToRoom(sdpType, sdpOffer, kurentoOptions.doLoopback, true); String sdpAnswer = kParticipant.publishToRoom(sdpType, sdpOffer, kurentoOptions.doLoopback, true);
log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(), sdpAnswer); log.debug("SDP Answer for publishing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(),
sdpAnswer);
sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(),
kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer, kParticipant.getPublisher().createdAt(), kSession.getSessionId(), kurentoOptions, sdpAnswer,
new HashSet<Participant>(), transactionId, null); new HashSet<Participant>(), transactionId, null);
@ -1120,7 +1121,8 @@ public class KurentoSessionManager extends SessionManager {
"Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'"); "Unable to generate SDP answer when reconnecting subscriber to '" + streamId + "'");
} }
log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}", participant.getParticipantPublicId(), sdpAnswer); log.debug("SDP Answer for subscribing reconnection PARTICIPANT {}: {}",
participant.getParticipantPublicId(), sdpAnswer);
sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null); sessionEventsHandler.onSubscribe(participant, kSession, sdpAnswer, transactionId, null);
} else { } else {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,

View File

@ -23,19 +23,17 @@ import java.util.List;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClient;
import org.kurento.commons.exception.KurentoException; import org.kurento.commons.exception.KurentoException;
import io.openvidu.server.core.IdentifierPrefixes;
public class FixedOneKmsManager extends KmsManager { public class FixedOneKmsManager extends KmsManager {
@Override @Override
public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) throws Exception { public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure)
throws Exception {
KmsProperties firstProps = kmsProperties.get(0); KmsProperties firstProps = kmsProperties.get(0);
KurentoClient kClient = null; KurentoClient kClient = null;
Kms kms = new Kms(firstProps, loadManager); Kms kms = new Kms(firstProps, loadManager, quarantineKiller);
try { try {
kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId())); kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId()));
this.addKms(kms); this.addKms(kms);
@ -55,6 +53,11 @@ public class FixedOneKmsManager extends KmsManager {
return Arrays.asList(kms); return Arrays.asList(kms);
} }
@Override
public boolean isMediaNodeRunning(String mediaNodeId) {
return true;
}
@Override @Override
@PostConstruct @PostConstruct
protected void postConstructInitKurentoClients() { protected void postConstructInitKurentoClients() {

View File

@ -36,6 +36,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.QuarantineKiller;
/** /**
* Abstraction of a KMS instance: an object of this class corresponds to a KMS * Abstraction of a KMS instance: an object of this class corresponds to a KMS
@ -57,6 +58,7 @@ public class Kms {
private String ip; private String ip;
private KurentoClient client; private KurentoClient client;
private LoadManager loadManager; private LoadManager loadManager;
private QuarantineKiller quarantineKiller;
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0); private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0);
@ -65,7 +67,7 @@ public class Kms {
private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>(); private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>();
private AtomicInteger activeRecordings = new AtomicInteger(0); private AtomicInteger activeRecordings = new AtomicInteger(0);
public Kms(KmsProperties props, LoadManager loadManager) { public Kms(KmsProperties props, LoadManager loadManager, QuarantineKiller quarantineKiller) {
this.id = props.getId(); this.id = props.getId();
this.uri = props.getUri(); this.uri = props.getUri();
@ -79,6 +81,7 @@ public class Kms {
this.ip = url.getHost(); this.ip = url.getHost();
this.loadManager = loadManager; this.loadManager = loadManager;
this.quarantineKiller = quarantineKiller;
} }
public void setKurentoClient(KurentoClient client) { public void setKurentoClient(KurentoClient client) {
@ -145,8 +148,17 @@ public class Kms {
this.kurentoSessions.remove(sessionId); this.kurentoSessions.remove(sessionId);
} }
public AtomicInteger getActiveRecordings() { public synchronized int getActiveRecordings() {
return this.activeRecordings; return this.activeRecordings.get();
}
public synchronized int incrementActiveRecordings() {
return this.activeRecordings.incrementAndGet();
}
public synchronized void decrementActiveRecordings() {
this.activeRecordings.updateAndGet(i -> i > 0 ? i - 1 : i);
this.quarantineKiller.dropMediaNode(this.id);
} }
public JsonObject toJson() { public JsonObject toJson() {

View File

@ -46,6 +46,7 @@ import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.IdentifierPrefixes; import io.openvidu.server.core.IdentifierPrefixes;
import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.MediaNodeStatusManager;
import io.openvidu.server.utils.QuarantineKiller;
import io.openvidu.server.utils.UpdatableTimerTask; import io.openvidu.server.utils.UpdatableTimerTask;
public abstract class KmsManager { public abstract class KmsManager {
@ -101,6 +102,9 @@ public abstract class KmsManager {
@Autowired @Autowired
protected LoadManager loadManager; protected LoadManager loadManager;
@Autowired
protected QuarantineKiller quarantineKiller;
@Autowired @Autowired
protected MediaNodeStatusManager mediaNodeStatusManager; protected MediaNodeStatusManager mediaNodeStatusManager;
@ -353,6 +357,8 @@ public abstract class KmsManager {
public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure) public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure)
throws Exception; throws Exception;
public abstract boolean isMediaNodeRunning(String mediaNodeId);
@PostConstruct @PostConstruct
protected abstract void postConstructInitKurentoClients(); protected abstract void postConstructInitKurentoClients();

View File

@ -23,7 +23,7 @@ import org.kurento.client.RecorderEndpoint;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.recording.service.SingleStreamRecordingService; import io.openvidu.server.recording.service.RecordingService;
public class RecorderEndpointWrapper { public class RecorderEndpointWrapper {
@ -61,7 +61,7 @@ public class RecorderEndpointWrapper {
public RecorderEndpointWrapper(JsonObject json) { public RecorderEndpointWrapper(JsonObject json) {
String nameAux = json.get("name").getAsString(); String nameAux = json.get("name").getAsString();
// If the name includes the extension, remove it // If the name includes the extension, remove it
this.name = StringUtils.removeEnd(nameAux, SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION); this.name = StringUtils.removeEnd(nameAux, RecordingService.INDIVIDUAL_RECORDING_EXTENSION);
this.connectionId = json.get("connectionId").getAsString(); this.connectionId = json.get("connectionId").getAsString();
this.streamId = json.get("streamId").getAsString(); this.streamId = json.get("streamId").getAsString();
this.clientData = (json.has("clientData") && !json.get("clientData").isJsonNull()) this.clientData = (json.has("clientData") && !json.get("clientData").isJsonNull())
@ -91,7 +91,7 @@ public class RecorderEndpointWrapper {
} }
public String getNameWithExtension() { public String getNameWithExtension() {
return this.name + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION; return this.name + RecordingService.INDIVIDUAL_RECORDING_EXTENSION;
} }
public String getConnectionId() { public String getConnectionId() {

View File

@ -61,7 +61,7 @@ public class Recording {
try { try {
this.duration = json.get("duration").getAsDouble(); this.duration = json.get("duration").getAsDouble();
} catch (Exception e) { } catch (Exception e) {
this.duration = new Long((long) json.get("duration").getAsLong()).doubleValue(); this.duration = Long.valueOf((long) json.get("duration").getAsLong()).doubleValue();
} }
if (json.get("url").isJsonNull()) { if (json.get("url").isJsonNull()) {
this.url = null; this.url = null;

View File

@ -1,7 +1,15 @@
package io.openvidu.server.recording.service; package io.openvidu.server.recording.service;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -10,6 +18,7 @@ import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Volume; import com.github.dockerjava.api.model.Volume;
import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.java.client.RecordingProperties; import io.openvidu.java.client.RecordingProperties;
import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.cdr.CallDetailRecord;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
@ -18,16 +27,18 @@ import io.openvidu.server.core.Session;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.recording.RecordingUploader;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.CustomFileManager;
import io.openvidu.server.utils.DockerManager;
public class ComposedQuickStartRecordingService extends ComposedRecordingService { public class ComposedQuickStartRecordingService extends ComposedRecordingService {
private static final Logger log = LoggerFactory.getLogger(ComposedRecordingService.class); private static final Logger log = LoggerFactory.getLogger(ComposedRecordingService.class);
public ComposedQuickStartRecordingService(RecordingManager recordingManager, public ComposedQuickStartRecordingService(RecordingManager recordingManager,
RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, OpenviduConfig openviduConfig, RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, CustomFileManager fileManager,
CallDetailRecord cdr, QuarantineKiller quarantineKiller) { OpenviduConfig openviduConfig, CallDetailRecord cdr, DockerManager dockerManager) {
super(recordingManager, recordingDownloader, recordingUploader, openviduConfig, cdr, quarantineKiller); super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr,
dockerManager);
} }
public void stopRecordingContainer(Session session, EndReason reason) { public void stopRecordingContainer(Session session, EndReason reason) {
@ -38,7 +49,7 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
if (containerId != null) { if (containerId != null) {
try { try {
dockerManager.removeDockerContainer(containerId, true); dockerManager.removeContainer(session.getMediaNodeId(), containerId, true);
} catch (Exception e) { } catch (Exception e) {
log.error("Can't remove COMPOSED_QUICK_START recording container from session {}", log.error("Can't remove COMPOSED_QUICK_START recording container from session {}",
session.getSessionId()); session.getSessionId());
@ -78,7 +89,8 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
recordExecCommand += "export " + envs.get(i) + " "; recordExecCommand += "export " + envs.get(i) + " ";
} }
recordExecCommand += "&& ./composed_quick_start.sh --start-recording > /var/log/ffmpeg.log 2>&1 &"; recordExecCommand += "&& ./composed_quick_start.sh --start-recording > /var/log/ffmpeg.log 2>&1 &";
dockerManager.runCommandInContainer(containerId, recordExecCommand); dockerManager.runCommandInContainerAsync(recording.getRecordingProperties().mediaNode(), containerId,
recordExecCommand);
} catch (Exception e) { } catch (Exception e) {
this.cleanRecordingMaps(recording); this.cleanRecordingMaps(recording);
throw this.failStartRecording(session, recording, throw this.failStartRecording(session, recording,
@ -89,12 +101,16 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
try { try {
this.waitForVideoFileNotEmpty(recording); this.waitForVideoFileNotEmpty(recording);
} catch (OpenViduException e) { } catch (Exception e) {
this.cleanRecordingMaps(recording); this.cleanRecordingMaps(recording);
throw this.failStartRecording(session, recording, throw this.failStartRecording(session, recording,
"Couldn't initialize recording container. Error: " + e.getMessage()); "Couldn't initialize recording container. Error: " + e.getMessage());
} }
if (this.openviduConfig.isRecordingComposedExternal()) {
this.generateRecordingMetadataFile(recording);
}
return recording; return recording;
} }
@ -103,7 +119,7 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
log.info("Stopping COMPOSED_QUICK_START ({}) recording {} of session {}. Reason: {}", log.info("Stopping COMPOSED_QUICK_START ({}) recording {} of session {}. Reason: {}",
recording.hasAudio() ? "video + audio" : "audio-only", recording.getId(), recording.getSessionId(), recording.hasAudio() ? "video + audio" : "audio-only", recording.getId(), recording.getSessionId(),
RecordingManager.finalReason(reason)); RecordingManager.finalReason(reason));
log.info("Container for session {} still being ready for new recordings", recording.getSessionId()); log.info("Container for session {} still ready for new recordings", recording.getSessionId());
String containerId = this.sessionsContainers.get(recording.getSessionId()); String containerId = this.sessionsContainers.get(recording.getSessionId());
@ -116,29 +132,28 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
} }
try { try {
dockerManager.runCommandInContainerSync(containerId, "./composed_quick_start.sh --stop-recording", 10); dockerManager.runCommandInContainerSync(recording.getRecordingProperties().mediaNode(), containerId,
} catch (InterruptedException e1) { "./composed_quick_start.sh --stop-recording", 10);
cleanRecordingMaps(recording); } catch (IOException e1) {
log.error("Error stopping recording for session id: {}", session.getSessionId()); log.error("Error stopping COMPOSED_QUICK_START recording {}: {}", recording.getId(), e1.getMessage());
e1.printStackTrace(); failRecordingCompletion(recording, containerId, true,
new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE, e1.getMessage()));
} }
updateRecordingAttributes(recording); if (this.openviduConfig.isRecordingComposedExternal()) {
try {
this.sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(), waitForComposedQuickStartFiles(recording);
getMetadataFilePath(recording)); } catch (Exception e) {
cleanRecordingMaps(recording); failRecordingCompletion(recording, containerId, false,
new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE, e.getMessage()));
}
}
if (session != null && reason != null) { if (session != null && reason != null) {
this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason);
} }
final Recording[] finalRecordingArray = new Recording[1]; downloadComposedRecording(recording, reason);
finalRecordingArray[0] = recording;
this.uploadRecording(finalRecordingArray[0], reason);
// Decrement active recordings
// ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet();
return recording; return recording;
} }
@ -166,7 +181,7 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
.customLayout(recorderSession.getSessionProperties().defaultCustomLayout()) .customLayout(recorderSession.getSessionProperties().defaultCustomLayout())
.resolution( .resolution(
/* recorderSession.getSessionProperties().defaultRecordingResolution() */"1920x1080") /* recorderSession.getSessionProperties().defaultRecordingResolution() */"1920x1080")
.build()); .mediaNode(recorderSession.getMediaNodeId()).build());
log.info("COMPOSED_QUICK_START recording container launched for session: {}", log.info("COMPOSED_QUICK_START recording container launched for session: {}",
recorderSession.getSessionId()); recorderSession.getSessionId());
launched = true; launched = true;
@ -213,13 +228,13 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
Bind bind1 = new Bind(openviduConfig.getOpenViduRecordingPath(), volume1); Bind bind1 = new Bind(openviduConfig.getOpenViduRecordingPath(), volume1);
List<Bind> binds = new ArrayList<>(); List<Bind> binds = new ArrayList<>();
binds.add(bind1); binds.add(bind1);
containerId = dockerManager.runContainer(container, containerName, null, volumes, binds, "host", envs, null, containerId = dockerManager.runContainer(properties.mediaNode(), container, containerName, null, volumes,
properties.shmSize(), false, null); binds, "host", envs, null, properties.shmSize(), false, null);
containers.put(containerId, containerName); containers.put(containerId, containerName);
this.sessionsContainers.put(session.getSessionId(), containerId); this.sessionsContainers.put(session.getSessionId(), containerId);
} catch (Exception e) { } catch (Exception e) {
if (containerId != null) { if (containerId != null) {
dockerManager.removeDockerContainer(containerId, true); dockerManager.removeContainer(properties.mediaNode(), containerId, true);
containers.remove(containerId); containers.remove(containerId);
sessionsContainers.remove(session.getSessionId()); sessionsContainers.remove(session.getSessionId());
} }
@ -229,4 +244,65 @@ public class ComposedQuickStartRecordingService extends ComposedRecordingService
return containerId; return containerId;
} }
private void waitForComposedQuickStartFiles(Recording recording) throws Exception {
final int SECONDS_MAX_WAIT = 30;
final String PATH = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/";
// Waiting for the files generated at the end of the stopping process: the
// ffprobe info and the thumbnail
final String INFO_FILE = PATH + recording.getId() + RecordingService.COMPOSED_INFO_FILE_EXTENSION;
final String THUMBNAIL_FILE = PATH + recording.getId() + RecordingService.COMPOSED_THUMBNAIL_EXTENSION;
Set<String> filesToWaitFor = Stream.of(INFO_FILE, THUMBNAIL_FILE).collect(Collectors.toSet());
Collection<Thread> waitForFileThreads = new HashSet<>();
CountDownLatch latch = new CountDownLatch(filesToWaitFor.size());
for (final String file : filesToWaitFor) {
Thread downloadThread = new Thread() {
@Override
public void run() {
try {
fileManager.waitForFileToExistAndNotEmpty(recording.getRecordingProperties().mediaNode(), file,
SECONDS_MAX_WAIT);
} catch (Exception e) {
log.error(e.getMessage());
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
} finally {
latch.countDown();
}
}
};
waitForFileThreads.add(downloadThread);
}
waitForFileThreads.forEach(t -> t.start());
try {
if (!latch.await(SECONDS_MAX_WAIT, TimeUnit.SECONDS)) {
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
String msg = "The wait for files of COMPOSED_QUICK_START recording " + recording.getId()
+ " didn't complete in " + SECONDS_MAX_WAIT + " seconds";
log.error(msg);
throw new Exception(msg);
} else {
if (io.openvidu.java.client.Recording.Status.failed.equals(recording.getStatus())) {
String msg = "Error waiting for some COMPOSED_QUICK_START recording file in recording "
+ recording.getId();
log.error(msg);
throw new Exception(msg);
} else {
log.info("All files of COMPOSED_QUICK_START recording {} are available to download",
recording.getId());
}
}
} catch (InterruptedException e) {
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
String msg = "Error waiting for COMPOSED_QUICK_START recording files of recording " + recording.getId()
+ ": " + e.getMessage();
log.error(msg);
throw new Exception(msg);
}
}
} }

View File

@ -58,8 +58,8 @@ import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingInfoUtils; import io.openvidu.server.recording.RecordingInfoUtils;
import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.recording.RecordingUploader;
import io.openvidu.server.rest.RequestMappings; import io.openvidu.server.rest.RequestMappings;
import io.openvidu.server.utils.LocalDockerManager; import io.openvidu.server.utils.CustomFileManager;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.DockerManager;
public class ComposedRecordingService extends RecordingService { public class ComposedRecordingService extends RecordingService {
@ -69,13 +69,13 @@ public class ComposedRecordingService extends RecordingService {
protected Map<String, String> sessionsContainers = new ConcurrentHashMap<>(); protected Map<String, String> sessionsContainers = new ConcurrentHashMap<>();
private Map<String, CompositeWrapper> composites = new ConcurrentHashMap<>(); private Map<String, CompositeWrapper> composites = new ConcurrentHashMap<>();
protected LocalDockerManager dockerManager; protected DockerManager dockerManager;
public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr, RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig,
QuarantineKiller quarantineKiller) { CallDetailRecord cdr, DockerManager dockerManager) {
super(recordingManager, recordingDownloader, recordingUploader, openviduConfig, cdr, quarantineKiller); super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr);
this.dockerManager = new LocalDockerManager(); this.dockerManager = dockerManager;
} }
@Override @Override
@ -98,9 +98,6 @@ public class ComposedRecordingService extends RecordingService {
recording = this.startRecordingAudioOnly(session, recording, properties); recording = this.startRecordingAudioOnly(session, recording, properties);
} }
// Increment active recordings
// ((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet();
return recording; return recording;
} }
@ -177,8 +174,8 @@ public class ComposedRecordingService extends RecordingService {
Bind bind1 = new Bind(openviduConfig.getOpenViduRecordingPath(), volume1); Bind bind1 = new Bind(openviduConfig.getOpenViduRecordingPath(), volume1);
List<Bind> binds = new ArrayList<>(); List<Bind> binds = new ArrayList<>();
binds.add(bind1); binds.add(bind1);
containerId = dockerManager.runContainer(container, containerName, null, volumes, binds, "host", envs, null, containerId = dockerManager.runContainer(properties.mediaNode(), container, containerName, null, volumes,
properties.shmSize(), false, null); binds, "host", envs, null, properties.shmSize(), false, null);
containers.put(containerId, containerName); containers.put(containerId, containerName);
} catch (Exception e) { } catch (Exception e) {
this.cleanRecordingMaps(recording); this.cleanRecordingMaps(recording);
@ -190,12 +187,16 @@ public class ComposedRecordingService extends RecordingService {
try { try {
this.waitForVideoFileNotEmpty(recording); this.waitForVideoFileNotEmpty(recording);
} catch (OpenViduException e) { } catch (Exception e) {
this.cleanRecordingMaps(recording); this.cleanRecordingMaps(recording);
throw this.failStartRecording(session, recording, throw this.failStartRecording(session, recording,
"Couldn't initialize recording container. Error: " + e.getMessage()); "Couldn't initialize recording container. Error: " + e.getMessage());
} }
if (this.openviduConfig.isRecordingComposedExternal()) {
this.generateRecordingMetadataFile(recording);
}
return recording; return recording;
} }
@ -224,9 +225,6 @@ public class ComposedRecordingService extends RecordingService {
this.generateRecordingMetadataFile(recording); this.generateRecordingMetadataFile(recording);
// Increment active recordings
((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet();
return recording; return recording;
} }
@ -273,7 +271,8 @@ public class ComposedRecordingService extends RecordingService {
} else { } else {
log.warn("Removing container {} for closed session {}...", containerIdAux, log.warn("Removing container {} for closed session {}...", containerIdAux,
session.getSessionId()); session.getSessionId());
dockerManager.removeDockerContainer(containerIdAux, true); dockerManager.removeContainer(recordingAux.getRecordingProperties().mediaNode(),
containerIdAux, true);
containers.remove(containerId); containers.remove(containerId);
containerClosed = true; containerClosed = true;
log.warn("Container {} for closed session {} succesfully stopped and removed", log.warn("Container {} for closed session {} succesfully stopped and removed",
@ -293,29 +292,18 @@ public class ComposedRecordingService extends RecordingService {
return; return;
} }
// Decrement active recordings // Decrement active recordings
// ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); ((KurentoSession) session).getKms().decrementActiveRecordings();
}).start(); }).start();
} }
} else { } else {
stopAndRemoveRecordingContainer(recording, containerId, 30); stopAndRemoveRecordingContainer(recording, containerId, 30);
updateRecordingAttributes(recording);
this.sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(),
getMetadataFilePath(recording));
cleanRecordingMaps(recording);
if (session != null && reason != null) { if (session != null && reason != null) {
this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason);
} }
// Upload if necessary downloadComposedRecording(session, recording, reason);
final Recording[] finalRecordingArray = new Recording[1];
finalRecordingArray[0] = recording;
this.uploadRecording(finalRecordingArray[0], reason);
// Decrement active recordings
// ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet();
} }
return recording; return recording;
@ -370,13 +358,12 @@ public class ComposedRecordingService extends RecordingService {
this.updateFilePermissions(filesPath); this.updateFilePermissions(filesPath);
finalRecordingArray[0] = this.sealRecordingMetadataFileAsReady(finalRecordingArray[0], finalSize, finalRecordingArray[0] = this.sealRecordingMetadataFileAsReady(finalRecordingArray[0], finalSize,
finalDuration, finalDuration,
filesPath + RecordingManager.RECORDING_ENTITY_FILE + finalRecordingArray[0].getId()); filesPath + RecordingService.RECORDING_ENTITY_FILE + finalRecordingArray[0].getId());
// Decrement active recordings once it is downloaded // Decrement active recordings once it is downloaded. This method will also drop
((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); // the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate
// Now we can drop Media Node if waiting-idle-to-terminate ((KurentoSession) session).getKms().decrementActiveRecordings();
this.quarantineKiller.dropMediaNode(session.getMediaNodeId());
// Upload if necessary // Upload if necessary
this.uploadRecording(finalRecordingArray[0], reason); this.uploadRecording(finalRecordingArray[0], reason);
@ -397,29 +384,32 @@ public class ComposedRecordingService extends RecordingService {
private void stopAndRemoveRecordingContainer(Recording recording, String containerId, int secondsOfWait) { private void stopAndRemoveRecordingContainer(Recording recording, String containerId, int secondsOfWait) {
// Gracefully stop ffmpeg process // Gracefully stop ffmpeg process
try { try {
dockerManager.runCommandInContainer(containerId, "echo 'q' > stop"); dockerManager.runCommandInContainerAsync(recording.getRecordingProperties().mediaNode(), containerId,
} catch (InterruptedException e1) { "echo 'q' > stop");
} catch (IOException e1) {
e1.printStackTrace(); e1.printStackTrace();
} }
// Wait for the container to be gracefully self-stopped // Wait for the container to be gracefully self-stopped
final int timeOfWait = 30; final int timeOfWait = 30;
try { try {
dockerManager.waitForContainerStopped(containerId, timeOfWait); dockerManager.waitForContainerStopped(recording.getRecordingProperties().mediaNode(), containerId,
timeOfWait);
} catch (Exception e) { } catch (Exception e) {
failRecordingCompletion(recording, containerId, new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE, failRecordingCompletion(recording, containerId, true,
"The recording completion process couldn't finish in " + timeOfWait + " seconds")); new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE,
"The recording completion process couldn't finish in " + timeOfWait + " seconds"));
} }
// Remove container // Remove container
dockerManager.removeDockerContainer(containerId, false); dockerManager.removeContainer(recording.getRecordingProperties().mediaNode(), containerId, false);
containers.remove(containerId); containers.remove(containerId);
} }
protected void updateRecordingAttributes(Recording recording) { protected void updateRecordingAttributes(Recording recording) {
try { try {
RecordingInfoUtils infoUtils = new RecordingInfoUtils(this.openviduConfig.getOpenViduRecordingPath() RecordingInfoUtils infoUtils = new RecordingInfoUtils(this.openviduConfig.getOpenViduRecordingPath()
+ recording.getId() + "/" + recording.getId() + ".info"); + recording.getId() + "/" + recording.getId() + RecordingService.COMPOSED_INFO_FILE_EXTENSION);
if (!infoUtils.hasVideo()) { if (!infoUtils.hasVideo()) {
log.error("COMPOSED recording {} with hasVideo=true has not video track", recording.getId()); log.error("COMPOSED recording {} with hasVideo=true has not video track", recording.getId());
@ -440,44 +430,24 @@ public class ComposedRecordingService extends RecordingService {
} }
} }
protected void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException { protected void waitForVideoFileNotEmpty(Recording recording) throws Exception {
final String VIDEO_FILE = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" final String VIDEO_FILE = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"
+ recording.getName() + ".mp4"; + recording.getName() + RecordingService.COMPOSED_RECORDING_EXTENSION;
int SECONDS_MAX_WAIT = 20;
int SECONDS_MAX_WAIT = 15; this.fileManager.waitForFileToExistAndNotEmpty(recording.getRecordingProperties().mediaNode(), VIDEO_FILE,
int MILLISECONDS_INTERVAL_WAIT = 100; SECONDS_MAX_WAIT);
int LIMIT = SECONDS_MAX_WAIT * 1000 / MILLISECONDS_INTERVAL_WAIT;
int i = 0;
boolean arePresent = fileExistsAndHasBytes(VIDEO_FILE);
while (!arePresent && i < LIMIT) {
try {
Thread.sleep(MILLISECONDS_INTERVAL_WAIT);
arePresent = fileExistsAndHasBytes(VIDEO_FILE);
i++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!arePresent) {
log.error("Recorder container failed generating video file (is empty) for session {}",
recording.getSessionId());
throw new OpenViduException(Code.RECORDING_START_ERROR_CODE,
"Recorder container failed generating video file (is empty)");
}
} }
private boolean fileExistsAndHasBytes(String fileName) { protected void failRecordingCompletion(Recording recording, String containerId, boolean removeContainer,
File f = new File(fileName); OpenViduException e) throws OpenViduException {
return (f.exists() && f.isFile() && f.length() > 0);
}
private void failRecordingCompletion(Recording recording, String containerId, OpenViduException e)
throws OpenViduException {
recording.setStatus(io.openvidu.java.client.Recording.Status.failed); recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
dockerManager.removeDockerContainer(containerId, true); if (removeContainer) {
containers.remove(containerId); dockerManager.removeContainer(recording.getRecordingProperties().mediaNode(), containerId, true);
containers.remove(containerId);
}
sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(),
getMetadataFilePath(recording));
cleanRecordingMaps(recording);
throw e; throw e;
} }
@ -611,4 +581,27 @@ public class ComposedRecordingService extends RecordingService {
return finalUrl; return finalUrl;
} }
protected void downloadComposedRecording(final Session session, final Recording recording, final EndReason reason) {
try {
this.recordingDownloader.downloadRecording(recording, null, () -> {
updateRecordingAttributes(recording);
this.sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(),
getMetadataFilePath(recording));
cleanRecordingMaps(recording);
// Decrement active recordings once it is downloaded. This method will also drop
// the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate
((KurentoSession) session).getKms().decrementActiveRecordings();
// Upload if necessary
this.uploadRecording(recording, reason);
});
} catch (IOException e) {
log.error("Error while downloading recording {}: {}", recording.getName(), e.getMessage());
}
}
} }

View File

@ -67,15 +67,17 @@ import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionEventsHandler;
import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.recording.RecordingUploader;
import io.openvidu.server.utils.CustomFileManager; import io.openvidu.server.utils.CustomFileManager;
import io.openvidu.server.utils.LocalDockerManager; import io.openvidu.server.utils.DockerManager;
import io.openvidu.server.utils.JsonUtils; import io.openvidu.server.utils.JsonUtils;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.LocalCustomFileManager;
import io.openvidu.server.utils.LocalDockerManager;
import io.openvidu.server.utils.RecordingUtils; import io.openvidu.server.utils.RecordingUtils;
public class RecordingManager { public class RecordingManager {
@ -85,7 +87,8 @@ public class RecordingManager {
private ComposedRecordingService composedRecordingService; private ComposedRecordingService composedRecordingService;
private ComposedQuickStartRecordingService composedQuickStartRecordingService; private ComposedQuickStartRecordingService composedQuickStartRecordingService;
private SingleStreamRecordingService singleStreamRecordingService; private SingleStreamRecordingService singleStreamRecordingService;
private LocalDockerManager dockerManager; private DockerManager dockerManager;
private CustomFileManager fileManager;
@Autowired @Autowired
protected SessionEventsHandler sessionHandler; protected SessionEventsHandler sessionHandler;
@ -108,9 +111,6 @@ public class RecordingManager {
@Autowired @Autowired
private KmsManager kmsManager; private KmsManager kmsManager;
@Autowired
protected QuarantineKiller quarantineKiller;
@Autowired @Autowired
private CallDetailRecord cdr; private CallDetailRecord cdr;
@ -125,14 +125,18 @@ public class RecordingManager {
private ScheduledThreadPoolExecutor automaticRecordingStopExecutor = new ScheduledThreadPoolExecutor( private ScheduledThreadPoolExecutor automaticRecordingStopExecutor = new ScheduledThreadPoolExecutor(
Runtime.getRuntime().availableProcessors()); Runtime.getRuntime().availableProcessors());
public static final String RECORDING_ENTITY_FILE = ".recording.";
public static final String IMAGE_NAME = "openvidu/openvidu-recording"; public static final String IMAGE_NAME = "openvidu/openvidu-recording";
static String IMAGE_TAG; public static String IMAGE_TAG;
private static final List<EndReason> LAST_PARTICIPANT_LEFT_REASONS = Arrays private static final List<EndReason> LAST_PARTICIPANT_LEFT_REASONS = Arrays
.asList(new EndReason[] { EndReason.disconnect, EndReason.forceDisconnectByUser, .asList(new EndReason[] { EndReason.disconnect, EndReason.forceDisconnectByUser,
EndReason.forceDisconnectByServer, EndReason.networkDisconnect }); EndReason.forceDisconnectByServer, EndReason.networkDisconnect });
public RecordingManager(DockerManager dockerManager, CustomFileManager fileManager) {
this.dockerManager = dockerManager;
this.fileManager = fileManager;
}
@PostConstruct @PostConstruct
public void init() { public void init() {
if (this.openviduConfig.isRecordingModuleEnabled()) { if (this.openviduConfig.isRecordingModuleEnabled()) {
@ -164,60 +168,33 @@ public class RecordingManager {
RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion(); RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion();
this.dockerManager = new LocalDockerManager(); this.dockerManager.init();
this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, recordingUploader,
openviduConfig, cdr, quarantineKiller);
this.composedQuickStartRecordingService = new ComposedQuickStartRecordingService(this, recordingDownloader,
recordingUploader, openviduConfig, cdr, quarantineKiller);
this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader,
recordingUploader, openviduConfig, cdr, quarantineKiller);
log.info("Recording module required: Downloading openvidu/openvidu-recording:" this.composedRecordingService = new ComposedRecordingService(this, recordingDownloader, recordingUploader,
+ openviduConfig.getOpenViduRecordingVersion() + " Docker image (350MB aprox)"); fileManager, openviduConfig, cdr, this.dockerManager);
this.composedQuickStartRecordingService = new ComposedQuickStartRecordingService(this, recordingDownloader,
recordingUploader, fileManager, openviduConfig, cdr, this.dockerManager);
this.singleStreamRecordingService = new SingleStreamRecordingService(this, recordingDownloader,
recordingUploader, fileManager, openviduConfig, cdr);
this.checkRecordingRequirements(this.openviduConfig.getOpenViduRecordingPath(), this.checkRecordingRequirements(this.openviduConfig.getOpenViduRecordingPath(),
this.openviduConfig.getOpenviduRecordingCustomLayout()); this.openviduConfig.getOpenviduRecordingCustomLayout());
if (dockerManager.dockerImageExistsLocally(IMAGE_NAME + ":" + IMAGE_TAG)) { LocalDockerManager dockMng = new LocalDockerManager(true);
log.info("Docker image already exists locally");
} else { if (!openviduConfig.isRecordingComposedExternal()) {
Thread t = new Thread(() -> { downloadRecordingImageToLocal(dockMng);
boolean keep = true;
log.info("Downloading ");
while (keep) {
System.out.print(".");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
keep = false;
log.info("\nDownload complete");
}
}
});
t.start();
try {
dockerManager.downloadDockerImage(IMAGE_NAME + ":" + IMAGE_TAG, 600);
} catch (Exception e) {
log.error("Error downloading docker image {}:{}", IMAGE_NAME, IMAGE_TAG);
}
t.interrupt();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("Docker image available");
} }
// Clean any stranded openvidu/openvidu-recording container on startup // Clean any stranded openvidu/openvidu-recording container on startup
dockerManager.cleanStrandedContainers(RecordingManager.IMAGE_NAME); dockMng.cleanStrandedContainers(RecordingManager.IMAGE_NAME);
} }
public void checkRecordingRequirements(String openviduRecordingPath, String openviduRecordingCustomLayout) public void checkRecordingRequirements(String openviduRecordingPath, String openviduRecordingCustomLayout)
throws OpenViduException { throws OpenViduException {
LocalDockerManager dockerManager = null; LocalDockerManager dockerManager = null;
try { try {
dockerManager = new LocalDockerManager(); dockerManager = new LocalDockerManager(true);
dockerManager.checkDockerEnabled(); dockerManager.checkDockerEnabled();
} catch (OpenViduException e) { } catch (OpenViduException e) {
String message = e.getMessage(); String message = e.getMessage();
@ -242,6 +219,42 @@ public class RecordingManager {
this.checkRecordingPaths(openviduRecordingPath, openviduRecordingCustomLayout); this.checkRecordingPaths(openviduRecordingPath, openviduRecordingCustomLayout);
} }
private void downloadRecordingImageToLocal(LocalDockerManager dockMng) {
log.info("Recording module required: Downloading openvidu/openvidu-recording:"
+ openviduConfig.getOpenViduRecordingVersion() + " Docker image (350MB aprox)");
if (dockMng.dockerImageExistsLocally(IMAGE_NAME + ":" + IMAGE_TAG)) {
log.info("Docker image already exists locally");
} else {
Thread t = new Thread(() -> {
boolean keep = true;
log.info("Downloading ");
while (keep) {
System.out.print(".");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
keep = false;
log.info("\nDownload complete");
}
}
});
t.start();
try {
dockMng.downloadDockerImage(IMAGE_NAME + ":" + IMAGE_TAG, 600);
} catch (Exception e) {
log.error("Error downloading docker image {}:{}", IMAGE_NAME, IMAGE_TAG);
}
t.interrupt();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("Docker image available");
}
}
public void startComposedQuickStartContainer(Session session) { public void startComposedQuickStartContainer(Session session) {
this.composedQuickStartRecordingService.runComposedQuickStartContainer(session); this.composedQuickStartRecordingService.runComposedQuickStartContainer(session);
} }
@ -252,14 +265,23 @@ public class RecordingManager {
public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException {
try { try {
if (session.recordingLock.tryLock(15, TimeUnit.SECONDS)) {
try { // 1. INCREMENT ACTIVE RECORDINGS OF MEDIA NODE HERE
if (sessionIsBeingRecorded(session.getSessionId())) { ((KurentoSession) session).getKms().incrementActiveRecordings();
throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, // 2. CHECK THAT MEDIA NODE HAS RUNNING STATUS. IF NOT THEN FAIL RECORDING START
"Concurrent start of recording for session " + session.getSessionId()); if (!kmsManager.isMediaNodeRunning(properties.mediaNode())) {
} else { throw new OpenViduException(Code.MEDIA_NODE_STATUS_WRONG,
Recording recording = null; "Media Node " + properties.mediaNode() + " status is not \"running\"");
try { }
try {
if (session.recordingLock.tryLock(15, TimeUnit.SECONDS)) {
try {
if (sessionIsBeingRecorded(session.getSessionId())) {
throw new OpenViduException(Code.RECORDING_START_ERROR_CODE,
"Concurrent start of recording for session " + session.getSessionId());
} else {
Recording recording = null;
switch (properties.outputMode()) { switch (properties.outputMode()) {
case COMPOSED: case COMPOSED:
recording = this.composedRecordingService.startRecording(session, properties); recording = this.composedRecordingService.startRecording(session, properties);
@ -271,41 +293,45 @@ public class RecordingManager {
recording = this.singleStreamRecordingService.startRecording(session, properties); recording = this.singleStreamRecordingService.startRecording(session, properties);
break; break;
} }
} catch (Exception e) { this.recordingFromStartingToStarted(recording);
throw e;
}
this.recordingFromStartingToStarted(recording);
this.cdr.recordRecordingStarted(recording); this.cdr.recordRecordingStarted(recording);
this.cdr.recordRecordingStatusChanged(recording, null, recording.getCreatedAt(), this.cdr.recordRecordingStatusChanged(recording, null, recording.getCreatedAt(),
Status.started); Status.started);
if (!(OutputMode.COMPOSED.equals(properties.outputMode()) && properties.hasVideo())) { if (!(OutputMode.COMPOSED.equals(properties.outputMode()) && properties.hasVideo())) {
// Directly send recording started notification for all cases except for // Directly send recording started notification for all cases except for
// COMPOSED recordings with video (will be sent on first RECORDER subscriber) // COMPOSED recordings with video (will be sent on first RECORDER subscriber)
// Both INDIVIDUAL and COMPOSED_QUICK_START should notify immediately // Both INDIVIDUAL and COMPOSED_QUICK_START should notify immediately
this.sessionHandler.sendRecordingStartedNotification(session, recording); this.sessionHandler.sendRecordingStartedNotification(session, recording);
}
if (session.getActivePublishers() == 0) {
// Init automatic recording stop if no publishers when starting the recording
log.info(
"No publisher in session {}. Starting {} seconds countdown for stopping recording",
session.getSessionId(),
this.openviduConfig.getOpenviduRecordingAutostopTimeout());
this.initAutomaticRecordingStopThread(session);
}
return recording;
} }
if (session.getActivePublishers() == 0) { } finally {
// Init automatic recording stop if no publishers when starting the recording session.recordingLock.unlock();
log.info("No publisher in session {}. Starting {} seconds countdown for stopping recording",
session.getSessionId(), this.openviduConfig.getOpenviduRecordingAutostopTimeout());
this.initAutomaticRecordingStopThread(session);
}
return recording;
} }
} finally { } else {
session.recordingLock.unlock(); throw new OpenViduException(Code.RECORDING_START_ERROR_CODE,
"Timeout waiting for recording Session lock to be available for session "
+ session.getSessionId());
} }
} else { } catch (InterruptedException e) {
throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, throw new OpenViduException(Code.RECORDING_START_ERROR_CODE,
"Timeout waiting for recording Session lock to be available for session " "InterruptedException waiting for recording Session lock to be available for session "
+ session.getSessionId()); + session.getSessionId());
} }
} catch (InterruptedException e) { } catch (Exception e) {
throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, // DECREMENT ACTIVE RECORDINGS OF MEDIA NODE AND TRY REMOVE MEDIA NODE HERE
"InterruptedException waiting for recording Session lock to be available for session " ((KurentoSession) session).getKms().decrementActiveRecordings();
+ session.getSessionId()); throw e;
} }
} }
@ -517,9 +543,9 @@ public class RecordingManager {
File[] innerFiles = files[i].listFiles(); File[] innerFiles = files[i].listFiles();
for (int j = 0; j < innerFiles.length; j++) { for (int j = 0; j < innerFiles.length; j++) {
if (innerFiles[j].isFile() if (innerFiles[j].isFile()
&& innerFiles[j].getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { && innerFiles[j].getName().startsWith(RecordingService.RECORDING_ENTITY_FILE)) {
fileNamesNoExtension fileNamesNoExtension
.add(innerFiles[j].getName().replaceFirst(RecordingManager.RECORDING_ENTITY_FILE, "")); .add(innerFiles[j].getName().replaceFirst(RecordingService.RECORDING_ENTITY_FILE, ""));
break; break;
} }
} }
@ -548,7 +574,7 @@ public class RecordingManager {
public File getRecordingEntityFileFromLocalStorage(String recordingId) { public File getRecordingEntityFileFromLocalStorage(String recordingId) {
String metadataFilePath = openviduConfig.getOpenViduRecordingPath() + recordingId + "/" String metadataFilePath = openviduConfig.getOpenViduRecordingPath() + recordingId + "/"
+ RecordingManager.RECORDING_ENTITY_FILE + recordingId; + RecordingService.RECORDING_ENTITY_FILE + recordingId;
return new File(metadataFilePath); return new File(metadataFilePath);
} }
@ -571,7 +597,7 @@ public class RecordingManager {
} }
public Recording getRecordingFromEntityFile(File file) { public Recording getRecordingFromEntityFile(File file) {
if (file.isFile() && file.getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { if (file.isFile() && file.getName().startsWith(RecordingService.RECORDING_ENTITY_FILE)) {
JsonObject json; JsonObject json;
try { try {
json = jsonUtils.fromFileToJsonObject(file.getAbsolutePath()); json = jsonUtils.fromFileToJsonObject(file.getAbsolutePath());
@ -726,7 +752,8 @@ public class RecordingManager {
} }
final String testFolderPath = openviduRecordingPath + "/TEST_RECORDING_PATH_" + System.currentTimeMillis(); final String testFolderPath = openviduRecordingPath + "/TEST_RECORDING_PATH_" + System.currentTimeMillis();
final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH" + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION; final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH"
+ RecordingService.INDIVIDUAL_RECORDING_EXTENSION;
// Check Kurento Media Server write permissions in recording path // Check Kurento Media Server write permissions in recording path
if (this.kmsManager.getKmss().isEmpty()) { if (this.kmsManager.getKmss().isEmpty()) {
@ -780,7 +807,7 @@ public class RecordingManager {
log.info("Kurento Media Server has write permissions on recording path: {}", openviduRecordingPath); log.info("Kurento Media Server has write permissions on recording path: {}", openviduRecordingPath);
try { try {
new CustomFileManager().deleteFolder(testFolderPath); new LocalCustomFileManager().deleteFolder(testFolderPath);
log.info("OpenVidu Server has write permissions over files created by Kurento Media Server"); log.info("OpenVidu Server has write permissions over files created by Kurento Media Server");
} catch (IOException e) { } catch (IOException e) {
String errorMessage = "The recording path \"" + openviduRecordingPath String errorMessage = "The recording path \"" + openviduRecordingPath

View File

@ -50,9 +50,9 @@ public class RecordingManagerUtilsLocalStorage extends RecordingManagerUtils {
File[] innerFiles = files[i].listFiles(); File[] innerFiles = files[i].listFiles();
for (int j = 0; j < innerFiles.length; j++) { for (int j = 0; j < innerFiles.length; j++) {
if (innerFiles[j].isFile() if (innerFiles[j].isFile()
&& innerFiles[j].getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { && innerFiles[j].getName().startsWith(RecordingService.RECORDING_ENTITY_FILE)) {
fileNamesNoExtension fileNamesNoExtension
.add(innerFiles[j].getName().replaceFirst(RecordingManager.RECORDING_ENTITY_FILE, "")); .add(innerFiles[j].getName().replaceFirst(RecordingService.RECORDING_ENTITY_FILE, ""));
break; break;
} }
} }

View File

@ -36,7 +36,6 @@ import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.recording.RecordingUploader;
import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.CommandExecutor;
import io.openvidu.server.utils.CustomFileManager; import io.openvidu.server.utils.CustomFileManager;
import io.openvidu.server.utils.QuarantineKiller;
import io.openvidu.server.utils.RecordingUtils; import io.openvidu.server.utils.RecordingUtils;
public abstract class RecordingService { public abstract class RecordingService {
@ -47,19 +46,26 @@ public abstract class RecordingService {
protected RecordingManager recordingManager; protected RecordingManager recordingManager;
protected RecordingDownloader recordingDownloader; protected RecordingDownloader recordingDownloader;
protected RecordingUploader recordingUploader; protected RecordingUploader recordingUploader;
protected CustomFileManager fileManager;
protected CallDetailRecord cdr; protected CallDetailRecord cdr;
protected QuarantineKiller quarantineKiller;
protected CustomFileManager fileWriter = new CustomFileManager();
RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, public final static String RECORDING_ENTITY_FILE = ".recording.";
RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr, public final static String COMPOSED_RECORDING_EXTENSION = ".mp4";
QuarantineKiller quarantineKiller) { public final static String COMPOSED_THUMBNAIL_EXTENSION = ".jpg";
public final static String COMPOSED_INFO_FILE_EXTENSION = ".info";
public final static String INDIVIDUAL_RECORDING_EXTENSION = ".webm";
public final static String INDIVIDUAL_STREAM_METADATA_FILE = ".stream.";
public final static String INDIVIDUAL_RECORDING_COMPRESSED_EXTENSION = ".zip";
public RecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig,
CallDetailRecord cdr) {
this.recordingManager = recordingManager; this.recordingManager = recordingManager;
this.recordingDownloader = recordingDownloader; this.recordingDownloader = recordingDownloader;
this.recordingUploader = recordingUploader; this.recordingUploader = recordingUploader;
this.fileManager = fileManager;
this.openviduConfig = openviduConfig; this.openviduConfig = openviduConfig;
this.cdr = cdr; this.cdr = cdr;
this.quarantineKiller = quarantineKiller;
} }
public abstract Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException; public abstract Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException;
@ -72,20 +78,16 @@ public abstract class RecordingService {
*/ */
protected void generateRecordingMetadataFile(Recording recording) { protected void generateRecordingMetadataFile(Recording recording) {
String folder = this.openviduConfig.getOpenViduRecordingPath() + recording.getId(); String folder = this.openviduConfig.getOpenViduRecordingPath() + recording.getId();
boolean newFolderCreated = this.fileWriter.createFolderIfNotExists(folder); boolean newFolderCreated = this.fileManager.createFolderIfNotExists(folder);
if (newFolderCreated) { if (newFolderCreated) {
log.warn( log.info("New folder {} created for recording {}", folder, recording.getId());
"New folder {} created. This means A) Cluster mode is enabled B) The recording started for a session with no publishers or C) No media type compatible publishers",
folder);
} else {
log.info("Folder {} already existed. Some publisher is already being recorded", folder);
} }
String filePath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" String filePath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"
+ RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + RecordingService.RECORDING_ENTITY_FILE + recording.getId();
String text = recording.toJson().toString(); String text = recording.toJson().toString();
this.fileWriter.createAndWriteFile(filePath, text); this.fileManager.createAndWriteFile(filePath, text);
log.info("Generated recording metadata file at {}", filePath); log.info("Generated recording metadata file at {}", filePath);
} }
@ -97,7 +99,7 @@ public abstract class RecordingService {
*/ */
protected Recording sealRecordingMetadataFileAsStopped(Recording recording) { protected Recording sealRecordingMetadataFileAsStopped(Recording recording) {
final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"
+ RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + RecordingService.RECORDING_ENTITY_FILE + recording.getId();
return this.sealRecordingMetadataFile(recording, 0, 0, io.openvidu.java.client.Recording.Status.stopped, return this.sealRecordingMetadataFile(recording, 0, 0, io.openvidu.java.client.Recording.Status.stopped,
entityFile); entityFile);
} }
@ -123,7 +125,7 @@ public abstract class RecordingService {
recording.setUrl(recordingManager.getRecordingUrl(recording)); recording.setUrl(recordingManager.getRecordingUrl(recording));
final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"
+ RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + RecordingService.RECORDING_ENTITY_FILE + recording.getId();
return this.sealRecordingMetadataFile(recording, size, duration, status, entityFile); return this.sealRecordingMetadataFile(recording, size, duration, status, entityFile);
} }
@ -133,7 +135,7 @@ public abstract class RecordingService {
recording.setSize(size); // Size in bytes recording.setSize(size); // Size in bytes
recording.setDuration(duration > 0 ? duration : 0); // Duration in seconds recording.setDuration(duration > 0 ? duration : 0); // Duration in seconds
if (this.fileWriter.overwriteFile(metadataFilePath, recording.toJson().toString())) { if (this.fileManager.overwriteFile(metadataFilePath, recording.toJson().toString())) {
log.info("Sealed recording metadata file at {} with status [{}]", metadataFilePath, status.name()); log.info("Sealed recording metadata file at {} with status [{}]", metadataFilePath, status.name());
} }
@ -151,8 +153,8 @@ public abstract class RecordingService {
if (properties.name() == null || properties.name().isEmpty()) { if (properties.name() == null || properties.name().isEmpty()) {
// No name provided for the recording file. Use recordingId // No name provided for the recording file. Use recordingId
RecordingProperties.Builder builder = new RecordingProperties.Builder().name(recordingId) RecordingProperties.Builder builder = new RecordingProperties.Builder().name(recordingId)
.outputMode(properties.outputMode()).hasAudio(properties.hasAudio()) .outputMode(properties.outputMode()).hasAudio(properties.hasAudio()).hasVideo(properties.hasVideo())
.hasVideo(properties.hasVideo()); .mediaNode(properties.mediaNode());
if (RecordingUtils.IS_COMPOSED(properties.outputMode()) && properties.hasVideo()) { if (RecordingUtils.IS_COMPOSED(properties.outputMode()) && properties.hasVideo()) {
builder.resolution(properties.resolution()); builder.resolution(properties.resolution());
builder.recordingLayout(properties.recordingLayout()); builder.recordingLayout(properties.recordingLayout());
@ -202,7 +204,7 @@ public abstract class RecordingService {
protected String getMetadataFilePath(Recording recording) { protected String getMetadataFilePath(Recording recording) {
final String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; final String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/";
return folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); return folderPath + RecordingService.RECORDING_ENTITY_FILE + recording.getId();
} }
protected void uploadRecording(final Recording recording, EndReason reason) { protected void uploadRecording(final Recording recording, EndReason reason) {

View File

@ -47,7 +47,6 @@ import org.kurento.client.StoppedEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
@ -68,7 +67,7 @@ import io.openvidu.server.recording.RecorderEndpointWrapper;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.recording.RecordingUploader;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.CustomFileManager;
public class SingleStreamRecordingService extends RecordingService { public class SingleStreamRecordingService extends RecordingService {
@ -79,13 +78,10 @@ public class SingleStreamRecordingService extends RecordingService {
// Multiple recorder endpoints per stream during a recording // Multiple recorder endpoints per stream during a recording
private Map<String, Map<String, List<RecorderEndpointWrapper>>> storedRecorders = new ConcurrentHashMap<>(); private Map<String, Map<String, List<RecorderEndpointWrapper>>> storedRecorders = new ConcurrentHashMap<>();
private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream.";
public static final String INDIVIDUAL_RECORDING_EXTENSION = ".webm";
public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr, RecordingUploader recordingUploader, CustomFileManager fileManager, OpenviduConfig openviduConfig,
QuarantineKiller quarantineKiller) { CallDetailRecord cdr) {
super(recordingManager, recordingDownloader, recordingUploader, openviduConfig, cdr, quarantineKiller); super(recordingManager, recordingDownloader, recordingUploader, fileManager, openviduConfig, cdr);
} }
@Override @Override
@ -139,9 +135,6 @@ public class SingleStreamRecordingService extends RecordingService {
this.generateRecordingMetadataFile(recording); this.generateRecordingMetadataFile(recording);
// Increment active recordings
((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet();
return recording; return recording;
} }
@ -191,11 +184,10 @@ public class SingleStreamRecordingService extends RecordingService {
cleanRecordingWrappers(finalRecordingArray[0]); cleanRecordingWrappers(finalRecordingArray[0]);
// Decrement active recordings once it is downloaded // Decrement active recordings once it is downloaded. This method will also drop
((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); // the Media Node if no more sessions or recordings and status is
// waiting-idle-to-terminate
// Now we can drop Media Node if waiting-idle-to-terminate ((KurentoSession) session).getKms().decrementActiveRecordings();
this.quarantineKiller.dropMediaNode(session.getMediaNodeId());
// Upload if necessary // Upload if necessary
this.uploadRecording(finalRecordingArray[0], reason); this.uploadRecording(finalRecordingArray[0], reason);
@ -241,8 +233,8 @@ public class SingleStreamRecordingService extends RecordingService {
RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline,
"file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" + fileName "file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" + fileName
+ SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION) + RecordingService.INDIVIDUAL_RECORDING_EXTENSION).withMediaProfile(profile)
.withMediaProfile(profile).build(); .build();
recorder.addRecordingListener(new EventListener<RecordingEvent>() { recorder.addRecordingListener(new EventListener<RecordingEvent>() {
@Override @Override
@ -417,11 +409,11 @@ public class SingleStreamRecordingService extends RecordingService {
} }
private void generateIndividualMetadataFile(RecorderEndpointWrapper wrapper) { private void generateIndividualMetadataFile(RecorderEndpointWrapper wrapper) {
this.commonWriteIndividualMetadataFile(wrapper, this.fileWriter::createAndWriteFile); this.commonWriteIndividualMetadataFile(wrapper, this.fileManager::createAndWriteFile);
} }
private void updateIndividualMetadataFile(RecorderEndpointWrapper wrapper) { private void updateIndividualMetadataFile(RecorderEndpointWrapper wrapper) {
this.commonWriteIndividualMetadataFile(wrapper, this.fileWriter::overwriteFile); this.commonWriteIndividualMetadataFile(wrapper, this.fileManager::overwriteFile);
} }
private void commonWriteIndividualMetadataFile(RecorderEndpointWrapper wrapper, private void commonWriteIndividualMetadataFile(RecorderEndpointWrapper wrapper,
@ -439,7 +431,7 @@ public class SingleStreamRecordingService extends RecordingService {
// individual recordings) and "size" (sum of all individual recordings size) // individual recordings) and "size" (sum of all individual recordings size)
String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/";
String metadataFilePath = folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); String metadataFilePath = folderPath + RecordingService.RECORDING_ENTITY_FILE + recording.getId();
String syncFilePath = folderPath + recording.getName() + ".json"; String syncFilePath = folderPath + recording.getName() + ".json";
recording = this.recordingManager.getRecordingFromEntityFile(new File(metadataFilePath)); recording = this.recordingManager.getRecordingFromEntityFile(new File(metadataFilePath));
@ -452,7 +444,6 @@ public class SingleStreamRecordingService extends RecordingService {
File[] files = folder.listFiles(); File[] files = folder.listFiles();
Reader reader = null; Reader reader = null;
Gson gson = new Gson();
// Sync metadata json object to store in "RECORDING_NAME.json" // Sync metadata json object to store in "RECORDING_NAME.json"
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
@ -495,8 +486,9 @@ public class SingleStreamRecordingService extends RecordingService {
} }
json.add("files", jsonArrayFiles); json.add("files", jsonArrayFiles);
this.fileWriter.createAndWriteFile(syncFilePath, new GsonBuilder().setPrettyPrinting().create().toJson(json)); this.fileManager.createAndWriteFile(syncFilePath, new GsonBuilder().setPrettyPrinting().create().toJson(json));
this.generateZipFileAndCleanFolder(folderPath, recording.getName() + ".zip"); this.generateZipFileAndCleanFolder(folderPath,
recording.getName() + RecordingService.INDIVIDUAL_RECORDING_COMPRESSED_EXTENSION);
double duration = (double) (maxEndTime - minStartTime) / 1000; double duration = (double) (maxEndTime - minStartTime) / 1000;
duration = duration > 0 ? duration : 0; duration = duration > 0 ? duration : 0;
@ -520,7 +512,7 @@ public class SingleStreamRecordingService extends RecordingService {
String fileExtension = FilenameUtils.getExtension(files[i].getName()); String fileExtension = FilenameUtils.getExtension(files[i].getName());
if (files[i].isFile() && (fileExtension.equals("json") if (files[i].isFile() && (fileExtension.equals("json")
|| SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION.equals("." + fileExtension))) { || RecordingService.INDIVIDUAL_RECORDING_EXTENSION.equals("." + fileExtension))) {
// Zip video files and json sync metadata file // Zip video files and json sync metadata file
FileInputStream fis = new FileInputStream(files[i]); FileInputStream fis = new FileInputStream(files[i]);
@ -534,7 +526,7 @@ public class SingleStreamRecordingService extends RecordingService {
fis.close(); fis.close();
} }
if (!files[i].getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { if (!files[i].getName().startsWith(RecordingService.RECORDING_ENTITY_FILE)) {
// Clean inspected file if it is not // Clean inspected file if it is not
files[i].delete(); files[i].delete();
} }

View File

@ -367,7 +367,7 @@ public class SessionRestController {
RecordingProperties recordingProperties; RecordingProperties recordingProperties;
try { try {
recordingProperties = getRecordingPropertiesFromParams(params, session.getSessionProperties()).build(); recordingProperties = getRecordingPropertiesFromParams(params, session).build();
} catch (RuntimeException e) { } catch (RuntimeException e) {
return this.generateErrorResponse(e.getMessage(), "/sessions", HttpStatus.UNPROCESSABLE_ENTITY); return this.generateErrorResponse(e.getMessage(), "/sessions", HttpStatus.UNPROCESSABLE_ENTITY);
} catch (Exception e) { } catch (Exception e) {
@ -411,8 +411,13 @@ public class SessionRestController {
Session session = sessionManager.getSession(recording.getSessionId()); Session session = sessionManager.getSession(recording.getSessionId());
Recording stoppedRecording = this.recordingManager.stopRecording(session, recording.getId(), Recording stoppedRecording;
EndReason.recordingStoppedByServer); try {
stoppedRecording = this.recordingManager.stopRecording(session, recording.getId(),
EndReason.recordingStoppedByServer);
} catch (Exception e) {
return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
}
session.recordingManuallyStopped.set(true); session.recordingManuallyStopped.set(true);
@ -906,8 +911,8 @@ public class SessionRestController {
return builder; return builder;
} }
protected RecordingProperties.Builder getRecordingPropertiesFromParams(Map<?, ?> params, protected RecordingProperties.Builder getRecordingPropertiesFromParams(Map<?, ?> params, Session session)
SessionProperties sessionProperties) throws Exception { throws Exception {
RecordingProperties.Builder builder = new RecordingProperties.Builder(); RecordingProperties.Builder builder = new RecordingProperties.Builder();
@ -976,7 +981,7 @@ public class SessionRestController {
// If outputMode is COMPOSED when defaultOutputMode is COMPOSED_QUICK_START, // If outputMode is COMPOSED when defaultOutputMode is COMPOSED_QUICK_START,
// change outputMode to COMPOSED_QUICK_START (and vice versa) // change outputMode to COMPOSED_QUICK_START (and vice versa)
OutputMode defaultOutputMode = sessionProperties.defaultOutputMode(); OutputMode defaultOutputMode = session.getSessionProperties().defaultOutputMode();
if (OutputMode.COMPOSED_QUICK_START.equals(defaultOutputMode) && OutputMode.COMPOSED.equals(finalOutputMode)) { if (OutputMode.COMPOSED_QUICK_START.equals(defaultOutputMode) && OutputMode.COMPOSED.equals(finalOutputMode)) {
finalOutputMode = OutputMode.COMPOSED_QUICK_START; finalOutputMode = OutputMode.COMPOSED_QUICK_START;
} else if (OutputMode.COMPOSED.equals(defaultOutputMode) } else if (OutputMode.COMPOSED.equals(defaultOutputMode)
@ -984,15 +989,17 @@ public class SessionRestController {
finalOutputMode = OutputMode.COMPOSED; finalOutputMode = OutputMode.COMPOSED;
} }
builder.outputMode(finalOutputMode == null ? sessionProperties.defaultOutputMode() : finalOutputMode); builder.outputMode(
finalOutputMode == null ? session.getSessionProperties().defaultOutputMode() : finalOutputMode);
if (RecordingUtils.IS_COMPOSED(finalOutputMode)) { if (RecordingUtils.IS_COMPOSED(finalOutputMode)) {
builder.resolution(resolution != null ? resolution : "1920x1080"); // resolution == null ? builder.resolution(resolution != null ? resolution : "1920x1080"); // resolution == null ?
// sessionProperties.defaultRecordingResolution) // sessionProperties.defaultRecordingResolution)
// : resolution)); // : resolution));
builder.recordingLayout( builder.recordingLayout(recordingLayout == null ? session.getSessionProperties().defaultRecordingLayout()
recordingLayout == null ? sessionProperties.defaultRecordingLayout() : recordingLayout); : recordingLayout);
if (RecordingLayout.CUSTOM.equals(recordingLayout)) { if (RecordingLayout.CUSTOM.equals(recordingLayout)) {
builder.customLayout(customLayout == null ? sessionProperties.defaultCustomLayout() : customLayout); builder.customLayout(
customLayout == null ? session.getSessionProperties().defaultCustomLayout() : customLayout);
} }
if (shmSize != null) { if (shmSize != null) {
if (shmSize < 134217728L) { if (shmSize < 134217728L) {

View File

@ -28,7 +28,7 @@ import org.apache.commons.io.FileUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class CustomFileManager { public abstract class CustomFileManager {
private static final Logger log = LoggerFactory.getLogger(CustomFileManager.class); private static final Logger log = LoggerFactory.getLogger(CustomFileManager.class);
@ -121,4 +121,7 @@ public class CustomFileManager {
} }
} }
public abstract void waitForFileToExistAndNotEmpty(String mediaNodeId, String absolutePathToFile,
int maxSecondsWait) throws Exception;
} }

View File

@ -11,16 +11,17 @@ public interface DockerManager {
public DockerManager init(); public DockerManager init();
public String runContainer(String image, String containerName, String user, List<Volume> volumes, List<Bind> binds, public String runContainer(String mediaNodeId, String image, String containerName, String user,
String networkMode, List<String> envs, List<String> command, Long shmSize, boolean privileged, List<Volume> volumes, List<Bind> binds, String networkMode, List<String> envs, List<String> command,
Map<String, String> labels) throws Exception; Long shmSize, boolean privileged, Map<String, String> labels) throws Exception;
public void removeContainer(String containerId, boolean force); public void removeContainer(String mediaNodeId, String containerId, boolean force);
public void runCommandInContainerSync(String containerId, String command, int secondsOfWait) throws IOException; public void runCommandInContainerSync(String mediaNodeId, String containerId, String command, int secondsOfWait)
throws IOException;
public void runCommandInContainerAsync(String containerId, String command) throws IOException; public void runCommandInContainerAsync(String mediaNodeId, String containerId, String command) throws IOException;
public void waitForContainerStopped(String containerId, int secondsOfWait) throws Exception; public void waitForContainerStopped(String mediaNodeId, String containerId, int secondsOfWait) throws Exception;
} }

View File

@ -0,0 +1,37 @@
package io.openvidu.server.utils;
import java.io.File;
public class LocalCustomFileManager extends CustomFileManager {
@Override
public void waitForFileToExistAndNotEmpty(String mediaNodeId, String absolutePathToFile, int maxSeconsWait)
throws Exception {
// Check 10 times per seconds
int MILLISECONDS_INTERVAL_WAIT = 100;
int LIMIT = maxSeconsWait * 1000 / MILLISECONDS_INTERVAL_WAIT;
int i = 0;
boolean arePresent = fileExistsAndHasBytes(absolutePathToFile);
while (!arePresent && i < LIMIT) {
try {
Thread.sleep(MILLISECONDS_INTERVAL_WAIT);
arePresent = fileExistsAndHasBytes(absolutePathToFile);
i++;
} catch (InterruptedException e) {
throw new Exception("Interrupted exception while waiting for file " + absolutePathToFile + " to exist");
}
}
if (!arePresent) {
throw new Exception("File " + absolutePathToFile + " does not exist and hasn't been created in "
+ maxSeconsWait + " seconds");
}
}
private boolean fileExistsAndHasBytes(String fileName) {
File f = new File(fileName);
return (f.exists() && f.isFile() && f.length() > 0);
}
}

View File

@ -53,15 +53,23 @@ import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.recording.service.WaitForContainerStoppedCallback; import io.openvidu.server.recording.service.WaitForContainerStoppedCallback;
public class LocalDockerManager { public class LocalDockerManager implements DockerManager {
private static final Logger log = LoggerFactory.getLogger(LocalDockerManager.class); private static final Logger log = LoggerFactory.getLogger(DockerManager.class);
private DockerClient dockerClient; private DockerClient dockerClient;
public LocalDockerManager() { public LocalDockerManager(boolean init) {
if (init) {
this.init();
}
}
@Override
public DockerManager init() {
DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder().build();
this.dockerClient = DockerClientBuilder.getInstance(config).build(); this.dockerClient = DockerClientBuilder.getInstance(config).build();
return this;
} }
public void downloadDockerImage(String image, int secondsOfWait) throws Exception { public void downloadDockerImage(String image, int secondsOfWait) throws Exception {
@ -107,11 +115,12 @@ public class LocalDockerManager {
} }
} }
public String runContainer(String container, String containerName, String user, List<Volume> volumes, @Override
List<Bind> binds, String networkMode, List<String> envs, List<String> command, Long shmSize, public String runContainer(String mediaNodeId, String image, String containerName, String user,
boolean privileged, Map<String, String> labels) throws Exception { List<Volume> volumes, List<Bind> binds, String networkMode, List<String> envs, List<String> command,
Long shmSize, boolean privileged, Map<String, String> labels) throws Exception {
CreateContainerCmd cmd = dockerClient.createContainerCmd(container).withEnv(envs); CreateContainerCmd cmd = dockerClient.createContainerCmd(image).withEnv(envs);
if (containerName != null) { if (containerName != null) {
cmd.withName(containerName); cmd.withName(containerName);
} }
@ -153,12 +162,13 @@ public class LocalDockerManager {
containerName); containerName);
throw e; throw e;
} catch (NotFoundException e) { } catch (NotFoundException e) {
log.error("Docker image {} couldn't be found in docker host", container); log.error("Docker image {} couldn't be found in docker host", image);
throw e; throw e;
} }
} }
public void removeDockerContainer(String containerId, boolean force) { @Override
public void removeContainer(String mediaNodeId, String containerId, boolean force) {
dockerClient.removeContainerCmd(containerId).withForce(force).exec(); dockerClient.removeContainerCmd(containerId).withForce(force).exec();
} }
@ -172,15 +182,9 @@ public class LocalDockerManager {
} }
} }
public void runCommandInContainer(String containerId, String command) throws InterruptedException { @Override
ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true) public void runCommandInContainerSync(String mediaNodeId, String containerId, String command, int secondsOfWait)
.withAttachStderr(true).withCmd("bash", "-c", command).exec(); throws IOException {
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(new ExecStartResultCallback() {
});
}
public void runCommandInContainerSync(String containerId, String command, int secondsOfWait)
throws InterruptedException {
ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true) ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true)
.withAttachStderr(true).withCmd("bash", "-c", command).exec(); .withAttachStderr(true).withCmd("bash", "-c", command).exec();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
@ -193,12 +197,21 @@ public class LocalDockerManager {
try { try {
latch.await(secondsOfWait, TimeUnit.SECONDS); latch.await(secondsOfWait, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new InterruptedException("Container " + containerId + " did not return from executing command \"" throw new IOException("Container " + containerId + " did not return from executing command \"" + command
+ command + "\" in " + secondsOfWait + " seconds"); + "\" in " + secondsOfWait + " seconds");
} }
} }
public void waitForContainerStopped(String containerId, int secondsOfWait) throws Exception { @Override
public void runCommandInContainerAsync(String mediaNodeId, String containerId, String command) throws IOException {
ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId).withAttachStdout(true)
.withAttachStderr(true).withCmd("bash", "-c", command).exec();
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(new ExecStartResultCallback() {
});
}
@Override
public void waitForContainerStopped(String mediaNodeId, String containerId, int secondsOfWait) throws Exception {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
WaitForContainerStoppedCallback callback = new WaitForContainerStoppedCallback(latch); WaitForContainerStoppedCallback callback = new WaitForContainerStoppedCallback(latch);
dockerClient.waitContainerCmd(containerId).exec(callback); dockerClient.waitContainerCmd(containerId).exec(callback);

View File

@ -38,7 +38,8 @@ public class IntegrationTestConfiguration {
List<Kms> successfullyConnectedKmss = new ArrayList<>(); List<Kms> successfullyConnectedKmss = new ArrayList<>();
List<KmsProperties> kmsProperties = invocation.getArgument(0); List<KmsProperties> kmsProperties = invocation.getArgument(0);
for (KmsProperties kmsProp : kmsProperties) { for (KmsProperties kmsProp : kmsProperties) {
Kms kms = new Kms(kmsProp, Whitebox.getInternalState(spy, "loadManager")); Kms kms = new Kms(kmsProp, Whitebox.getInternalState(spy, "loadManager"),
Whitebox.getInternalState(spy, "quarantineKiller"));
KurentoClient kClient = mock(KurentoClient.class); KurentoClient kClient = mock(KurentoClient.class);
doAnswer(i -> { doAnswer(i -> {