openvidu-server: recording stop refactoring

pull/391/head
pabloFuente 2020-01-23 18:45:35 +01:00
parent 6c1a168aca
commit 4fcb6839d0
6 changed files with 156 additions and 129 deletions

View File

@ -102,6 +102,11 @@ public class Session implements SessionInterface {
return null; return null;
} }
public boolean onlyRecorderParticipant() {
return this.participants.size() == 1 && ProtocolElements.RECORDER_PARTICIPANT_PUBLICID
.equals(this.participants.values().iterator().next().getParticipantPublicId());
}
public int getActivePublishers() { public int getActivePublishers() {
return activePublishers.get(); return activePublishers.get();
} }

View File

@ -461,19 +461,16 @@ public abstract class SessionManager {
* was forcibly closed. * was forcibly closed.
* *
* @param sessionId identifier of the session * @param sessionId identifier of the session
* @return
* @return set of {@link Participant} POJOS representing the session's
* participants
* @throws OpenViduException in case the session doesn't exist or has been * @throws OpenViduException in case the session doesn't exist or has been
* already closed * already closed
*/ */
public Set<Participant> closeSession(String sessionId, EndReason reason) { public void closeSession(String sessionId, EndReason reason) {
Session session = sessions.get(sessionId); Session session = sessions.get(sessionId);
if (session == null) { if (session == null) {
throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId + "' not found"); throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId + "' not found");
} }
if (session.isClosed()) { if (session.isClosed()) {
this.closeSessionAndEmptyCollections(session, reason); this.cleanCollections(sessionId);
throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "Session '" + sessionId + "' already closed"); throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "Session '" + sessionId + "' already closed");
} }
Set<Participant> participants = getParticipants(sessionId); Set<Participant> participants = getParticipants(sessionId);
@ -489,21 +486,27 @@ public abstract class SessionManager {
} }
if (!sessionClosedByLastParticipant) { if (!sessionClosedByLastParticipant) {
// This code should never be executed, as last evicted participant must trigger // This code should only be executed when there were no participants connected
// session close // to the session. That is: if the session was in the automatic recording stop
this.closeSessionAndEmptyCollections(session, reason); // timeout with INDIVIDUAL recording (no docker participant connected)
this.closeSessionAndEmptyCollections(session, reason, true);
} }
return participants;
} }
public void closeSessionAndEmptyCollections(Session session, EndReason reason) { public void closeSessionAndEmptyCollections(Session session, EndReason reason, boolean stopRecording) {
if (openviduConfig.isRecordingModuleEnabled() if (openviduConfig.isRecordingModuleEnabled() && stopRecording
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
recordingManager.stopRecording(session, null, RecordingManager.finalReason(reason)); recordingManager.stopRecording(session, null, RecordingManager.finalReason(reason));
} }
if (EndReason.automaticStop.equals(reason) && !session.getParticipants().isEmpty()
&& !session.onlyRecorderParticipant()) {
log.warn(
"Some user connected to the session between automatic recording stop and session close up. Canceling session close up");
return;
}
final String mediaNodeId = session.getMediaNodeId(); final String mediaNodeId = session.getMediaNodeId();
if (session.close(reason)) { if (session.close(reason)) {

View File

@ -214,7 +214,7 @@ public class KurentoSessionManager extends SessionManager {
recordingManager.initAutomaticRecordingStopThread(session); recordingManager.initAutomaticRecordingStopThread(session);
} else { } else {
log.info("No more participants in session '{}', removing it and closing it", sessionId); log.info("No more participants in session '{}', removing it and closing it", sessionId);
this.closeSessionAndEmptyCollections(session, reason); this.closeSessionAndEmptyCollections(session, reason, true);
sessionClosedByLastParticipant = true; sessionClosedByLastParticipant = true;
showTokens(); showTokens();
} }
@ -223,7 +223,7 @@ public class KurentoSessionManager extends SessionManager {
&& this.recordingManager.sessionIsBeingRecorded(sessionId) && this.recordingManager.sessionIsBeingRecorded(sessionId)
&& ProtocolElements.RECORDER_PARTICIPANT_PUBLICID && ProtocolElements.RECORDER_PARTICIPANT_PUBLICID
.equals(remainingParticipants.iterator().next().getParticipantPublicId())) { .equals(remainingParticipants.iterator().next().getParticipantPublicId())) {
// Start countdown // RECORDER participant is the last one standing. Start countdown
log.info("Last participant left. Starting {} seconds countdown for stopping recording of session {}", log.info("Last participant left. Starting {} seconds countdown for stopping recording of session {}",
this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId);
recordingManager.initAutomaticRecordingStopThread(session); recordingManager.initAutomaticRecordingStopThread(session);

View File

@ -97,7 +97,7 @@ public class ComposedRecordingService extends RecordingService {
} }
// Increment active recordings // Increment active recordings
((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet(); // ((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet();
return recording; return recording;
} }
@ -225,6 +225,9 @@ public class ComposedRecordingService extends RecordingService {
this.generateRecordingMetadataFile(recording); this.generateRecordingMetadataFile(recording);
// Increment active recordings
((KurentoSession) session).getKms().getActiveRecordings().incrementAndGet();
return recording; return recording;
} }
@ -235,7 +238,6 @@ public class ComposedRecordingService extends RecordingService {
RecordingManager.finalReason(reason)); RecordingManager.finalReason(reason));
String containerId = this.sessionsContainers.remove(recording.getSessionId()); String containerId = this.sessionsContainers.remove(recording.getSessionId());
this.cleanRecordingMaps(recording);
final String recordingId = recording.getId(); final String recordingId = recording.getId();
@ -248,95 +250,61 @@ public class ComposedRecordingService extends RecordingService {
} }
if (containerId == null) { if (containerId == null) {
if (this.recordingManager.startingRecordings.containsKey(recordingId)) {
// Session was closed while recording container was initializing // Session was closed while recording container was initializing
// Wait until containerId is available and force its stop and deletion // Wait until containerId is available and force its stop and deletion
new Thread(() -> { final Recording recordingAux = recording;
log.warn("Session closed while starting recording container"); new Thread(() -> {
boolean containerClosed = false; log.warn("Session closed while starting recording container");
String containerIdAux; boolean containerClosed = false;
int i = 0; String containerIdAux;
final int timeout = 30; int i = 0;
while (!containerClosed && (i < timeout)) { final int timeout = 30;
containerIdAux = this.sessionsContainers.remove(session.getSessionId()); while (!containerClosed && (i < timeout)) {
if (containerIdAux == null) { containerIdAux = this.sessionsContainers.remove(session.getSessionId());
try { if (containerIdAux == null) {
log.warn("Waiting for container to be launched..."); try {
i++; log.warn("Waiting for container to be launched...");
Thread.sleep(500); i++;
} catch (InterruptedException e) { Thread.sleep(500);
e.printStackTrace(); } catch (InterruptedException e) {
} e.printStackTrace();
} else { }
log.warn("Removing container {} for closed session {}...", containerIdAux, } else {
session.getSessionId()); log.warn("Removing container {} for closed session {}...", containerIdAux,
dockerManager.removeDockerContainer(containerIdAux, true); session.getSessionId());
containers.remove(containerId); dockerManager.removeDockerContainer(containerIdAux, true);
containerClosed = true; containers.remove(containerId);
log.warn("Container {} for closed session {} succesfully stopped and removed", containerIdAux, containerClosed = true;
session.getSessionId()); log.warn("Container {} for closed session {} succesfully stopped and removed",
log.warn("Deleting unusable files for recording {}", recordingId); containerIdAux, session.getSessionId());
if (HttpStatus.NO_CONTENT log.warn("Deleting unusable files for recording {}", recordingId);
.equals(this.recordingManager.deleteRecordingFromHost(recordingId, true))) { if (HttpStatus.NO_CONTENT
log.warn("Files properly deleted"); .equals(this.recordingManager.deleteRecordingFromHost(recordingId, true))) {
log.warn("Files properly deleted");
}
} }
} }
} cleanRecordingMaps(recordingAux);
if (i == timeout) { if (i == timeout) {
log.error("Container did not launched in {} seconds", timeout / 2); log.error("Container did not launched in {} seconds", timeout / 2);
return; return;
} }
}).start(); // Decrement active recordings
// ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet();
}).start();
}
} else { } else {
// Gracefully stop ffmpeg process stopAndRemoveRecordingContainer(recording, containerId, 30);
try { recording = updateRecordingAttributes(recording);
dockerManager.runCommandInContainer(containerId, "echo 'q' > stop", 0);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// Wait for the container to be gracefully self-stopped final String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/";
final int timeOfWait = 30; final String metadataFilePath = folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId();
try { this.sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(),
dockerManager.waitForContainerStopped(containerId, timeOfWait); metadataFilePath);
} catch (Exception e) { cleanRecordingMaps(recording);
failRecordingCompletion(recording, containerId,
new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE,
"The recording completion process couldn't finish in " + timeOfWait + " seconds"));
}
// Remove container
dockerManager.removeDockerContainer(containerId, false);
containers.remove(containerId);
// Update recording attributes reading from video report file
try {
RecordingInfoUtils infoUtils = new RecordingInfoUtils(
this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/" + recordingId + ".info");
if (!infoUtils.hasVideo()) {
log.error("COMPOSED recording {} with hasVideo=true has not video track", recordingId);
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
} else {
recording.setStatus(io.openvidu.java.client.Recording.Status.ready);
recording.setDuration(infoUtils.getDurationInSeconds());
recording.setSize(infoUtils.getSizeInBytes());
recording.setResolution(infoUtils.videoWidth() + "x" + infoUtils.videoHeight());
recording.setHasAudio(infoUtils.hasAudio());
recording.setHasVideo(infoUtils.hasVideo());
}
infoUtils.deleteFilePath();
} catch (IOException e) {
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
throw new OpenViduException(Code.RECORDING_REPORT_ERROR_CODE,
"There was an error generating the metadata report file for the recording: " + e.getMessage());
}
String filesPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/";
recording = this.sealRecordingMetadataFileAsReady(recording, recording.getSize(), recording.getDuration(),
filesPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId());
final long timestamp = System.currentTimeMillis(); final long timestamp = System.currentTimeMillis();
this.cdr.recordRecordingStatusChanged(recording, reason, timestamp, recording.getStatus()); this.cdr.recordRecordingStatusChanged(recording, reason, timestamp, recording.getStatus());
@ -344,10 +312,10 @@ public class ComposedRecordingService extends RecordingService {
if (session != null && reason != null) { if (session != null && reason != null) {
this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason);
} }
}
// Decrement active recordings // Decrement active recordings
((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); // ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet();
}
return recording; return recording;
} }
@ -425,6 +393,53 @@ public class ComposedRecordingService extends RecordingService {
return finalRecordingArray[0]; return finalRecordingArray[0];
} }
private void stopAndRemoveRecordingContainer(Recording recording, String containerId, int secondsOfWait) {
// Gracefully stop ffmpeg process
try {
dockerManager.runCommandInContainer(containerId, "echo 'q' > stop", 0);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
// Wait for the container to be gracefully self-stopped
final int timeOfWait = 30;
try {
dockerManager.waitForContainerStopped(containerId, timeOfWait);
} catch (Exception e) {
failRecordingCompletion(recording, containerId, new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE,
"The recording completion process couldn't finish in " + timeOfWait + " seconds"));
}
// Remove container
dockerManager.removeDockerContainer(containerId, false);
containers.remove(containerId);
}
private Recording updateRecordingAttributes(Recording recording) {
try {
RecordingInfoUtils infoUtils = new RecordingInfoUtils(this.openviduConfig.getOpenViduRecordingPath()
+ recording.getId() + "/" + recording.getId() + ".info");
if (!infoUtils.hasVideo()) {
log.error("COMPOSED recording {} with hasVideo=true has not video track", recording.getId());
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
} else {
recording.setStatus(io.openvidu.java.client.Recording.Status.ready);
recording.setDuration(infoUtils.getDurationInSeconds());
recording.setSize(infoUtils.getSizeInBytes());
recording.setResolution(infoUtils.videoWidth() + "x" + infoUtils.videoHeight());
recording.setHasAudio(infoUtils.hasAudio());
recording.setHasVideo(infoUtils.hasVideo());
}
infoUtils.deleteFilePath();
return recording;
} catch (IOException e) {
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
throw new OpenViduException(Code.RECORDING_REPORT_ERROR_CODE,
"There was an error generating the metadata report file for the recording: " + e.getMessage());
}
}
private void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException { private void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException {
boolean isPresent = false; boolean isPresent = false;
int i = 1; int i = 1;

View File

@ -474,48 +474,50 @@ public class RecordingManager {
public void initAutomaticRecordingStopThread(final Session session) { public void initAutomaticRecordingStopThread(final Session session) {
final String recordingId = this.sessionsRecordings.get(session.getSessionId()).getId(); final String recordingId = this.sessionsRecordings.get(session.getSessionId()).getId();
ScheduledFuture<?> future = this.automaticRecordingStopExecutor.schedule(() -> {
log.info("Stopping recording {} after {} seconds wait (no publisher published before timeout)", recordingId, this.automaticRecordingStopThreads.computeIfAbsent(session.getSessionId(), f -> {
this.openviduConfig.getOpenviduRecordingAutostopTimeout());
if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) { ScheduledFuture<?> future = this.automaticRecordingStopExecutor.schedule(() -> {
if (session.getParticipants().size() == 0 || (session.getParticipants().size() == 1 log.info("Stopping recording {} after {} seconds wait (no publisher published before timeout)",
&& session.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID) != null)) { recordingId, this.openviduConfig.getOpenviduRecordingAutostopTimeout());
// Close session if there are no participants connected (except for RECORDER).
// This code won't be executed only when some user reconnects to the session if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) {
// but never publishing (publishers automatically abort this thread) if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
log.info("Closing session {} after automatic stop of recording {}", session.getSessionId(), // Close session if there are no participants connected (RECORDER does not
recordingId); // count) and publishing
sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop); log.info("Closing session {} after automatic stop of recording {}", session.getSessionId(),
sessionManager.showTokens(); recordingId);
sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop, true);
sessionManager.showTokens();
} else {
// There are users connected, but no one is publishing
log.info(
"Automatic stopping recording {}. There are users connected to session {}, but no one is publishing",
recordingId, session.getSessionId());
this.stopRecording(session, recordingId, EndReason.automaticStop);
}
} else { } else {
this.stopRecording(session, recordingId, EndReason.automaticStop); // This code shouldn't be reachable
log.warn("Recording {} was already automatically stopped by a previous thread", recordingId);
} }
} else { }, this.openviduConfig.getOpenviduRecordingAutostopTimeout(), TimeUnit.SECONDS);
// This code is reachable if there already was an automatic stop of a recording
// caused by not user publishing within timeout after recording started, and a
// new automatic stop thread was started by last user leaving the session
log.warn("Recording {} was already automatically stopped by a previous thread", recordingId);
}
}, this.openviduConfig.getOpenviduRecordingAutostopTimeout(), TimeUnit.SECONDS); return future;
this.automaticRecordingStopThreads.putIfAbsent(session.getSessionId(), future); });
} }
public boolean abortAutomaticRecordingStopThread(Session session, EndReason reason) { public boolean abortAutomaticRecordingStopThread(Session session, EndReason reason) {
ScheduledFuture<?> future = this.automaticRecordingStopThreads.remove(session.getSessionId()); ScheduledFuture<?> future = this.automaticRecordingStopThreads.remove(session.getSessionId());
if (future != null) { if (future != null) {
boolean cancelled = future.cancel(false); boolean cancelled = future.cancel(false);
if (session.getParticipants().size() == 0 || (session.getParticipants().size() == 1 if (session.getParticipants().size() == 0 || session.onlyRecorderParticipant()) {
&& session.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID) != null)) {
// Close session if there are no participants connected (except for RECORDER). // Close session if there are no participants connected (except for RECORDER).
// This code will only be executed if recording is manually stopped during the // This code will only be executed if recording is manually stopped during the
// automatic stop timeout, so the session must be also closed // automatic stop timeout, so the session must be also closed
log.info( log.info(
"Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session", "Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session",
session.getSessionId()); session.getSessionId());
sessionManager.closeSessionAndEmptyCollections(session, reason); sessionManager.closeSessionAndEmptyCollections(session, reason, false);
sessionManager.showTokens(); sessionManager.showTokens();
} }
return cancelled; return cancelled;

View File

@ -223,7 +223,8 @@ public class SessionRestController {
Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId);
if (sessionNotActive != null) { if (sessionNotActive != null) {
this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer); this.sessionManager.closeSessionAndEmptyCollections(sessionNotActive, EndReason.sessionClosedByServer,
true);
return new ResponseEntity<>(HttpStatus.NO_CONTENT); return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} else { } else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND); return new ResponseEntity<>(HttpStatus.NOT_FOUND);
@ -541,7 +542,8 @@ public class SessionRestController {
session.recordingManuallyStopped.set(true); session.recordingManuallyStopped.set(true);
if (session != null && OutputMode.COMPOSED.equals(recording.getOutputMode()) && recording.hasVideo()) { if (session != null && !session.isClosed() && OutputMode.COMPOSED.equals(recording.getOutputMode())
&& recording.hasVideo()) {
sessionManager.evictParticipant( sessionManager.evictParticipant(
session.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID), null, null, null); session.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID), null, null, null);
} }