From 207acdd2b6ca74b071ac525487097b8cdfa6ee0d Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Fri, 18 Sep 2020 12:37:13 +0200 Subject: [PATCH] openvidu-server: use recordingId instead of sessionId for managing single stream recorders (NullPointer fix) --- .../recording/service/RecordingManager.java | 2 +- .../service/SingleStreamRecordingService.java | 48 +++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index 9d4dc50b..4f9ca237 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -407,7 +407,7 @@ public class RecordingManager { log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", session.getSessionId(), streamId); final CountDownLatch stoppedCountDown = new CountDownLatch(1); - this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(session.getSessionId(), streamId, + this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(recording.getId(), streamId, stoppedCountDown, kmsDisconnectionTime); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 27c7e61f..170ef9a7 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -97,8 +97,8 @@ public class SingleStreamRecordingService extends RecordingService { Recording recording = new Recording(session.getSessionId(), recordingId, properties); this.recordingManager.recordingToStarting(recording); - activeRecorders.put(session.getSessionId(), new ConcurrentHashMap()); - storedRecorders.put(session.getSessionId(), new ConcurrentHashMap()); + activeRecorders.put(recording.getId(), new ConcurrentHashMap()); + storedRecorders.put(recording.getId(), new ConcurrentHashMap()); final int activePublishers = session.getActivePublishers(); final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishers); @@ -149,13 +149,12 @@ public class SingleStreamRecordingService extends RecordingService { recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.getId(), recording.getSessionId(), reason); - final HashMap wrappers = new HashMap<>( - storedRecorders.get(recording.getSessionId())); + final HashMap wrappers = new HashMap<>(storedRecorders.get(recording.getId())); final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size()); for (RecorderEndpointWrapper wrapper : wrappers.values()) { - this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), - stoppedCountDown, kmsDisconnectionTime); + this.stopRecorderEndpointOfPublisherEndpoint(recording.getId(), wrapper.getStreamId(), stoppedCountDown, + kmsDisconnectionTime); } try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { @@ -181,7 +180,7 @@ public class SingleStreamRecordingService extends RecordingService { } finalRecordingArray[0] = this.sealMetadataFiles(finalRecordingArray[0]); - cleanRecordingWrappers(finalRecordingArray[0].getSessionId()); + cleanRecordingWrappers(finalRecordingArray[0]); // Decrement active recordings once it is downloaded ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); @@ -195,7 +194,7 @@ public class SingleStreamRecordingService extends RecordingService { }); } catch (IOException e) { log.error("Error while downloading recording {}", finalRecordingArray[0].getName()); - cleanRecordingWrappers(finalRecordingArray[0].getSessionId()); + cleanRecordingWrappers(finalRecordingArray[0]); } if (reason != null && session != null) { @@ -215,8 +214,7 @@ public class SingleStreamRecordingService extends RecordingService { try { if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) { try { - if (this.activeRecorders.get(session.getSessionId()) - .containsKey(participant.getPublisherStreamId())) { + if (this.activeRecorders.get(recordingId).containsKey(participant.getPublisherStreamId())) { log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning", participant.getPublisherStreamId(), session.getSessionId()); return; @@ -254,10 +252,12 @@ public class SingleStreamRecordingService extends RecordingService { "file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" + participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build(); + final String finalRecordingId = recordingId; + recorder.addRecordingListener(new EventListener() { @Override public void onEvent(RecordingEvent event) { - activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) + activeRecorders.get(finalRecordingId).get(participant.getPublisherStreamId()) .setStartTime(Long.parseLong(event.getTimestampMillis())); log.info("Recording started event for stream {}", participant.getPublisherStreamId()); globalStartLatch.countDown(); @@ -278,8 +278,8 @@ public class SingleStreamRecordingService extends RecordingService { kurentoParticipant.getPublisher().getMediaOptions().hasVideo(), kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo()); - activeRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); - storedRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); + activeRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper); + storedRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper); connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile); wrapper.getRecorder().record(); @@ -299,10 +299,10 @@ public class SingleStreamRecordingService extends RecordingService { } } - public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, + public void stopRecorderEndpointOfPublisherEndpoint(String recordingId, String streamId, CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { - log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); - final RecorderEndpointWrapper finalWrapper = activeRecorders.get(sessionId).remove(streamId); + log.info("Stopping single stream recorder for stream {} in recording {}", streamId, recordingId); + final RecorderEndpointWrapper finalWrapper = activeRecorders.get(recordingId).remove(streamId); if (finalWrapper != null && kmsDisconnectionTime == 0) { finalWrapper.getRecorder().addStoppedListener(new EventListener() { @Override @@ -320,13 +320,13 @@ public class SingleStreamRecordingService extends RecordingService { // Stopping recorder endpoint because of a KMS disconnection finalWrapper.setEndTime(kmsDisconnectionTime); generateIndividualMetadataFile(finalWrapper); - log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId, - sessionId); + log.warn("Forcing individual recording stop after KMS restart for stream {} in recording {}", streamId, + recordingId); } else { - if (storedRecorders.get(sessionId).containsKey(streamId)) { - log.info("Stream {} recording of session {} was already stopped", streamId, sessionId); + if (storedRecorders.get(recordingId).containsKey(streamId)) { + log.info("Stream {} recording of recording {} was already stopped", streamId, recordingId); } else { - log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId); + log.error("Stream {} wasn't being recorded in recording {}", streamId, recordingId); } } globalStopLatch.countDown(); @@ -539,9 +539,9 @@ public class SingleStreamRecordingService extends RecordingService { } } - private void cleanRecordingWrappers(String sessionId) { - this.storedRecorders.remove(sessionId); - this.activeRecorders.remove(sessionId); + private void cleanRecordingWrappers(Recording recording) { + this.storedRecorders.remove(recording.getId()); + this.activeRecorders.remove(recording.getId()); } }