diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 9748d353..560d8808 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -88,11 +88,15 @@ public class KurentoParticipant extends Participant { if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) { // Initialize a PublisherEndpoint - this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(), - this.session.getPipeline(), this.openviduConfig, null); + initPublisherEndpoint(); } } + public void initPublisherEndpoint() { + this.publisher = new PublisherEndpoint(endpointType, this, this.participantPublicId, this.session.getPipeline(), + this.openviduConfig, null); + } + public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) { String type = mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO"; if (streamId == null) { @@ -136,6 +140,10 @@ public class KurentoParticipant extends Participant { } } + public boolean isPublisherEndpointDefined() { + return this.publisher != null; + } + public PublisherEndpoint getPublisher() { try { if (!publisherLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java b/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java index d62cb0ee..20ddbd0a 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/DummyRecordingDownloader.java @@ -23,7 +23,7 @@ import java.util.Collection; public class DummyRecordingDownloader implements RecordingDownloader { @Override - public void downloadRecording(Recording recording, Collection streamIds, Runnable callback) + public void downloadRecording(Recording recording, Collection wrappers, Runnable callback) throws IOException { // Just immediately run callback function callback.run(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/RecorderEndpointWrapper.java b/openvidu-server/src/main/java/io/openvidu/server/recording/RecorderEndpointWrapper.java index 83393369..655ba4a9 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/RecorderEndpointWrapper.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/RecorderEndpointWrapper.java @@ -17,16 +17,19 @@ package io.openvidu.server.recording; +import org.apache.commons.lang3.StringUtils; import org.kurento.client.RecorderEndpoint; import com.google.gson.JsonObject; import io.openvidu.server.kurento.core.KurentoParticipant; +import io.openvidu.server.recording.service.SingleStreamRecordingService; public class RecorderEndpointWrapper { private RecorderEndpoint recorder; private KurentoParticipant kParticipant; + private String name; private String connectionId; private String recordingId; private String streamId; @@ -40,7 +43,9 @@ public class RecorderEndpointWrapper { private long endTime; private long size; - public RecorderEndpointWrapper(RecorderEndpoint recorder, KurentoParticipant kParticipant, String recordingId) { + public RecorderEndpointWrapper(RecorderEndpoint recorder, KurentoParticipant kParticipant, String recordingId, + String name) { + this.name = name; this.recorder = recorder; this.kParticipant = kParticipant; this.recordingId = recordingId; @@ -53,6 +58,24 @@ public class RecorderEndpointWrapper { this.typeOfVideo = kParticipant.getPublisher().getMediaOptions().getTypeOfVideo(); } + public RecorderEndpointWrapper(JsonObject json) { + String nameAux = json.get("name").getAsString(); + // If the name includes the extension, remove it + this.name = StringUtils.removeEnd(nameAux, SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION); + this.connectionId = json.get("connectionId").getAsString(); + this.streamId = json.get("streamId").getAsString(); + this.clientData = json.get("clientData").getAsString(); + this.serverData = json.get("serverData").getAsString(); + this.startTime = json.get("startTime").getAsLong(); + this.endTime = json.get("endTime").getAsLong(); + this.size = json.get("size").getAsLong(); + this.hasAudio = json.get("hasAudio").getAsBoolean(); + this.hasVideo = json.get("hasVideo").getAsBoolean(); + if (this.hasVideo) { + this.typeOfVideo = json.get("typeOfVideo").getAsString(); + } + } + public RecorderEndpoint getRecorder() { return recorder; } @@ -61,6 +84,14 @@ public class RecorderEndpointWrapper { return this.kParticipant; } + public String getName() { + return this.name; + } + + public String getNameWithExtension() { + return this.name + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION; + } + public String getConnectionId() { return connectionId; } @@ -119,6 +150,7 @@ public class RecorderEndpointWrapper { public JsonObject toJson() { JsonObject json = new JsonObject(); + json.addProperty("name", this.getNameWithExtension()); json.addProperty("connectionId", this.getConnectionId()); json.addProperty("streamId", this.getStreamId()); json.addProperty("clientData", this.getClientData()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/RecordingDownloader.java b/openvidu-server/src/main/java/io/openvidu/server/recording/RecordingDownloader.java index 65c750b5..71d392c5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/RecordingDownloader.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/RecordingDownloader.java @@ -22,7 +22,7 @@ import java.util.Collection; public interface RecordingDownloader { - public void downloadRecording(Recording recording, Collection streamIds, Runnable callback) + public void downloadRecording(Recording recording, Collection wrappers, Runnable callback) throws IOException; public void cancelDownload(String recordingId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index f4a09720..a7127cf4 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -726,7 +726,7 @@ public class RecordingManager { } final String testFolderPath = openviduRecordingPath + "/TEST_RECORDING_PATH_" + System.currentTimeMillis(); - final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH.webm"; + final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH" + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION; // Check Kurento Media Server write permissions in recording path if (this.kmsManager.getKmss().isEmpty()) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index e49a054a..b395dc70 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -24,7 +24,9 @@ import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.Reader; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -49,6 +51,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; @@ -71,10 +74,13 @@ public class SingleStreamRecordingService extends RecordingService { private static final Logger log = LoggerFactory.getLogger(SingleStreamRecordingService.class); + // One recorder endpoint active at a time per stream private Map> activeRecorders = new ConcurrentHashMap<>(); - private Map> storedRecorders = new ConcurrentHashMap<>(); + // Multiple recorder endpoints per stream during a recording + private Map>> storedRecorders = new ConcurrentHashMap<>(); private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; + public static final String INDIVIDUAL_RECORDING_EXTENSION = ".webm"; public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr, @@ -97,8 +103,8 @@ public class SingleStreamRecordingService extends RecordingService { Recording recording = new Recording(session.getSessionId(), recordingId, properties); this.recordingManager.recordingToStarting(recording); - activeRecorders.put(recording.getId(), new ConcurrentHashMap()); - storedRecorders.put(recording.getId(), new ConcurrentHashMap()); + activeRecorders.put(recording.getId(), new ConcurrentHashMap<>()); + storedRecorders.put(recording.getId(), new ConcurrentHashMap<>()); int activePublishersToRecord = session.getActiveIndividualRecordedPublishers(); final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishersToRecord); @@ -149,10 +155,13 @@ public class SingleStreamRecordingService extends RecordingService { recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.getId(), recording.getSessionId(), reason); - final HashMap wrappers = new HashMap<>(storedRecorders.get(recording.getId())); + final List wrappers = new ArrayList<>(); + storedRecorders.get(recording.getId()).values().forEach(list -> { + wrappers.addAll(list); + }); final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size()); - for (RecorderEndpointWrapper wrapper : wrappers.values()) { + for (RecorderEndpointWrapper wrapper : wrappers) { this.stopRecorderEndpointOfPublisherEndpoint(recording.getId(), wrapper.getStreamId(), stoppedCountDown, kmsDisconnectionTime); } @@ -171,9 +180,9 @@ public class SingleStreamRecordingService extends RecordingService { final Recording[] finalRecordingArray = new Recording[1]; finalRecordingArray[0] = recording; try { - this.recordingDownloader.downloadRecording(finalRecordingArray[0], wrappers.keySet(), () -> { + this.recordingDownloader.downloadRecording(finalRecordingArray[0], wrappers, () -> { // Update recording entity files with final file size - for (RecorderEndpointWrapper wrapper : wrappers.values()) { + for (RecorderEndpointWrapper wrapper : wrappers) { if (wrapper.getSize() == 0) { updateIndividualMetadataFile(wrapper); } @@ -211,28 +220,36 @@ public class SingleStreamRecordingService extends RecordingService { log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(), participant.getSessionId()); + final String streamId = participant.getPublisherStreamId(); + try { if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) { try { - if (this.activeRecorders.get(recordingId).containsKey(participant.getPublisherStreamId())) { + if (this.activeRecorders.get(recordingId).containsKey(streamId)) { log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning", - participant.getPublisherStreamId(), participant.getSessionId()); + streamId, participant.getSessionId()); return; } + // Update stream recording counter + final List wrapperList = storedRecorders.get(recordingId).get(streamId); + final int streamCounter = wrapperList != null ? wrapperList.size() : 0; + String fileName = streamCounter == 0 ? streamId : (streamId + "-" + streamCounter); + KurentoParticipant kurentoParticipant = (KurentoParticipant) participant; MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline(); RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, - "file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" - + participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build(); + "file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" + fileName + + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION) + .withMediaProfile(profile).build(); recorder.addRecordingListener(new EventListener() { @Override public void onEvent(RecordingEvent event) { - activeRecorders.get(recordingId).get(participant.getPublisherStreamId()) + activeRecorders.get(recordingId).get(streamId) .setStartTime(Long.parseLong(event.getTimestampMillis())); - log.info("Recording started event for stream {}", participant.getPublisherStreamId()); + log.info("Recording started event for stream {}", streamId); globalStartLatch.countDown(); } }); @@ -245,9 +262,13 @@ public class SingleStreamRecordingService extends RecordingService { }); RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, kurentoParticipant, - recordingId); - activeRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper); - storedRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper); + recordingId, fileName); + activeRecorders.get(recordingId).put(streamId, wrapper); + if (wrapperList != null) { + wrapperList.add(wrapper); + } else { + storedRecorders.get(recordingId).put(streamId, new ArrayList<>(Arrays.asList(wrapper))); + } connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile); wrapper.getRecorder().record(); @@ -269,33 +290,55 @@ public class SingleStreamRecordingService extends RecordingService { public void stopRecorderEndpointOfPublisherEndpoint(String recordingId, String streamId, CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { + log.info("Stopping single stream recorder for stream {} in recording {}", streamId, recordingId); + final RecorderEndpointWrapper finalWrapper = activeRecorders.get(recordingId).remove(streamId); - if (finalWrapper != null && kmsDisconnectionTime == 0) { - finalWrapper.getRecorder().addStoppedListener(new EventListener() { - @Override - public void onEvent(StoppedEvent event) { - finalWrapper.setEndTime(Long.parseLong(event.getTimestampMillis())); - generateIndividualMetadataFile(finalWrapper); - log.info("Recording stopped event for stream {}", streamId); - finalWrapper.getRecorder().release(); - globalStopLatch.countDown(); - } - }); - finalWrapper.getRecorder().stop(); - } else { - if (kmsDisconnectionTime != 0) { - // Stopping recorder endpoint because of a KMS disconnection - finalWrapper.setEndTime(kmsDisconnectionTime); - generateIndividualMetadataFile(finalWrapper); - log.warn("Forcing individual recording stop after KMS restart for stream {} in recording {}", streamId, - recordingId); - } else { - if (storedRecorders.get(recordingId).containsKey(streamId)) { - log.info("Stream {} recording of recording {} was already stopped", streamId, recordingId); + if (finalWrapper != null) { + KurentoParticipant kParticipant = finalWrapper.getParticipant(); + try { + if (kParticipant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) { + try { + if (kmsDisconnectionTime == null) { + finalWrapper.getRecorder().addStoppedListener(new EventListener() { + @Override + public void onEvent(StoppedEvent event) { + finalWrapper.setEndTime(Long.parseLong(event.getTimestampMillis())); + generateIndividualMetadataFile(finalWrapper); + log.info("Recording stopped event for stream {}", streamId); + finalWrapper.getRecorder().release(); + globalStopLatch.countDown(); + } + }); + finalWrapper.getRecorder().stop(); + } else { + // Stopping recorder endpoint because of a KMS disconnection + finalWrapper.setEndTime(kmsDisconnectionTime); + generateIndividualMetadataFile(finalWrapper); + globalStopLatch.countDown(); + log.warn( + "Forcing individual recording stop after KMS restart for stream {} in recording {}", + streamId, recordingId); + } + } finally { + kParticipant.singleRecordingLock.unlock(); + } } else { - log.info("Stream {} wasn't being recorded in recording {}", streamId, recordingId); + log.error( + "Timeout waiting for individual recording lock to be available to stop stream recording for participant {} of session {}", + kParticipant.getParticipantPublicId(), kParticipant.getSessionId()); } + } catch (InterruptedException e) { + log.error( + "InterruptedException waiting for individual recording lock to be available to stop stream recording for participant {} of session {}", + kParticipant.getParticipantPublicId(), kParticipant.getSessionId()); + } + } else { + // The streamId has no associated RecorderEndpoint + if (storedRecorders.get(recordingId).containsKey(streamId)) { + log.info("Stream {} recording of recording {} was already stopped", streamId, recordingId); + } else { + log.info("Stream {} wasn't being recorded in recording {}", streamId, recordingId); } globalStopLatch.countDown(); } @@ -384,9 +427,9 @@ public class SingleStreamRecordingService extends RecordingService { private void commonWriteIndividualMetadataFile(RecorderEndpointWrapper wrapper, BiFunction writeFunction) { String filesPath = this.openviduConfig.getOpenViduRecordingPath() + wrapper.getRecordingId() + "/"; - File videoFile = new File(filesPath + wrapper.getStreamId() + ".webm"); + File videoFile = new File(filesPath + wrapper.getNameWithExtension()); wrapper.setSize(videoFile.length()); - String metadataFilePath = filesPath + INDIVIDUAL_STREAM_METADATA_FILE + wrapper.getStreamId(); + String metadataFilePath = filesPath + INDIVIDUAL_STREAM_METADATA_FILE + wrapper.getName(); String metadataFileContent = wrapper.toJson().toString(); writeFunction.apply(metadataFilePath, metadataFileContent); } @@ -426,12 +469,14 @@ public class SingleStreamRecordingService extends RecordingService { } catch (FileNotFoundException e) { log.error("Error reading file {}. Error: {}", files[i].getAbsolutePath(), e.getMessage()); } - RecorderEndpointWrapper wr = gson.fromJson(reader, RecorderEndpointWrapper.class); + RecorderEndpointWrapper wr = new RecorderEndpointWrapper( + JsonParser.parseReader(reader).getAsJsonObject()); minStartTime = Math.min(minStartTime, wr.getStartTime()); maxEndTime = Math.max(maxEndTime, wr.getEndTime()); accumulatedSize += wr.getSize(); JsonObject jsonFile = new JsonObject(); + jsonFile.addProperty("name", wr.getNameWithExtension()); jsonFile.addProperty("connectionId", wr.getConnectionId()); jsonFile.addProperty("streamId", wr.getStreamId()); jsonFile.addProperty("size", wr.getSize()); @@ -474,7 +519,8 @@ public class SingleStreamRecordingService extends RecordingService { for (int i = 0; i < files.length; i++) { String fileExtension = FilenameUtils.getExtension(files[i].getName()); - if (files[i].isFile() && (fileExtension.equals("json") || fileExtension.equals("webm"))) { + if (files[i].isFile() && (fileExtension.equals("json") + || SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION.equals("." + fileExtension))) { // Zip video files and json sync metadata file FileInputStream fis = new FileInputStream(files[i]);