openvidu-server: asynchronous download of recordings

pull/375/head
pabloFuente 2019-06-20 14:06:43 +02:00
parent 964b273692
commit cb92fc7dd2
8 changed files with 115 additions and 41 deletions

View File

@ -265,7 +265,7 @@ public class KurentoSessionManager extends SessionManager {
+ kurentoOptions.getFilter().getType()); + kurentoOptions.getFilter().getType());
log.error("PARTICIPANT {}: Error applying filter. The token has no permissions to apply filter {}", log.error("PARTICIPANT {}: Error applying filter. The token has no permissions to apply filter {}",
participant.getParticipantPublicId(), kurentoOptions.getFilter().getType(), e); participant.getParticipantPublicId(), kurentoOptions.getFilter().getType(), e);
sessionEventsHandler.onPublishMedia(participant, null, kParticipant.getPublisher().createdAt(), sessionEventsHandler.onPublishMedia(participant, null, System.currentTimeMillis(),
kSession.getSessionId(), mediaOptions, sdpAnswer, participants, transactionId, e); kSession.getSessionId(), mediaOptions, sdpAnswer, participants, transactionId, e);
throw e; throw e;
} }

View File

@ -6,8 +6,11 @@ import java.util.Collection;
public class DummyRecordingDownloader implements RecordingDownloader { public class DummyRecordingDownloader implements RecordingDownloader {
@Override @Override
public void downloadRecording(Recording recording, Collection<String> streamIds) throws IOException { public void downloadRecording(Recording recording, Collection<String> streamIds, Runnable callback)
// Do nothing throws IOException {
// Just immediately run callback function
callback.run();
return;
} }
} }

View File

@ -22,6 +22,7 @@ import java.util.Collection;
public interface RecordingDownloader { public interface RecordingDownloader {
public void downloadRecording(Recording recording, Collection<String> streamIds) throws IOException; public void downloadRecording(Recording recording, Collection<String> streamIds, Runnable callback)
throws IOException;
} }

View File

