diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/Session.java b/openvidu-server/src/main/java/io/openvidu/server/core/Session.java index 88e98b1d..099574ae 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/Session.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/Session.java @@ -78,6 +78,12 @@ public class Session implements SessionInterface { */ public Lock joinLeaveLock = new ReentrantLock(); + /** + * This lock protects initialization of ALWAYS recordings upon first user + * publishing + */ + public Lock recordingLock = new ReentrantLock(); + public final AtomicBoolean recordingManuallyStopped = new AtomicBoolean(false); public Session(Session previousSession) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 3db64363..8807ff9f 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -397,31 +397,52 @@ public class KurentoSessionManager extends SessionManager { if (this.openviduConfig.isRecordingModuleEnabled() && MediaMode.ROUTED.equals(kSession.getSessionProperties().mediaMode()) && kSession.getActivePublishers() == 0) { - if (RecordingMode.ALWAYS.equals(kSession.getSessionProperties().recordingMode()) - && !recordingManager.sessionIsBeingRecorded(kSession.getSessionId()) - && !kSession.recordingManuallyStopped.get()) { - // Start automatic recording for sessions configured with RecordingMode.ALWAYS - new Thread(() -> { - recordingManager.startRecording(kSession, - new RecordingProperties.Builder().name("") - .outputMode(kSession.getSessionProperties().defaultOutputMode()) - .recordingLayout(kSession.getSessionProperties().defaultRecordingLayout()) - .customLayout(kSession.getSessionProperties().defaultCustomLayout()).build()); - }).start(); - } else if (RecordingMode.MANUAL.equals(kSession.getSessionProperties().recordingMode()) - && recordingManager.sessionIsBeingRecorded(kSession.getSessionId())) { - // Abort automatic recording stop (user published before timeout) - log.info("Participant {} published before timeout finished. Aborting automatic recording stop", - participant.getParticipantPublicId()); - boolean stopAborted = recordingManager.abortAutomaticRecordingStopThread(kSession, - EndReason.automaticStop); - if (stopAborted) { - log.info("Automatic recording stopped successfully aborted"); + + try { + if (kSession.recordingLock.tryLock(15, TimeUnit.SECONDS)) { + try { + + if (RecordingMode.ALWAYS.equals(kSession.getSessionProperties().recordingMode()) + && !recordingManager.sessionIsBeingRecorded(kSession.getSessionId()) + && !kSession.recordingManuallyStopped.get()) { + // Start automatic recording for sessions configured with RecordingMode.ALWAYS + new Thread(() -> { + recordingManager.startRecording(kSession, new RecordingProperties.Builder().name("") + .outputMode(kSession.getSessionProperties().defaultOutputMode()) + .recordingLayout(kSession.getSessionProperties().defaultRecordingLayout()) + .customLayout(kSession.getSessionProperties().defaultCustomLayout()).build()); + }).start(); + } else if (RecordingMode.MANUAL.equals(kSession.getSessionProperties().recordingMode()) + && recordingManager.sessionIsBeingRecorded(kSession.getSessionId())) { + // Abort automatic recording stop (user published before timeout) + log.info( + "Participant {} published before timeout finished. Aborting automatic recording stop", + participant.getParticipantPublicId()); + boolean stopAborted = recordingManager.abortAutomaticRecordingStopThread(kSession, + EndReason.automaticStop); + if (stopAborted) { + log.info("Automatic recording stopped successfully aborted"); + } else { + log.info( + "Automatic recording stopped couldn't be aborted. Recording of session {} has stopped", + kSession.getSessionId()); + } + } + + } finally { + kSession.recordingLock.unlock(); + } } else { - log.info("Automatic recording stopped couldn't be aborted. Recording of session {} has stopped", - kSession.getSessionId()); + log.error( + "Timeout waiting for recording Session lock to be available for participant {} of session {} in publishVideo", + participant.getParticipantPublicId(), kSession.getSessionId()); } + } catch (InterruptedException e) { + log.error( + "InterruptedException waiting for recording Session lock to be available for participant {} of session {} in publishVideo", + participant.getParticipantPublicId(), kSession.getSessionId()); } + } kSession.newPublisher(participant); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index a55cc1f4..ab9bd6e1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -85,7 +85,7 @@ public class ComposedRecordingService extends RecordingService { // Instantiate and store recording object Recording recording = new Recording(session.getSessionId(), recordingId, properties); - this.recordingManager.startingRecordings.put(recording.getId(), recording); + this.recordingManager.recordingToStarting(recording); if (properties.hasVideo()) { // Docker container used diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index 33ac3a80..2f993944 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -107,6 +107,7 @@ public class RecordingManager { protected Map startingRecordings = new ConcurrentHashMap<>(); protected Map startedRecordings = new ConcurrentHashMap<>(); protected Map sessionsRecordings = new ConcurrentHashMap<>(); + protected Map sessionsRecordingsStarting = new ConcurrentHashMap<>(); private final Map> automaticRecordingStopThreads = new ConcurrentHashMap<>(); private JsonUtils jsonUtils = new JsonUtils(); @@ -239,10 +240,10 @@ public class RecordingManager { recording = this.singleStreamRecordingService.startRecording(session, properties); break; } - } catch (OpenViduException e) { + } catch (Exception e) { throw e; } - this.updateRecordingManagerCollections(session, recording); + this.recordingFromStartingToStarted(recording); this.cdr.recordRecordingStarted(recording); this.cdr.recordRecordingStatusChanged(recording, null, recording.getCreatedAt(), @@ -313,8 +314,12 @@ public class RecordingManager { Participant participant) { Recording recording = this.sessionsRecordings.get(session.getSessionId()); if (recording == null) { - log.error("Cannot start recording of new stream {}. Session {} is not being recorded", - participant.getPublisherStreamId(), session.getSessionId()); + recording = this.sessionsRecordingsStarting.get(session.getSessionId()); + if (recording == null) { + log.error("Cannot start recording of new stream {}. Session {} is not being recorded", + participant.getPublisherStreamId(), session.getSessionId()); + return; + } } if (io.openvidu.java.client.Recording.OutputMode.INDIVIDUAL.equals(recording.getOutputMode())) { // Start new RecorderEndpoint for this stream @@ -363,7 +368,8 @@ public class RecordingManager { } public boolean sessionIsBeingRecorded(String sessionId) { - return (this.sessionsRecordings.get(sessionId) != null); + return (this.sessionsRecordings.get(sessionId) != null + || this.sessionsRecordingsStarting.get(sessionId) != null); } public Recording getStartedRecording(String recordingId) { @@ -758,13 +764,25 @@ public class RecordingManager { } /** - * Changes recording from starting to started, updates global recording - * collections and sends RPC response to clients + * New starting recording */ - private void updateRecordingManagerCollections(Session session, Recording recording) { - this.sessionHandler.setRecordingStarted(session.getSessionId(), recording); - this.sessionsRecordings.put(session.getSessionId(), recording); + public void recordingToStarting(Recording recording) throws RuntimeException { + if ((startingRecordings.putIfAbsent(recording.getId(), recording) != null) + || (sessionsRecordingsStarting.putIfAbsent(recording.getSessionId(), recording) != null)) { + log.error("Concurrent session recording initialization. Aborting this thread"); + throw new RuntimeException("Concurrent initialization of recording " + recording.getId()); + } + } + + /** + * Changes recording from starting to started, updating global recording + * collection + */ + private void recordingFromStartingToStarted(Recording recording) { + this.sessionHandler.setRecordingStarted(recording.getSessionId(), recording); + this.sessionsRecordings.put(recording.getSessionId(), recording); this.startingRecordings.remove(recording.getId()); + this.sessionsRecordingsStarting.remove(recording.getSessionId()); this.startedRecordings.put(recording.getId(), recording); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java index 1f8dd514..fb09cd51 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingService.java @@ -175,6 +175,7 @@ public abstract class RecordingService { log.error("Recording start failed for session {}: {}", session.getSessionId(), errorMessage); recording.setStatus(io.openvidu.java.client.Recording.Status.failed); this.recordingManager.startingRecordings.remove(recording.getId()); + this.recordingManager.sessionsRecordingsStarting.remove(session.getSessionId()); this.stopRecording(session, recording, null); return new OpenViduException(Code.RECORDING_START_ERROR_CODE, errorMessage); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index ed5c3ffb..9bfd4337 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -93,7 +93,7 @@ public class SingleStreamRecordingService extends RecordingService { recordingId, session.getSessionId()); Recording recording = new Recording(session.getSessionId(), recordingId, properties); - this.recordingManager.startingRecordings.put(recording.getId(), recording); + this.recordingManager.recordingToStarting(recording); activeRecorders.put(session.getSessionId(), new ConcurrentHashMap()); storedRecorders.put(session.getSessionId(), new ConcurrentHashMap()); @@ -216,6 +216,15 @@ public class SingleStreamRecordingService extends RecordingService { // session. If recordingId is defined is because Stream is being recorded from // "startRecording" method Recording recording = this.recordingManager.sessionsRecordings.get(session.getSessionId()); + if (recording == null) { + recording = this.recordingManager.sessionsRecordingsStarting.get(session.getSessionId()); + if (recording == null) { + log.error( + "Cannot start single stream recorder for stream {} in session {}. The recording {} cannot be found", + participant.getPublisherStreamId(), session.getSessionId(), recordingId); + return; + } + } recordingId = recording.getId(); try { @@ -237,13 +246,8 @@ public class SingleStreamRecordingService extends RecordingService { recorder.addRecordingListener(new EventListener() { @Override public void onEvent(RecordingEvent event) { - activeRecorders - .get(session - .getSessionId()) - .get(participant - .getPublisherStreamId()) - .setStartTime(Long.parseLong(event - .getTimestampMillis())); + activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) + .setStartTime(Long.parseLong(event.getTimestampMillis())); log.info("Recording started event for stream {}", participant.getPublisherStreamId()); globalStartLatch.countDown(); }