openvidu-server: use recordingId instead of sessionId for managing single stream recorders (NullPointer fix)

pull/546/head
pabloFuente 2020-09-18 12:37:13 +02:00
parent 01894918b7
commit 207acdd2b6
2 changed files with 25 additions and 25 deletions

View File

@ -407,7 +407,7 @@ public class RecordingManager {
log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", session.getSessionId(), log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", session.getSessionId(),
streamId); streamId);
final CountDownLatch stoppedCountDown = new CountDownLatch(1); final CountDownLatch stoppedCountDown = new CountDownLatch(1);
this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(session.getSessionId(), streamId, this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(recording.getId(), streamId,
stoppedCountDown, kmsDisconnectionTime); stoppedCountDown, kmsDisconnectionTime);
try { try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {

View File

@ -97,8 +97,8 @@ public class SingleStreamRecordingService extends RecordingService {
Recording recording = new Recording(session.getSessionId(), recordingId, properties); Recording recording = new Recording(session.getSessionId(), recordingId, properties);
this.recordingManager.recordingToStarting(recording); this.recordingManager.recordingToStarting(recording);
activeRecorders.put(session.getSessionId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>()); activeRecorders.put(recording.getId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>());
storedRecorders.put(session.getSessionId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>()); storedRecorders.put(recording.getId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>());
final int activePublishers = session.getActivePublishers(); final int activePublishers = session.getActivePublishers();
final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishers); final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishers);
@ -149,13 +149,12 @@ public class SingleStreamRecordingService extends RecordingService {
recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly",
recording.getId(), recording.getSessionId(), reason); recording.getId(), recording.getSessionId(), reason);
final HashMap<String, RecorderEndpointWrapper> wrappers = new HashMap<>( final HashMap<String, RecorderEndpointWrapper> wrappers = new HashMap<>(storedRecorders.get(recording.getId()));
storedRecorders.get(recording.getSessionId()));
final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size()); final CountDownLatch stoppedCountDown = new CountDownLatch(wrappers.size());
for (RecorderEndpointWrapper wrapper : wrappers.values()) { for (RecorderEndpointWrapper wrapper : wrappers.values()) {
this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), this.stopRecorderEndpointOfPublisherEndpoint(recording.getId(), wrapper.getStreamId(), stoppedCountDown,
stoppedCountDown, kmsDisconnectionTime); kmsDisconnectionTime);
} }
try { try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
@ -181,7 +180,7 @@ public class SingleStreamRecordingService extends RecordingService {
} }
finalRecordingArray[0] = this.sealMetadataFiles(finalRecordingArray[0]); finalRecordingArray[0] = this.sealMetadataFiles(finalRecordingArray[0]);
cleanRecordingWrappers(finalRecordingArray[0].getSessionId()); cleanRecordingWrappers(finalRecordingArray[0]);
// Decrement active recordings once it is downloaded // Decrement active recordings once it is downloaded
((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet(); ((KurentoSession) session).getKms().getActiveRecordings().decrementAndGet();
@ -195,7 +194,7 @@ public class SingleStreamRecordingService extends RecordingService {
}); });
} catch (IOException e) { } catch (IOException e) {
log.error("Error while downloading recording {}", finalRecordingArray[0].getName()); log.error("Error while downloading recording {}", finalRecordingArray[0].getName());
cleanRecordingWrappers(finalRecordingArray[0].getSessionId()); cleanRecordingWrappers(finalRecordingArray[0]);
} }
if (reason != null && session != null) { if (reason != null && session != null) {
@ -215,8 +214,7 @@ public class SingleStreamRecordingService extends RecordingService {
try { try {
if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) { if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) {
try { try {
if (this.activeRecorders.get(session.getSessionId()) if (this.activeRecorders.get(recordingId).containsKey(participant.getPublisherStreamId())) {
.containsKey(participant.getPublisherStreamId())) {
log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning", log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning",
participant.getPublisherStreamId(), session.getSessionId()); participant.getPublisherStreamId(), session.getSessionId());
return; return;
@ -254,10 +252,12 @@ public class SingleStreamRecordingService extends RecordingService {
"file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/" "file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/"
+ participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build(); + participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build();
final String finalRecordingId = recordingId;
recorder.addRecordingListener(new EventListener<RecordingEvent>() { recorder.addRecordingListener(new EventListener<RecordingEvent>() {
@Override @Override
public void onEvent(RecordingEvent event) { public void onEvent(RecordingEvent event) {
activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) activeRecorders.get(finalRecordingId).get(participant.getPublisherStreamId())
.setStartTime(Long.parseLong(event.getTimestampMillis())); .setStartTime(Long.parseLong(event.getTimestampMillis()));
log.info("Recording started event for stream {}", participant.getPublisherStreamId()); log.info("Recording started event for stream {}", participant.getPublisherStreamId());
globalStartLatch.countDown(); globalStartLatch.countDown();
@ -278,8 +278,8 @@ public class SingleStreamRecordingService extends RecordingService {
kurentoParticipant.getPublisher().getMediaOptions().hasVideo(), kurentoParticipant.getPublisher().getMediaOptions().hasVideo(),
kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo()); kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo());
activeRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); activeRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper);
storedRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); storedRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper);
connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile); connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile);
wrapper.getRecorder().record(); wrapper.getRecorder().record();
@ -299,10 +299,10 @@ public class SingleStreamRecordingService extends RecordingService {
} }
} }
public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, public void stopRecorderEndpointOfPublisherEndpoint(String recordingId, String streamId,
CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { CountDownLatch globalStopLatch, Long kmsDisconnectionTime) {
log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); log.info("Stopping single stream recorder for stream {} in recording {}", streamId, recordingId);
final RecorderEndpointWrapper finalWrapper = activeRecorders.get(sessionId).remove(streamId); final RecorderEndpointWrapper finalWrapper = activeRecorders.get(recordingId).remove(streamId);
if (finalWrapper != null && kmsDisconnectionTime == 0) { if (finalWrapper != null && kmsDisconnectionTime == 0) {
finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() { finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() {
@Override @Override
@ -320,13 +320,13 @@ public class SingleStreamRecordingService extends RecordingService {
// Stopping recorder endpoint because of a KMS disconnection // Stopping recorder endpoint because of a KMS disconnection
finalWrapper.setEndTime(kmsDisconnectionTime); finalWrapper.setEndTime(kmsDisconnectionTime);
generateIndividualMetadataFile(finalWrapper); generateIndividualMetadataFile(finalWrapper);
log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId, log.warn("Forcing individual recording stop after KMS restart for stream {} in recording {}", streamId,
sessionId); recordingId);
} else { } else {
if (storedRecorders.get(sessionId).containsKey(streamId)) { if (storedRecorders.get(recordingId).containsKey(streamId)) {
log.info("Stream {} recording of session {} was already stopped", streamId, sessionId); log.info("Stream {} recording of recording {} was already stopped", streamId, recordingId);
} else { } else {
log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId); log.error("Stream {} wasn't being recorded in recording {}", streamId, recordingId);
} }
} }
globalStopLatch.countDown(); globalStopLatch.countDown();
@ -539,9 +539,9 @@ public class SingleStreamRecordingService extends RecordingService {
} }
} }
private void cleanRecordingWrappers(String sessionId) { private void cleanRecordingWrappers(Recording recording) {
this.storedRecorders.remove(sessionId); this.storedRecorders.remove(recording.getId());
this.activeRecorders.remove(sessionId); this.activeRecorders.remove(recording.getId());
} }
} }