openvidu-server: stored recording wrappers not removed until download

pull/375/head
pabloFuente 2019-07-08 14:29:21 +02:00
parent bd036fc419
commit c2186e2f35
1 changed files with 19 additions and 8 deletions

View File

@ -68,7 +68,9 @@ public class SingleStreamRecordingService extends RecordingService {
private static final Logger log = LoggerFactory.getLogger(SingleStreamRecordingService.class); private static final Logger log = LoggerFactory.getLogger(SingleStreamRecordingService.class);
private Map<String, Map<String, RecorderEndpointWrapper>> recorders = new ConcurrentHashMap<>(); private Map<String, Map<String, RecorderEndpointWrapper>> activeRecorders = new ConcurrentHashMap<>();
private Map<String, Map<String, RecorderEndpointWrapper>> storedRecorders = new ConcurrentHashMap<>();
private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream.";
public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader, public SingleStreamRecordingService(RecordingManager recordingManager, RecordingDownloader recordingDownloader,
@ -91,7 +93,8 @@ public class SingleStreamRecordingService extends RecordingService {
Recording recording = new Recording(session.getSessionId(), recordingId, properties); Recording recording = new Recording(session.getSessionId(), recordingId, properties);
this.recordingManager.startingRecordings.put(recording.getId(), recording); this.recordingManager.startingRecordings.put(recording.getId(), recording);
recorders.put(session.getSessionId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>()); activeRecorders.put(session.getSessionId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>());
storedRecorders.put(session.getSessionId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>());
final int activePublishers = session.getActivePublishers(); final int activePublishers = session.getActivePublishers();
final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishers); final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishers);
@ -141,10 +144,10 @@ public class SingleStreamRecordingService extends RecordingService {
recording.getId(), recording.getSessionId(), reason); recording.getId(), recording.getSessionId(), reason);
final HashMap<String, RecorderEndpointWrapper> wrappers = new HashMap<>( final HashMap<String, RecorderEndpointWrapper> wrappers = new HashMap<>(
recorders.get(recording.getSessionId())); storedRecorders.get(recording.getSessionId()));
final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size()); final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size());
for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { for (RecorderEndpointWrapper wrapper : wrappers.values()) {
this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(),
stoppedCountDown, kmsDisconnectionTime); stoppedCountDown, kmsDisconnectionTime);
} }
@ -177,9 +180,12 @@ public class SingleStreamRecordingService extends RecordingService {
cdr.recordRecordingStopped(finalRecordingArray[0], reason, timestamp); cdr.recordRecordingStopped(finalRecordingArray[0], reason, timestamp);
cdr.recordRecordingStatusChanged(finalRecordingArray[0], reason, timestamp, cdr.recordRecordingStatusChanged(finalRecordingArray[0], reason, timestamp,
finalRecordingArray[0].getStatus()); finalRecordingArray[0].getStatus());
storedRecorders.remove(finalRecordingArray[0].getSessionId());
}); });
} catch (IOException e) { } catch (IOException e) {
log.error("Error while downloading recording {}", finalRecordingArray[0].getName()); log.error("Error while downloading recording {}", finalRecordingArray[0].getName());
storedRecorders.remove(finalRecordingArray[0].getSessionId());
} }
if (reason != null && session != null) { if (reason != null && session != null) {
@ -221,7 +227,7 @@ public class SingleStreamRecordingService extends RecordingService {
recorder.addRecordingListener(new EventListener<RecordingEvent>() { recorder.addRecordingListener(new EventListener<RecordingEvent>() {
@Override @Override
public void onEvent(RecordingEvent event) { public void onEvent(RecordingEvent event) {
recorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId())
.setStartTime(Long.parseLong(event.getTimestampMillis())); .setStartTime(Long.parseLong(event.getTimestampMillis()));
log.info("Recording started event for stream {}", participant.getPublisherStreamId()); log.info("Recording started event for stream {}", participant.getPublisherStreamId());
globalStartLatch.countDown(); globalStartLatch.countDown();
@ -243,14 +249,15 @@ public class SingleStreamRecordingService extends RecordingService {
kurentoParticipant.getPublisher().getMediaOptions().hasVideo(), kurentoParticipant.getPublisher().getMediaOptions().hasVideo(),
kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo()); kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo());
recorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); activeRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper);
storedRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper);
wrapper.getRecorder().record(); wrapper.getRecorder().record();
} }
public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId,
CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { CountDownLatch globalStopLatch, Long kmsDisconnectionTime) {
log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId);
final RecorderEndpointWrapper finalWrapper = recorders.get(sessionId).remove(streamId); final RecorderEndpointWrapper finalWrapper = activeRecorders.get(sessionId).remove(streamId);
if (finalWrapper != null && kmsDisconnectionTime == 0) { if (finalWrapper != null && kmsDisconnectionTime == 0) {
finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() { finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() {
@Override @Override
@ -271,7 +278,11 @@ public class SingleStreamRecordingService extends RecordingService {
log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId, log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId,
sessionId); sessionId);
} else { } else {
log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId); if (storedRecorders.get(sessionId).containsKey(streamId)) {
log.info("Stream {} recording of session {} was already stopped", streamId, sessionId);
} else {
log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId);
}
} }
globalStopLatch.countDown(); globalStopLatch.countDown();
} }