From f5171c9306c4847fd6d1039bf55042d3677522ae Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Sat, 23 May 2020 22:45:42 +0200 Subject: [PATCH] openvidu-server: concurrent double initialization of single stream recording fix --- .../io/openvidu/server/core/Participant.java | 9 ++ .../service/SingleStreamRecordingService.java | 140 +++++++++++------- 2 files changed, 92 insertions(+), 57 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java b/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java index 7bb093e2..4c17cebe 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/Participant.java @@ -17,6 +17,9 @@ package io.openvidu.server.core; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import com.google.gson.JsonObject; import io.openvidu.server.kurento.endpoint.EndpointType; @@ -41,6 +44,12 @@ public class Participant { private final String METADATA_SEPARATOR = "%/%"; + /** + * This lock protects the initialization of a RecorderEndpoint when INDIVIDUAL + * recording + */ + public Lock singleRecordingLock = new ReentrantLock(); + public Participant(String finalUserId, String participantPrivatetId, String participantPublicId, String sessionId, Token token, String clientMetadata, GeoLocation location, String platform, EndpointType endpointType, Long createdAt) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 9bfd4337..1ca3a7e3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -208,69 +208,95 @@ public class SingleStreamRecordingService extends RecordingService { public void startRecorderEndpointForPublisherEndpoint(final Session session, String recordingId, MediaProfileSpecType profile, final Participant participant, CountDownLatch globalStartLatch) { + log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(), session.getSessionId()); - if (recordingId == null) { - // Stream is being recorded because is a new publisher in an ongoing recorded - // session. If recordingId is defined is because Stream is being recorded from - // "startRecording" method - Recording recording = this.recordingManager.sessionsRecordings.get(session.getSessionId()); - if (recording == null) { - recording = this.recordingManager.sessionsRecordingsStarting.get(session.getSessionId()); - if (recording == null) { - log.error( - "Cannot start single stream recorder for stream {} in session {}. The recording {} cannot be found", - participant.getPublisherStreamId(), session.getSessionId(), recordingId); - return; + try { + if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) { + try { + if (this.activeRecorders.get(session.getSessionId()) + .containsKey(participant.getPublisherStreamId())) { + log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning", + participant.getPublisherStreamId(), session.getSessionId()); + return; + } + + if (recordingId == null) { + // Stream is being recorded because is a new publisher in an ongoing recorded + // session. If recordingId is defined is because Stream is being recorded from + // "startRecording" method + Recording recording = this.recordingManager.sessionsRecordings.get(session.getSessionId()); + if (recording == null) { + recording = this.recordingManager.sessionsRecordingsStarting.get(session.getSessionId()); + if (recording == null) { + log.error( + "Cannot start single stream recorder for stream {} in session {}. The recording {} cannot be found", + participant.getPublisherStreamId(), session.getSessionId(), recordingId); + return; + } + } + recordingId = recording.getId(); + + try { + profile = generateMediaProfile(recording.getRecordingProperties(), participant); + } catch (OpenViduException e) { + log.error("Cannot start single stream recorder for stream {} in session {}: {}", + participant.getPublisherStreamId(), session.getSessionId(), e.getMessage()); + return; + } + } + + KurentoParticipant kurentoParticipant = (KurentoParticipant) participant; + MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline(); + + RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, + "file://" + this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/" + + participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build(); + + recorder.addRecordingListener(new EventListener() { + @Override + public void onEvent(RecordingEvent event) { + activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) + .setStartTime(Long.parseLong(event.getTimestampMillis())); + log.info("Recording started event for stream {}", participant.getPublisherStreamId()); + globalStartLatch.countDown(); + } + }); + + recorder.addErrorListener(new EventListener() { + @Override + public void onEvent(ErrorEvent event) { + log.error(event.getErrorCode() + " " + event.getDescription()); + } + }); + + RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, + participant.getParticipantPublicId(), recordingId, participant.getPublisherStreamId(), + participant.getClientMetadata(), participant.getServerMetadata(), + kurentoParticipant.getPublisher().getMediaOptions().hasAudio(), + kurentoParticipant.getPublisher().getMediaOptions().hasVideo(), + kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo()); + + activeRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); + storedRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); + + connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile); + wrapper.getRecorder().record(); + + } finally { + participant.singleRecordingLock.unlock(); } + } else { + log.error( + "Timeout waiting for individual recording lock to be available for participant {} of session {}", + participant.getParticipantPublicId(), session.getSessionId()); } - recordingId = recording.getId(); - - try { - profile = generateMediaProfile(recording.getRecordingProperties(), participant); - } catch (OpenViduException e) { - log.error("Cannot start single stream recorder for stream {} in session {}: {}", - participant.getPublisherStreamId(), session.getSessionId(), e.getMessage()); - return; - } + } catch (InterruptedException e) { + log.error( + "InterruptedException waiting for individual recording lock to be available for participant {} of session {}", + participant.getParticipantPublicId(), session.getSessionId()); } - - KurentoParticipant kurentoParticipant = (KurentoParticipant) participant; - MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline(); - - RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, - "file://" + this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/" - + participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build(); - - recorder.addRecordingListener(new EventListener() { - @Override - public void onEvent(RecordingEvent event) { - activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) - .setStartTime(Long.parseLong(event.getTimestampMillis())); - log.info("Recording started event for stream {}", participant.getPublisherStreamId()); - globalStartLatch.countDown(); - } - }); - - recorder.addErrorListener(new EventListener() { - @Override - public void onEvent(ErrorEvent event) { - log.error(event.getErrorCode() + " " + event.getDescription()); - } - }); - - RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, participant.getParticipantPublicId(), - recordingId, participant.getPublisherStreamId(), participant.getClientMetadata(), - participant.getServerMetadata(), kurentoParticipant.getPublisher().getMediaOptions().hasAudio(), - kurentoParticipant.getPublisher().getMediaOptions().hasVideo(), - kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo()); - - activeRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); - storedRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); - - connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile); - wrapper.getRecorder().record(); } public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId,