diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 4d7eea18..9236003b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -68,7 +68,9 @@ public class SingleStreamRecordingService extends RecordingService { private static final Logger log = LoggerFactory.getLogger(SingleStreamRecordingService.class); - private Map> recorders = new ConcurrentHashMap<>(); + private Map> activeRecorders = new ConcurrentHashMap<>(); + private Map> storedRecorders = new ConcurrentHashMap<>(); + private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, @@ -91,7 +93,8 @@ public class SingleStreamRecordingService extends RecordingService { Recording recording = new Recording(session.getSessionId(), recordingId, properties); this.recordingManager.startingRecordings.put(recording.getId(), recording); - recorders.put(session.getSessionId(), new ConcurrentHashMap()); + activeRecorders.put(session.getSessionId(), new ConcurrentHashMap()); + storedRecorders.put(session.getSessionId(), new ConcurrentHashMap()); final int activePublishers = session.getActivePublishers(); final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishers); @@ -141,10 +144,10 @@ public class SingleStreamRecordingService extends RecordingService { recording.getId(), recording.getSessionId(), reason); final HashMap wrappers = new HashMap<>( - recorders.get(recording.getSessionId())); + storedRecorders.get(recording.getSessionId())); final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size()); - for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { + for (RecorderEndpointWrapper wrapper : wrappers.values()) { this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), stoppedCountDown, kmsDisconnectionTime); } @@ -177,9 +180,12 @@ public class SingleStreamRecordingService extends RecordingService { cdr.recordRecordingStopped(finalRecordingArray[0], reason, timestamp); cdr.recordRecordingStatusChanged(finalRecordingArray[0], reason, timestamp, finalRecordingArray[0].getStatus()); + + storedRecorders.remove(finalRecordingArray[0].getSessionId()); }); } catch (IOException e) { log.error("Error while downloading recording {}", finalRecordingArray[0].getName()); + storedRecorders.remove(finalRecordingArray[0].getSessionId()); } if (reason != null && session != null) { @@ -221,7 +227,7 @@ public class SingleStreamRecordingService extends RecordingService { recorder.addRecordingListener(new EventListener() { @Override public void onEvent(RecordingEvent event) { - recorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) + activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) .setStartTime(Long.parseLong(event.getTimestampMillis())); log.info("Recording started event for stream {}", participant.getPublisherStreamId()); globalStartLatch.countDown(); @@ -243,14 +249,15 @@ public class SingleStreamRecordingService extends RecordingService { kurentoParticipant.getPublisher().getMediaOptions().hasVideo(), kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo()); - recorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); + activeRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); + storedRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); wrapper.getRecorder().record(); } public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); - final RecorderEndpointWrapper finalWrapper = recorders.get(sessionId).remove(streamId); + final RecorderEndpointWrapper finalWrapper = activeRecorders.get(sessionId).remove(streamId); if (finalWrapper != null && kmsDisconnectionTime == 0) { finalWrapper.getRecorder().addStoppedListener(new EventListener() { @Override @@ -271,7 +278,11 @@ public class SingleStreamRecordingService extends RecordingService { log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId, sessionId); } else { - log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId); + if (storedRecorders.get(sessionId).containsKey(streamId)) { + log.info("Stream {} recording of session {} was already stopped", streamId, sessionId); + } else { + log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId); + } } globalStopLatch.countDown(); }