openvidu-server: SingleStreamRecordingService refactoring for dynamic records

pull/550/head
pabloFuente 2020-10-07 12:09:26 +02:00
parent 8102784df3
commit 2e919b69e6
6 changed files with 136 additions and 50 deletions

View File

@ -88,11 +88,15 @@ public class KurentoParticipant extends Participant {
if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) { if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) {
// Initialize a PublisherEndpoint // Initialize a PublisherEndpoint
this.publisher = new PublisherEndpoint(endpointType, this, participant.getParticipantPublicId(), initPublisherEndpoint();
this.session.getPipeline(), this.openviduConfig, null);
} }
} }
public void initPublisherEndpoint() {
this.publisher = new PublisherEndpoint(endpointType, this, this.participantPublicId, this.session.getPipeline(),
this.openviduConfig, null);
}
public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) { public void createPublishingEndpoint(MediaOptions mediaOptions, String streamId) {
String type = mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO"; String type = mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO";
if (streamId == null) { if (streamId == null) {
@ -136,6 +140,10 @@ public class KurentoParticipant extends Participant {
} }
} }
public boolean isPublisherEndpointDefined() {
return this.publisher != null;
}
public PublisherEndpoint getPublisher() { public PublisherEndpoint getPublisher() {
try { try {
if (!publisherLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { if (!publisherLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {

View File

@ -23,7 +23,7 @@ import java.util.Collection;
public class DummyRecordingDownloader implements RecordingDownloader { public class DummyRecordingDownloader implements RecordingDownloader {
@Override @Override
public void downloadRecording(Recording recording, Collection<String> streamIds, Runnable callback) public void downloadRecording(Recording recording, Collection<RecorderEndpointWrapper> wrappers, Runnable callback)
throws IOException { throws IOException {
// Just immediately run callback function // Just immediately run callback function
callback.run(); callback.run();

View File

@ -17,16 +17,19 @@
package io.openvidu.server.recording; package io.openvidu.server.recording;
import org.apache.commons.lang3.StringUtils;
import org.kurento.client.RecorderEndpoint; import org.kurento.client.RecorderEndpoint;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.recording.service.SingleStreamRecordingService;
public class RecorderEndpointWrapper { public class RecorderEndpointWrapper {
private RecorderEndpoint recorder; private RecorderEndpoint recorder;
private KurentoParticipant kParticipant; private KurentoParticipant kParticipant;
private String name;
private String connectionId; private String connectionId;
private String recordingId; private String recordingId;
private String streamId; private String streamId;
@ -40,7 +43,9 @@ public class RecorderEndpointWrapper {
private long endTime; private long endTime;
private long size; private long size;
public RecorderEndpointWrapper(RecorderEndpoint recorder, KurentoParticipant kParticipant, String recordingId) { public RecorderEndpointWrapper(RecorderEndpoint recorder, KurentoParticipant kParticipant, String recordingId,
String name) {
this.name = name;
this.recorder = recorder; this.recorder = recorder;
this.kParticipant = kParticipant; this.kParticipant = kParticipant;
this.recordingId = recordingId; this.recordingId = recordingId;
@ -53,6 +58,24 @@ public class RecorderEndpointWrapper {
this.typeOfVideo = kParticipant.getPublisher().getMediaOptions().getTypeOfVideo(); this.typeOfVideo = kParticipant.getPublisher().getMediaOptions().getTypeOfVideo();
} }
public RecorderEndpointWrapper(JsonObject json) {
String nameAux = json.get("name").getAsString();
// If the name includes the extension, remove it
this.name = StringUtils.removeEnd(nameAux, SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION);
this.connectionId = json.get("connectionId").getAsString();
this.streamId = json.get("streamId").getAsString();
this.clientData = json.get("clientData").getAsString();
this.serverData = json.get("serverData").getAsString();
this.startTime = json.get("startTime").getAsLong();
this.endTime = json.get("endTime").getAsLong();
this.size = json.get("size").getAsLong();
this.hasAudio = json.get("hasAudio").getAsBoolean();
this.hasVideo = json.get("hasVideo").getAsBoolean();
if (this.hasVideo) {
this.typeOfVideo = json.get("typeOfVideo").getAsString();
}
}
public RecorderEndpoint getRecorder() { public RecorderEndpoint getRecorder() {
return recorder; return recorder;
} }
@ -61,6 +84,14 @@ public class RecorderEndpointWrapper {
return this.kParticipant; return this.kParticipant;
} }
public String getName() {
return this.name;
}
public String getNameWithExtension() {
return this.name + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION;
}
public String getConnectionId() { public String getConnectionId() {
return connectionId; return connectionId;
} }
@ -119,6 +150,7 @@ public class RecorderEndpointWrapper {
public JsonObject toJson() { public JsonObject toJson() {
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
json.addProperty("name", this.getNameWithExtension());
json.addProperty("connectionId", this.getConnectionId()); json.addProperty("connectionId", this.getConnectionId());
json.addProperty("streamId", this.getStreamId()); json.addProperty("streamId", this.getStreamId());
json.addProperty("clientData", this.getClientData()); json.addProperty("clientData", this.getClientData());

View File

@ -22,7 +22,7 @@ import java.util.Collection;
public interface RecordingDownloader { public interface RecordingDownloader {
public void downloadRecording(Recording recording, Collection<String> streamIds, Runnable callback) public void downloadRecording(Recording recording, Collection<RecorderEndpointWrapper> wrappers, Runnable callback)
throws IOException; throws IOException;
public void cancelDownload(String recordingId); public void cancelDownload(String recordingId);

View File

@ -726,7 +726,7 @@ public class RecordingManager {
} }
final String testFolderPath = openviduRecordingPath + "/TEST_RECORDING_PATH_" + System.currentTimeMillis(); final String testFolderPath = openviduRecordingPath + "/TEST_RECORDING_PATH_" + System.currentTimeMillis();
final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH.webm"; final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH" + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION;
// Check Kurento Media Server write permissions in recording path // Check Kurento Media Server write permissions in recording path
if (this.kmsManager.getKmss().isEmpty()) { if (this.kmsManager.getKmss().isEmpty()) {

View File

@ -24,7 +24,9 @@ import java.io.FileOutputStream;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.Reader; import java.io.Reader;
import java.util.HashMap; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -49,6 +51,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.OpenViduException.Code;
@ -71,10 +74,13 @@ public class SingleStreamRecordingService extends RecordingService {
private static final Logger log = LoggerFactory.getLogger(SingleStreamRecordingService.class); private static final Logger log = LoggerFactory.getLogger(SingleStreamRecordingService.class);
// One recorder endpoint active at a time per stream
private Map<String, Map<String, RecorderEndpointWrapper>> activeRecorders = new ConcurrentHashMap<>(); private Map<String, Map<String, RecorderEndpointWrapper>> activeRecorders = new ConcurrentHashMap<>();
private Map<String, Map<String, RecorderEndpointWrapper>> storedRecorders = new ConcurrentHashMap<>(); // Multiple recorder endpoints per stream during a recording
private Map<String, Map<String, List<RecorderEndpointWrapper>>> storedRecorders = new ConcurrentHashMap<>();
private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream.";
public static final String INDIVIDUAL_RECORDING_EXTENSION = ".webm";
public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr, RecordingUploader recordingUploader, OpenviduConfig openviduConfig, CallDetailRecord cdr,
@ -97,8 +103,8 @@ public class SingleStreamRecordingService extends RecordingService {
Recording recording = new Recording(session.getSessionId(), recordingId, properties); Recording recording = new Recording(session.getSessionId(), recordingId, properties);
this.recordingManager.recordingToStarting(recording); this.recordingManager.recordingToStarting(recording);
activeRecorders.put(recording.getId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>()); activeRecorders.put(recording.getId(), new ConcurrentHashMap<>());
storedRecorders.put(recording.getId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>()); storedRecorders.put(recording.getId(), new ConcurrentHashMap<>());
int activePublishersToRecord = session.getActiveIndividualRecordedPublishers(); int activePublishersToRecord = session.getActiveIndividualRecordedPublishers();
final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishersToRecord); final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishersToRecord);
@ -149,10 +155,13 @@ public class SingleStreamRecordingService extends RecordingService {
recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly",
recording.getId(), recording.getSessionId(), reason); recording.getId(), recording.getSessionId(), reason);
final HashMap<String, RecorderEndpointWrapper> wrappers = new HashMap<>(storedRecorders.get(recording.getId())); final List<RecorderEndpointWrapper> wrappers = new ArrayList<>();
storedRecorders.get(recording.getId()).values().forEach(list -> {
wrappers.addAll(list);
});
final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size()); final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size());
for (RecorderEndpointWrapper wrapper : wrappers.values()) { for (RecorderEndpointWrapper wrapper : wrappers) {
this.stopRecorderEndpointOfPublisherEndpoint(recording.getId(), wrapper.getStreamId(), stoppedCountDown, this.stopRecorderEndpointOfPublisherEndpoint(recording.getId(), wrapper.getStreamId(), stoppedCountDown,
kmsDisconnectionTime); kmsDisconnectionTime);
} }
@ -171,9 +180,9 @@ public class SingleStreamRecordingService extends RecordingService {
final Recording[] finalRecordingArray = new Recording[1]; final Recording[] finalRecordingArray = new Recording[1];
finalRecordingArray[0] = recording; finalRecordingArray[0] = recording;
try { try {
this.recordingDownloader.downloadRecording(finalRecordingArray[0], wrappers.keySet(), () -> { this.recordingDownloader.downloadRecording(finalRecordingArray[0], wrappers, () -> {
// Update recording entity files with final file size // Update recording entity files with final file size
for (RecorderEndpointWrapper wrapper : wrappers.values()) { for (RecorderEndpointWrapper wrapper : wrappers) {
if (wrapper.getSize() == 0) { if (wrapper.getSize() == 0) {
updateIndividualMetadataFile(wrapper); updateIndividualMetadataFile(wrapper);
} }
@ -211,28 +220,36 @@ public class SingleStreamRecordingService extends RecordingService {
log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(), log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(),
participant.getSessionId()); participant.getSessionId());
final String streamId = participant.getPublisherStreamId();
try { try {
if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) { if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) {
try { try {
if (this.activeRecorders.get(recordingId).containsKey(participant.getPublisherStreamId())) { if (this.activeRecorders.get(recordingId).containsKey(streamId)) {
log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning", log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning",
participant.getPublisherStreamId(), participant.getSessionId()); streamId, participant.getSessionId());
return; return;
} }
// Update stream recording counter
final List<RecorderEndpointWrapper> wrapperList = storedRecorders.get(recordingId).get(streamId);
final int streamCounter = wrapperList != null ? wrapperList.size() : 0;
String fileName = streamCounter == 0 ? streamId : (streamId + "-" + streamCounter);
KurentoParticipant kurentoParticipant = (KurentoParticipant) participant; KurentoParticipant kurentoParticipant = (KurentoParticipant) participant;
MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline(); MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline();
RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline,
"file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" "file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" + fileName
+ participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build(); + SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION)
.withMediaProfile(profile).build();
recorder.addRecordingListener(new EventListener<RecordingEvent>() { recorder.addRecordingListener(new EventListener<RecordingEvent>() {
@Override @Override
public void onEvent(RecordingEvent event) { public void onEvent(RecordingEvent event) {
activeRecorders.get(recordingId).get(participant.getPublisherStreamId()) activeRecorders.get(recordingId).get(streamId)
.setStartTime(Long.parseLong(event.getTimestampMillis())); .setStartTime(Long.parseLong(event.getTimestampMillis()));
log.info("Recording started event for stream {}", participant.getPublisherStreamId()); log.info("Recording started event for stream {}", streamId);
globalStartLatch.countDown(); globalStartLatch.countDown();
} }
}); });
@ -245,9 +262,13 @@ public class SingleStreamRecordingService extends RecordingService {
}); });
RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, kurentoParticipant, RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, kurentoParticipant,
recordingId); recordingId, fileName);
activeRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper); activeRecorders.get(recordingId).put(streamId, wrapper);
storedRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper); if (wrapperList != null) {
wrapperList.add(wrapper);
} else {
storedRecorders.get(recordingId).put(streamId, new ArrayList<>(Arrays.asList(wrapper)));
}
connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile); connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile);
wrapper.getRecorder().record(); wrapper.getRecorder().record();
@ -269,9 +290,16 @@ public class SingleStreamRecordingService extends RecordingService {
public void stopRecorderEndpointOfPublisherEndpoint(String recordingId, String streamId, public void stopRecorderEndpointOfPublisherEndpoint(String recordingId, String streamId,
CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { CountDownLatch globalStopLatch, Long kmsDisconnectionTime) {
log.info("Stopping single stream recorder for stream {} in recording {}", streamId, recordingId); log.info("Stopping single stream recorder for stream {} in recording {}", streamId, recordingId);
final RecorderEndpointWrapper finalWrapper = activeRecorders.get(recordingId).remove(streamId); final RecorderEndpointWrapper finalWrapper = activeRecorders.get(recordingId).remove(streamId);
if (finalWrapper != null && kmsDisconnectionTime == 0) { if (finalWrapper != null) {
KurentoParticipant kParticipant = finalWrapper.getParticipant();
try {
if (kParticipant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) {
try {
if (kmsDisconnectionTime == null) {
finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() { finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() {
@Override @Override
public void onEvent(StoppedEvent event) { public void onEvent(StoppedEvent event) {
@ -284,19 +312,34 @@ public class SingleStreamRecordingService extends RecordingService {
}); });
finalWrapper.getRecorder().stop(); finalWrapper.getRecorder().stop();
} else { } else {
if (kmsDisconnectionTime != 0) {
// Stopping recorder endpoint because of a KMS disconnection // Stopping recorder endpoint because of a KMS disconnection
finalWrapper.setEndTime(kmsDisconnectionTime); finalWrapper.setEndTime(kmsDisconnectionTime);
generateIndividualMetadataFile(finalWrapper); generateIndividualMetadataFile(finalWrapper);
log.warn("Forcing individual recording stop after KMS restart for stream {} in recording {}", streamId, globalStopLatch.countDown();
recordingId); log.warn(
"Forcing individual recording stop after KMS restart for stream {} in recording {}",
streamId, recordingId);
}
} finally {
kParticipant.singleRecordingLock.unlock();
}
} else { } else {
log.error(
"Timeout waiting for individual recording lock to be available to stop stream recording for participant {} of session {}",
kParticipant.getParticipantPublicId(), kParticipant.getSessionId());
}
} catch (InterruptedException e) {
log.error(
"InterruptedException waiting for individual recording lock to be available to stop stream recording for participant {} of session {}",
kParticipant.getParticipantPublicId(), kParticipant.getSessionId());
}
} else {
// The streamId has no associated RecorderEndpoint
if (storedRecorders.get(recordingId).containsKey(streamId)) { if (storedRecorders.get(recordingId).containsKey(streamId)) {
log.info("Stream {} recording of recording {} was already stopped", streamId, recordingId); log.info("Stream {} recording of recording {} was already stopped", streamId, recordingId);
} else { } else {
log.info("Stream {} wasn't being recorded in recording {}", streamId, recordingId); log.info("Stream {} wasn't being recorded in recording {}", streamId, recordingId);
} }
}
globalStopLatch.countDown(); globalStopLatch.countDown();
} }
} }
@ -384,9 +427,9 @@ public class SingleStreamRecordingService extends RecordingService {
private void commonWriteIndividualMetadataFile(RecorderEndpointWrapper wrapper, private void commonWriteIndividualMetadataFile(RecorderEndpointWrapper wrapper,
BiFunction<String, String, Boolean> writeFunction) { BiFunction<String, String, Boolean> writeFunction) {
String filesPath = this.openviduConfig.getOpenViduRecordingPath() + wrapper.getRecordingId() + "/"; String filesPath = this.openviduConfig.getOpenViduRecordingPath() + wrapper.getRecordingId() + "/";
File videoFile = new File(filesPath + wrapper.getStreamId() + ".webm"); File videoFile = new File(filesPath + wrapper.getNameWithExtension());
wrapper.setSize(videoFile.length()); wrapper.setSize(videoFile.length());
String metadataFilePath = filesPath + INDIVIDUAL_STREAM_METADATA_FILE + wrapper.getStreamId(); String metadataFilePath = filesPath + INDIVIDUAL_STREAM_METADATA_FILE + wrapper.getName();
String metadataFileContent = wrapper.toJson().toString(); String metadataFileContent = wrapper.toJson().toString();
writeFunction.apply(metadataFilePath, metadataFileContent); writeFunction.apply(metadataFilePath, metadataFileContent);
} }
@ -426,12 +469,14 @@ public class SingleStreamRecordingService extends RecordingService {
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
log.error("Error reading file {}. Error: {}", files[i].getAbsolutePath(), e.getMessage()); log.error("Error reading file {}. Error: {}", files[i].getAbsolutePath(), e.getMessage());
} }
RecorderEndpointWrapper wr = gson.fromJson(reader, RecorderEndpointWrapper.class); RecorderEndpointWrapper wr = new RecorderEndpointWrapper(
JsonParser.parseReader(reader).getAsJsonObject());
minStartTime = Math.min(minStartTime, wr.getStartTime()); minStartTime = Math.min(minStartTime, wr.getStartTime());
maxEndTime = Math.max(maxEndTime, wr.getEndTime()); maxEndTime = Math.max(maxEndTime, wr.getEndTime());
accumulatedSize += wr.getSize(); accumulatedSize += wr.getSize();
JsonObject jsonFile = new JsonObject(); JsonObject jsonFile = new JsonObject();
jsonFile.addProperty("name", wr.getNameWithExtension());
jsonFile.addProperty("connectionId", wr.getConnectionId()); jsonFile.addProperty("connectionId", wr.getConnectionId());
jsonFile.addProperty("streamId", wr.getStreamId()); jsonFile.addProperty("streamId", wr.getStreamId());
jsonFile.addProperty("size", wr.getSize()); jsonFile.addProperty("size", wr.getSize());
@ -474,7 +519,8 @@ public class SingleStreamRecordingService extends RecordingService {
for (int i = 0; i < files.length; i++) { for (int i = 0; i < files.length; i++) {
String fileExtension = FilenameUtils.getExtension(files[i].getName()); String fileExtension = FilenameUtils.getExtension(files[i].getName());
if (files[i].isFile() && (fileExtension.equals("json") || fileExtension.equals("webm"))) { if (files[i].isFile() && (fileExtension.equals("json")
|| SingleStreamRecordingService.INDIVIDUAL_RECORDING_EXTENSION.equals("." + fileExtension))) {
// Zip video files and json sync metadata file // Zip video files and json sync metadata file
FileInputStream fis = new FileInputStream(files[i]); FileInputStream fis = new FileInputStream(files[i]);