@ -48,6 +48,7 @@ import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.recording.CompositeWrapper; import io.openvidu.server.recording.CompositeWrapper;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader;
import io.openvidu.server.recording.RecordingInfoUtils; import io.openvidu.server.recording.RecordingInfoUtils;
import io.openvidu.server.utils.DockerManager; import io.openvidu.server.utils.DockerManager;
@ -61,8 +62,9 @@ public class ComposedRecordingService extends RecordingService {
private DockerManager dockerManager; private DockerManager dockerManager;
public ComposedRecordingService(RecordingManager recordingManager, OpenviduConfig openviduConfig) { public ComposedRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
super(recordingManager, openviduConfig); OpenviduConfig openviduConfig) {
super(recordingManager, recordingDownloader, openviduConfig);
this.dockerManager = new DockerManager(); this.dockerManager = new DockerManager();
} }
@ -94,6 +96,7 @@ public class ComposedRecordingService extends RecordingService {
if (recording.hasVideo()) { if (recording.hasVideo()) {
return this.stopRecordingWithVideo(session, recording, reason); return this.stopRecordingWithVideo(session, recording, reason);
} else { } else {
recording = this.sealRecordingMetadataFileAsProcessing(recording);
return this.stopRecordingAudioOnly(session, recording, reason, 0); return this.stopRecordingAudioOnly(session, recording, reason, 0);
} }
} }
@ -367,21 +370,28 @@ public class ComposedRecordingService extends RecordingService {
this.cleanRecordingMaps(recording); this.cleanRecordingMaps(recording);
// TODO: DOWNLOAD FILE IF SCALABILITY MODE
final Recording[] finalRecordingArray = new Recording[1];
try {
this.recordingDownloader.downloadRecording(recording, null, () -> {
String filesPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; String filesPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/";
File videoFile = new File(filesPath + recording.getName() + ".webm"); File videoFile = new File(filesPath + recording.getName() + ".webm");
long finalSize = videoFile.length(); long finalSize = videoFile.length();
double finalDuration = (double) compositeWrapper.getDuration() / 1000; double finalDuration = (double) compositeWrapper.getDuration() / 1000;
this.updateFilePermissions(filesPath); this.updateFilePermissions(filesPath);
this.sealRecordingMetadataFileAsStopped(recording, finalSize, finalDuration,
this.sealRecordingMetadataFile(recording, finalSize, finalDuration,
filesPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId()); filesPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId());
});
} catch (IOException e) {
log.error("Error while downloading recording {}: {}", recording.getName(), e.getMessage());
}
Recording finalRecording = finalRecordingArray[0] != null ? finalRecordingArray[0] : recording;
if (reason != null && session != null) { if (reason != null && session != null) {
this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, finalRecording, reason);
} }
return recording; return finalRecording;
} }
private void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException { private void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException {

View File

@ -63,8 +63,8 @@ public abstract class RecordingService {
boolean newFolderCreated = this.fileWriter.createFolderIfNotExists(folder); boolean newFolderCreated = this.fileWriter.createFolderIfNotExists(folder);
if (newFolderCreated) { if (newFolderCreated) {
log.info( log.warn(
"New folder {} created. This means the recording started for a session with no publishers or no media type compatible publishers", "New folder {} created. This means A) Cluster mode is enabled B) The recording started for a session with no publishers or C) No media type compatible publishers",
folder); folder);
} else { } else {
log.info("Folder {} already existed. Some publisher is already being recorded", folder); log.info("Folder {} already existed. Some publisher is already being recorded", folder);
@ -78,22 +78,50 @@ public abstract class RecordingService {
} }
/** /**
* Update and overwrites metadata recording file with final values on recording * Update and overwrites metadata recording file to set it in "processing"
* stop (".recording.RECORDING_ID" JSON file to store Recording entity). * status. Recording size and duration will remain as 0
* *
* @return updated Recording object * @return updated Recording object
*/ */
protected Recording sealRecordingMetadataFile(Recording recording, long size, double duration, protected Recording sealRecordingMetadataFileAsProcessing(Recording recording) {
log.info("Recording {} is processing", recording.getId());
final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"
+ RecordingManager.RECORDING_ENTITY_FILE + recording.getId();
return this.sealRecordingMetadataFile(recording, 0, 0, io.openvidu.java.client.Recording.Status.processing,
entityFile);
}
/**
* Update and overwrites metadata recording file to set it in "stopped" (or
* "failed") status
*
* @return updated Recording object
*/
protected Recording sealRecordingMetadataFileAsStopped(Recording recording, long size, double duration,
String metadataFilePath) { String metadataFilePath) {
log.info("Recording {} is processing", recording.getId());
io.openvidu.java.client.Recording.Status status = io.openvidu.java.client.Recording.Status.failed
.equals(recording.getStatus()) ? io.openvidu.java.client.Recording.Status.failed
: io.openvidu.java.client.Recording.Status.stopped;
final String entityFile = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"
+ RecordingManager.RECORDING_ENTITY_FILE + recording.getId();
return this.sealRecordingMetadataFile(recording, size, duration, status, entityFile);
}
private Recording sealRecordingMetadataFile(Recording recording, long size, double duration,
io.openvidu.java.client.Recording.Status status, String metadataFilePath) {
recording.setStatus(status);
recording.setSize(size); // Size in bytes recording.setSize(size); // Size in bytes
recording.setDuration(duration > 0 ? duration : 0); // Duration in seconds recording.setDuration(duration > 0 ? duration : 0); // Duration in seconds
if (!io.openvidu.java.client.Recording.Status.failed.equals(recording.getStatus())) {
recording.setStatus(io.openvidu.java.client.Recording.Status.stopped);
}
this.fileWriter.overwriteFile(metadataFilePath, recording.toJson().toString()); this.fileWriter.overwriteFile(metadataFilePath, recording.toJson().toString());
recording = this.recordingManager.updateRecordingUrl(recording); recording = this.recordingManager.updateRecordingUrl(recording);
log.info("Sealed recording metadata file at {}", metadataFilePath); log.info("Sealed recording metadata file at {} with status [{}]", metadataFilePath, status.name());
return recording; return recording;
} }

View File

@ -24,10 +24,12 @@ import java.io.FileOutputStream;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.Reader; import java.io.Reader;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
@ -59,6 +61,7 @@ import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.RecorderEndpointWrapper;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.RecordingDownloader;
public class SingleStreamRecordingService extends RecordingService { public class SingleStreamRecordingService extends RecordingService {
@ -67,8 +70,9 @@ public class SingleStreamRecordingService extends RecordingService {
private Map<String, Map<String, RecorderEndpointWrapper>> recorders = new ConcurrentHashMap<>(); private Map<String, Map<String, RecorderEndpointWrapper>> recorders = new ConcurrentHashMap<>();
private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream.";
public SingleStreamRecordingService(RecordingManager recordingManager, OpenviduConfig openviduConfig) { public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
super(recordingManager, openviduConfig); OpenviduConfig openviduConfig) {
super(recordingManager, recordingDownloader, openviduConfig);
} }
@Override @Override
@ -126,6 +130,7 @@ public class SingleStreamRecordingService extends RecordingService {
@Override @Override
public Recording stopRecording(Session session, Recording recording, EndReason reason) { public Recording stopRecording(Session session, Recording recording, EndReason reason) {
recording = this.sealRecordingMetadataFileAsProcessing(recording);
return this.stopRecording(session, recording, reason, 0); return this.stopRecording(session, recording, reason, 0);
} }
@ -134,8 +139,9 @@ public class SingleStreamRecordingService extends RecordingService {
recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly",
recording.getId(), recording.getSessionId(), reason); recording.getId(), recording.getSessionId(), reason);
final int numberOfActiveRecorders = recorders.get(recording.getSessionId()).size(); final HashMap<String, RecorderEndpointWrapper> wrappers = new HashMap<>(
final CountDownLatch stoppedCountDown = new CountDownLatch(numberOfActiveRecorders); recorders.get(recording.getSessionId()));
final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size());
for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) {
this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(),
@ -152,15 +158,29 @@ public class SingleStreamRecordingService extends RecordingService {
} }
this.cleanRecordingMaps(recording); this.cleanRecordingMaps(recording);
this.recorders.remove(recording.getSessionId());
recording = this.sealMetadataFiles(recording); // TODO: DOWNLOAD FILES IF SCALABILITY MODE
final Recording[] finalRecordingArray = new Recording[1];
try {
this.recordingDownloader.downloadRecording(recording, wrappers.keySet(), () -> {
// Update recording entity files with final file size
for (RecorderEndpointWrapper wrapper : wrappers.values()) {
if (wrapper.getSize() == 0) {
updateIndividualMetadataFile(wrapper);
}
}
finalRecordingArray[0] = this.sealMetadataFiles(recording);
});
} catch (IOException e) {
log.error("Error while downloading recording {}", recording.getName());
}
Recording finalRecording = finalRecordingArray[0] != null ? finalRecordingArray[0] : recording;
if (reason != null && session != null) { if (reason != null && session != null) {
this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, finalRecording, reason);
} }
return recording; return finalRecording;
} }
public void startRecorderEndpointForPublisherEndpoint(Session session, String recordingId, public void startRecorderEndpointForPublisherEndpoint(Session session, String recordingId,
@ -223,7 +243,7 @@ public class SingleStreamRecordingService extends RecordingService {
public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId,
CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { CountDownLatch globalStopLatch, Long kmsDisconnectionTime) {
log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId);
final RecorderEndpointWrapper finalWrapper = this.recorders.get(sessionId).remove(streamId); final RecorderEndpointWrapper finalWrapper = recorders.get(sessionId).remove(streamId);
if (finalWrapper != null && kmsDisconnectionTime == 0) { if (finalWrapper != null && kmsDisconnectionTime == 0) {
finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() { finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() {
@Override @Override
@ -323,12 +343,21 @@ public class SingleStreamRecordingService extends RecordingService {
} }
private void generateIndividualMetadataFile(RecorderEndpointWrapper wrapper) { private void generateIndividualMetadataFile(RecorderEndpointWrapper wrapper) {
this.commonWriteIndividualMetadataFile(wrapper, this.fileWriter::createAndWriteFile);
}
private void updateIndividualMetadataFile(RecorderEndpointWrapper wrapper) {
this.commonWriteIndividualMetadataFile(wrapper, this.fileWriter::overwriteFile);
}
private void commonWriteIndividualMetadataFile(RecorderEndpointWrapper wrapper,
BiFunction<String, String, Boolean> writeFunction) {
String filesPath = this.openviduConfig.getOpenViduRecordingPath() + wrapper.getRecordingId() + "/"; String filesPath = this.openviduConfig.getOpenViduRecordingPath() + wrapper.getRecordingId() + "/";
File videoFile = new File(filesPath + wrapper.getStreamId() + ".webm"); File videoFile = new File(filesPath + wrapper.getStreamId() + ".webm");
wrapper.setSize(videoFile.length()); wrapper.setSize(videoFile.length());
String metadataFilePath = filesPath + INDIVIDUAL_STREAM_METADATA_FILE + wrapper.getStreamId(); String metadataFilePath = filesPath + INDIVIDUAL_STREAM_METADATA_FILE + wrapper.getStreamId();
String metadataFileContent = wrapper.toJson().toString(); String metadataFileContent = wrapper.toJson().toString();
this.fileWriter.createAndWriteFile(metadataFilePath, metadataFileContent); writeFunction.apply(metadataFilePath, metadataFileContent);
} }
private Recording sealMetadataFiles(Recording recording) { private Recording sealMetadataFiles(Recording recording) {
@ -336,7 +365,6 @@ public class SingleStreamRecordingService extends RecordingService {
// individual recordings) and "size" (sum of all individual recordings size) // individual recordings) and "size" (sum of all individual recordings size)
String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/";
String metadataFilePath = folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); String metadataFilePath = folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId();
String syncFilePath = folderPath + recording.getName() + ".json"; String syncFilePath = folderPath + recording.getName() + ".json";
@ -397,7 +425,7 @@ public class SingleStreamRecordingService extends RecordingService {
double duration = (double) (maxEndTime - minStartTime) / 1000; double duration = (double) (maxEndTime - minStartTime) / 1000;
duration = duration > 0 ? duration : 0; duration = duration > 0 ? duration : 0;
recording = this.sealRecordingMetadataFile(recording, accumulatedSize, duration, metadataFilePath); recording = this.sealRecordingMetadataFileAsStopped(recording, accumulatedSize, duration, metadataFilePath);
return recording; return recording;
} }

View File

@ -30,19 +30,23 @@ public class CustomFileManager {
private static final Logger log = LoggerFactory.getLogger(CustomFileManager.class); private static final Logger log = LoggerFactory.getLogger(CustomFileManager.class);
public void createAndWriteFile(String filePath, String text) { public boolean createAndWriteFile(String filePath, String text) {
try { try {
this.writeAndCloseOnOutputStreamWriter(new FileOutputStream(filePath), text); this.writeAndCloseOnOutputStreamWriter(new FileOutputStream(filePath), text);
return true;
} catch (IOException e) { } catch (IOException e) {
log.error("Couldn't create file {}. Error: {}", filePath, e.getMessage()); log.error("Couldn't create file {}. Error: {}", filePath, e.getMessage());
return false;
} }
} }
public void overwriteFile(String filePath, String text) { public boolean overwriteFile(String filePath, String text) {
try { try {
this.writeAndCloseOnOutputStreamWriter(new FileOutputStream(filePath, false), text); this.writeAndCloseOnOutputStreamWriter(new FileOutputStream(filePath, false), text);
return true;
} catch (IOException e) { } catch (IOException e) {
log.error("Couldn't overwrite file {}. Error: {}", filePath, e.getMessage()); log.error("Couldn't overwrite file {}. Error: {}", filePath, e.getMessage());
return false;
} }
} }