diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index a92998ea..ae0eef0a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -152,7 +152,7 @@ public class OpenViduServer implements JsonRpcConfigurer { public RecordingManager recordingManager() { return new RecordingManager(); } - + @Bean @ConditionalOnMissingBean public RecordingDownloader recordingDownload() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index fe3580a1..476ab259 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -265,7 +265,7 @@ public class KurentoSessionManager extends SessionManager { + kurentoOptions.getFilter().getType()); log.error("PARTICIPANT {}: Error applying filter. The token has no permissions to apply filter {}", participant.getParticipantPublicId(), kurentoOptions.getFilter().getType(), e); - sessionEventsHandler.onPublishMedia(participant, null, kParticipant.getPublisher().createdAt(), + sessionEventsHandler.onPublishMedia(participant, null, System.currentTimeMillis(), kSession.getSessionId(), mediaOptions, sdpAnswer, participants, transactionId, e); throw e; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java b/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java index 581965ba..5701dd7d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java @@ -6,8 +6,11 @@ import java.util.Collection; public class DummyRecordingDownloader implements RecordingDownloader { @Override - public void downloadRecording(Recording recording, Collection streamIds) throws IOException { - // Do nothing + public void downloadRecording(Recording recording, Collection streamIds, Runnable callback) + throws IOException { + // Just immediately run callback function + callback.run(); + return; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/RecordingDownloader.java b/openvidu-server/src/main/java/io/openvidu/server/recording/RecordingDownloader.java index d6ea5b87..e9482fe2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/RecordingDownloader.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/RecordingDownloader.java @@ -22,6 +22,7 @@ import java.util.Collection; public interface RecordingDownloader { - public void downloadRecording(Recording recording, Collection streamIds) throws IOException; + public void downloadRecording(Recording recording, Collection streamIds, Runnable callback) + throws IOException; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index b0775f96..3a092820 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -48,6 +48,7 @@ import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.recording.CompositeWrapper; import io.openvidu.server.recording.Recording; +import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingInfoUtils; import io.openvidu.server.utils.DockerManager; @@ -61,8 +62,9 @@ public class ComposedRecordingService extends RecordingService { private DockerManager dockerManager; - public ComposedRecordingService(RecordingManager recordingManager, OpenviduConfig openviduConfig) { - super(recordingManager, openviduConfig); + public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, + OpenviduConfig openviduConfig) { + super(recordingManager, recordingDownloader, openviduConfig); this.dockerManager = new DockerManager(); } @@ -94,6 +96,7 @@ public class ComposedRecordingService extends RecordingService { if (recording.hasVideo()) { return this.stopRecordingWithVideo(session, recording, reason); } else { + recording = this.sealRecordingMetadataFileAsProcessing(recording); return this.stopRecordingAudioOnly(session, recording, reason, 0); } } @@ -367,21 +370,28 @@ public class ComposedRecordingService extends RecordingService { this.cleanRecordingMaps(recording); - String filesPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; - File videoFile = new File(filesPath + recording.getName() + ".webm"); - long finalSize = videoFile.length(); - double finalDuration = (double) compositeWrapper.getDuration() / 1000; - - this.updateFilePermissions(filesPath); - - this.sealRecordingMetadataFile(recording, finalSize, finalDuration, - filesPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId()); + // TODO: DOWNLOAD FILE IF SCALABILITY MODE + final Recording[] finalRecordingArray = new Recording[1]; + try { + this.recordingDownloader.downloadRecording(recording, null, () -> { + String filesPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; + File videoFile = new File(filesPath + recording.getName() + ".webm"); + long finalSize = videoFile.length(); + double finalDuration = (double) compositeWrapper.getDuration() / 1000; + this.updateFilePermissions(filesPath); + this.sealRecordingMetadataFileAsStopped(recording, finalSize, finalDuration, + filesPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId()); + }); + } catch (IOException e) { + log.error("Error while downloading recording {}: {}", recording.getName(), e.getMessage()); + } + Recording finalRecording = finalRecordingArray[0] != null ? finalRecordingArray[0] : recording; if (reason != null && session != null) { - this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); + this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, finalRecording, reason); } - return recording; + return finalRecording; } private void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java index b2bcee32..740433b6 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java @@ -63,8 +63,8 @@ public abstract class RecordingService { boolean newFolderCreated = this.fileWriter.createFolderIfNotExists(folder); if (newFolderCreated) { - log.info( - "New folder {} created. This means the recording started for a session with no publishers or no media type compatible publishers", + log.warn( + "New folder {} created. This means A) Cluster mode is enabled B) The recording started for a session with no publishers or C) No media type compatible publishers", folder); } else { log.info("Folder {} already existed. Some publisher is already being recorded", folder); @@ -78,22 +78,50 @@ public abstract class RecordingService { } /** - * Update and overwrites metadata recording file with final values on recording - * stop (".recording.RECORDING_ID" JSON file to store Recording entity). + * Update and overwrites metadata recording file to set it in "processing" + * status. Recording size and duration will remain as 0 * * @return updated Recording object */ - protected Recording sealRecordingMetadataFile(Recording recording, long size, double duration, + protected Recording sealRecordingMetadataFileAsProcessing(Recording recording) { + + log.info("Recording {} is processing", recording.getId()); + + final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" + + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + return this.sealRecordingMetadataFile(recording, 0, 0, io.openvidu.java.client.Recording.Status.processing, + entityFile); + } + + /** + * Update and overwrites metadata recording file to set it in "stopped" (or + * "failed") status + * + * @return updated Recording object + */ + protected Recording sealRecordingMetadataFileAsStopped(Recording recording, long size, double duration, String metadataFilePath) { + + log.info("Recording {} is processing", recording.getId()); + + io.openvidu.java.client.Recording.Status status = io.openvidu.java.client.Recording.Status.failed + .equals(recording.getStatus()) ? io.openvidu.java.client.Recording.Status.failed + : io.openvidu.java.client.Recording.Status.stopped; + + final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" + + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); + return this.sealRecordingMetadataFile(recording, size, duration, status, entityFile); + } + + private Recording sealRecordingMetadataFile(Recording recording, long size, double duration, + io.openvidu.java.client.Recording.Status status, String metadataFilePath) { + recording.setStatus(status); recording.setSize(size); // Size in bytes recording.setDuration(duration > 0 ? duration : 0); // Duration in seconds - if (!io.openvidu.java.client.Recording.Status.failed.equals(recording.getStatus())) { - recording.setStatus(io.openvidu.java.client.Recording.Status.stopped); - } this.fileWriter.overwriteFile(metadataFilePath, recording.toJson().toString()); recording = this.recordingManager.updateRecordingUrl(recording); - log.info("Sealed recording metadata file at {}", metadataFilePath); + log.info("Sealed recording metadata file at {} with status [{}]", metadataFilePath, status.name()); return recording; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 6f7cb140..24be866c 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -24,10 +24,12 @@ import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.Reader; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -59,6 +61,7 @@ import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.Recording; +import io.openvidu.server.recording.RecordingDownloader; public class SingleStreamRecordingService extends RecordingService { @@ -67,8 +70,9 @@ public class SingleStreamRecordingService extends RecordingService { private Map> recorders = new ConcurrentHashMap<>(); private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; - public SingleStreamRecordingService(RecordingManager recordingManager, OpenviduConfig openviduConfig) { - super(recordingManager, openviduConfig); + public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, + OpenviduConfig openviduConfig) { + super(recordingManager, recordingDownloader, openviduConfig); } @Override @@ -126,6 +130,7 @@ public class SingleStreamRecordingService extends RecordingService { @Override public Recording stopRecording(Session session, Recording recording, EndReason reason) { + recording = this.sealRecordingMetadataFileAsProcessing(recording); return this.stopRecording(session, recording, reason, 0); } @@ -134,8 +139,9 @@ public class SingleStreamRecordingService extends RecordingService { recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.getId(), recording.getSessionId(), reason); - final int numberOfActiveRecorders = recorders.get(recording.getSessionId()).size(); - final CountDownLatch stoppedCountDown = new CountDownLatch(numberOfActiveRecorders); + final HashMap wrappers = new HashMap<>( + recorders.get(recording.getSessionId())); + final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size()); for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), @@ -152,15 +158,29 @@ public class SingleStreamRecordingService extends RecordingService { } this.cleanRecordingMaps(recording); - this.recorders.remove(recording.getSessionId()); - recording = this.sealMetadataFiles(recording); + // TODO: DOWNLOAD FILES IF SCALABILITY MODE + final Recording[] finalRecordingArray = new Recording[1]; + try { + this.recordingDownloader.downloadRecording(recording, wrappers.keySet(), () -> { + // Update recording entity files with final file size + for (RecorderEndpointWrapper wrapper : wrappers.values()) { + if (wrapper.getSize() == 0) { + updateIndividualMetadataFile(wrapper); + } + } + finalRecordingArray[0] = this.sealMetadataFiles(recording); + }); + } catch (IOException e) { + log.error("Error while downloading recording {}", recording.getName()); + } + Recording finalRecording = finalRecordingArray[0] != null ? finalRecordingArray[0] : recording; if (reason != null && session != null) { - this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); + this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, finalRecording, reason); } - return recording; + return finalRecording; } public void startRecorderEndpointForPublisherEndpoint(Session session, String recordingId, @@ -223,7 +243,7 @@ public class SingleStreamRecordingService extends RecordingService { public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); - final RecorderEndpointWrapper finalWrapper = this.recorders.get(sessionId).remove(streamId); + final RecorderEndpointWrapper finalWrapper = recorders.get(sessionId).remove(streamId); if (finalWrapper != null && kmsDisconnectionTime == 0) { finalWrapper.getRecorder().addStoppedListener(new EventListener() { @Override @@ -323,12 +343,21 @@ public class SingleStreamRecordingService extends RecordingService { } private void generateIndividualMetadataFile(RecorderEndpointWrapper wrapper) { + this.commonWriteIndividualMetadataFile(wrapper, this.fileWriter::createAndWriteFile); + } + + private void updateIndividualMetadataFile(RecorderEndpointWrapper wrapper) { + this.commonWriteIndividualMetadataFile(wrapper, this.fileWriter::overwriteFile); + } + + private void commonWriteIndividualMetadataFile(RecorderEndpointWrapper wrapper, + BiFunction writeFunction) { String filesPath = this.openviduConfig.getOpenViduRecordingPath() + wrapper.getRecordingId() + "/"; File videoFile = new File(filesPath + wrapper.getStreamId() + ".webm"); wrapper.setSize(videoFile.length()); String metadataFilePath = filesPath + INDIVIDUAL_STREAM_METADATA_FILE + wrapper.getStreamId(); String metadataFileContent = wrapper.toJson().toString(); - this.fileWriter.createAndWriteFile(metadataFilePath, metadataFileContent); + writeFunction.apply(metadataFilePath, metadataFileContent); } private Recording sealMetadataFiles(Recording recording) { @@ -336,7 +365,6 @@ public class SingleStreamRecordingService extends RecordingService { // individual recordings) and "size" (sum of all individual recordings size) String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; - String metadataFilePath = folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); String syncFilePath = folderPath + recording.getName() + ".json"; @@ -397,7 +425,7 @@ public class SingleStreamRecordingService extends RecordingService { double duration = (double) (maxEndTime - minStartTime) / 1000; duration = duration > 0 ? duration : 0; - recording = this.sealRecordingMetadataFile(recording, accumulatedSize, duration, metadataFilePath); + recording = this.sealRecordingMetadataFileAsStopped(recording, accumulatedSize, duration, metadataFilePath); return recording; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/CustomFileManager.java b/openvidu-server/src/main/java/io/openvidu/server/utils/CustomFileManager.java index 72a0f9e2..2658fdb2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/CustomFileManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/CustomFileManager.java @@ -30,19 +30,23 @@ public class CustomFileManager { private static final Logger log = LoggerFactory.getLogger(CustomFileManager.class); - public void createAndWriteFile(String filePath, String text) { + public boolean createAndWriteFile(String filePath, String text) { try { this.writeAndCloseOnOutputStreamWriter(new FileOutputStream(filePath), text); + return true; } catch (IOException e) { log.error("Couldn't create file {}. Error: {}", filePath, e.getMessage()); + return false; } } - public void overwriteFile(String filePath, String text) { + public boolean overwriteFile(String filePath, String text) { try { this.writeAndCloseOnOutputStreamWriter(new FileOutputStream(filePath, false), text); + return true; } catch (IOException e) { log.error("Couldn't overwrite file {}. Error: {}", filePath, e.getMessage()); + return false; } }