openvidu-server: recording concurrent protection on first user publishing

pull/494/head
pabloFuente 2020-05-23 14:51:34 +02:00
parent f398c80d59
commit 3374e511ee
6 changed files with 91 additions and 41 deletions

View File

@ -78,6 +78,12 @@ public class Session implements SessionInterface {
*/
public Lock joinLeaveLock = new ReentrantLock();
/**
* This lock protects initialization of ALWAYS recordings upon first user
* publishing
*/
public Lock recordingLock = new ReentrantLock();
public final AtomicBoolean recordingManuallyStopped = new AtomicBoolean(false);
public Session(Session previousSession) {

View File

@ -397,31 +397,52 @@ public class KurentoSessionManager extends SessionManager {
if (this.openviduConfig.isRecordingModuleEnabled()
&& MediaMode.ROUTED.equals(kSession.getSessionProperties().mediaMode())
&& kSession.getActivePublishers() == 0) {
if (RecordingMode.ALWAYS.equals(kSession.getSessionProperties().recordingMode())
&& !recordingManager.sessionIsBeingRecorded(kSession.getSessionId())
&& !kSession.recordingManuallyStopped.get()) {
// Start automatic recording for sessions configured with RecordingMode.ALWAYS
new Thread(() -> {
recordingManager.startRecording(kSession,
new RecordingProperties.Builder().name("")
.outputMode(kSession.getSessionProperties().defaultOutputMode())
.recordingLayout(kSession.getSessionProperties().defaultRecordingLayout())
.customLayout(kSession.getSessionProperties().defaultCustomLayout()).build());
}).start();
} else if (RecordingMode.MANUAL.equals(kSession.getSessionProperties().recordingMode())
&& recordingManager.sessionIsBeingRecorded(kSession.getSessionId())) {
// Abort automatic recording stop (user published before timeout)
log.info("Participant {} published before timeout finished. Aborting automatic recording stop",
participant.getParticipantPublicId());
boolean stopAborted = recordingManager.abortAutomaticRecordingStopThread(kSession,
EndReason.automaticStop);
if (stopAborted) {
log.info("Automatic recording stopped successfully aborted");
try {
if (kSession.recordingLock.tryLock(15, TimeUnit.SECONDS)) {
try {
if (RecordingMode.ALWAYS.equals(kSession.getSessionProperties().recordingMode())
&& !recordingManager.sessionIsBeingRecorded(kSession.getSessionId())
&& !kSession.recordingManuallyStopped.get()) {
// Start automatic recording for sessions configured with RecordingMode.ALWAYS
new Thread(() -> {
recordingManager.startRecording(kSession, new RecordingProperties.Builder().name("")
.outputMode(kSession.getSessionProperties().defaultOutputMode())
.recordingLayout(kSession.getSessionProperties().defaultRecordingLayout())
.customLayout(kSession.getSessionProperties().defaultCustomLayout()).build());
}).start();
} else if (RecordingMode.MANUAL.equals(kSession.getSessionProperties().recordingMode())
&& recordingManager.sessionIsBeingRecorded(kSession.getSessionId())) {
// Abort automatic recording stop (user published before timeout)
log.info(
"Participant {} published before timeout finished. Aborting automatic recording stop",
participant.getParticipantPublicId());
boolean stopAborted = recordingManager.abortAutomaticRecordingStopThread(kSession,
EndReason.automaticStop);
if (stopAborted) {
log.info("Automatic recording stopped successfully aborted");
} else {
log.info(
"Automatic recording stopped couldn't be aborted. Recording of session {} has stopped",
kSession.getSessionId());
}
}
} finally {
kSession.recordingLock.unlock();
}
} else {
log.info("Automatic recording stopped couldn't be aborted. Recording of session {} has stopped",
kSession.getSessionId());
log.error(
"Timeout waiting for recording Session lock to be available for participant {} of session {} in publishVideo",
participant.getParticipantPublicId(), kSession.getSessionId());
}
} catch (InterruptedException e) {
log.error(
"InterruptedException waiting for recording Session lock to be available for participant {} of session {} in publishVideo",
participant.getParticipantPublicId(), kSession.getSessionId());
}
}
kSession.newPublisher(participant);

View File

@ -85,7 +85,7 @@ public class ComposedRecordingService extends RecordingService {
// Instantiate and store recording object
Recording recording = new Recording(session.getSessionId(), recordingId, properties);
this.recordingManager.startingRecordings.put(recording.getId(), recording);
this.recordingManager.recordingToStarting(recording);
if (properties.hasVideo()) {
// Docker container used

View File

@ -107,6 +107,7 @@ public class RecordingManager {
protected Map<String, Recording> startingRecordings = new ConcurrentHashMap<>();
protected Map<String, Recording> startedRecordings = new ConcurrentHashMap<>();
protected Map<String, Recording> sessionsRecordings = new ConcurrentHashMap<>();
protected Map<String, Recording> sessionsRecordingsStarting = new ConcurrentHashMap<>();
private final Map<String, ScheduledFuture<?>> automaticRecordingStopThreads = new ConcurrentHashMap<>();
private JsonUtils jsonUtils = new JsonUtils();
@ -239,10 +240,10 @@ public class RecordingManager {
recording = this.singleStreamRecordingService.startRecording(session, properties);
break;
}
} catch (OpenViduException e) {
} catch (Exception e) {
throw e;
}
this.updateRecordingManagerCollections(session, recording);
this.recordingFromStartingToStarted(recording);
this.cdr.recordRecordingStarted(recording);
this.cdr.recordRecordingStatusChanged(recording, null, recording.getCreatedAt(),
@ -313,8 +314,12 @@ public class RecordingManager {
Participant participant) {
Recording recording = this.sessionsRecordings.get(session.getSessionId());
if (recording == null) {
log.error("Cannot start recording of new stream {}. Session {} is not being recorded",
participant.getPublisherStreamId(), session.getSessionId());
recording = this.sessionsRecordingsStarting.get(session.getSessionId());
if (recording == null) {
log.error("Cannot start recording of new stream {}. Session {} is not being recorded",
participant.getPublisherStreamId(), session.getSessionId());
return;
}
}
if (io.openvidu.java.client.Recording.OutputMode.INDIVIDUAL.equals(recording.getOutputMode())) {
// Start new RecorderEndpoint for this stream
@ -363,7 +368,8 @@ public class RecordingManager {
}
public boolean sessionIsBeingRecorded(String sessionId) {
return (this.sessionsRecordings.get(sessionId) != null);
return (this.sessionsRecordings.get(sessionId) != null
|| this.sessionsRecordingsStarting.get(sessionId) != null);
}
public Recording getStartedRecording(String recordingId) {
@ -758,13 +764,25 @@ public class RecordingManager {
}
/**
* Changes recording from starting to started, updates global recording
* collections and sends RPC response to clients
* New starting recording
*/
private void updateRecordingManagerCollections(Session session, Recording recording) {
this.sessionHandler.setRecordingStarted(session.getSessionId(), recording);
this.sessionsRecordings.put(session.getSessionId(), recording);
public void recordingToStarting(Recording recording) throws RuntimeException {
if ((startingRecordings.putIfAbsent(recording.getId(), recording) != null)
|| (sessionsRecordingsStarting.putIfAbsent(recording.getSessionId(), recording) != null)) {
log.error("Concurrent session recording initialization. Aborting this thread");
throw new RuntimeException("Concurrent initialization of recording " + recording.getId());
}
}
/**
* Changes recording from starting to started, updating global recording
* collection
*/
private void recordingFromStartingToStarted(Recording recording) {
this.sessionHandler.setRecordingStarted(recording.getSessionId(), recording);
this.sessionsRecordings.put(recording.getSessionId(), recording);
this.startingRecordings.remove(recording.getId());
this.sessionsRecordingsStarting.remove(recording.getSessionId());
this.startedRecordings.put(recording.getId(), recording);
}

View File

@ -175,6 +175,7 @@ public abstract class RecordingService {
log.error("Recording start failed for session {}: {}", session.getSessionId(), errorMessage);
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);
this.recordingManager.startingRecordings.remove(recording.getId());
this.recordingManager.sessionsRecordingsStarting.remove(session.getSessionId());
this.stopRecording(session, recording, null);
return new OpenViduException(Code.RECORDING_START_ERROR_CODE, errorMessage);
}

View File

@ -93,7 +93,7 @@ public class SingleStreamRecordingService extends RecordingService {
recordingId, session.getSessionId());
Recording recording = new Recording(session.getSessionId(), recordingId, properties);
this.recordingManager.startingRecordings.put(recording.getId(), recording);
this.recordingManager.recordingToStarting(recording);
activeRecorders.put(session.getSessionId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>());
storedRecorders.put(session.getSessionId(), new ConcurrentHashMap<String, RecorderEndpointWrapper>());
@ -216,6 +216,15 @@ public class SingleStreamRecordingService extends RecordingService {
// session. If recordingId is defined is because Stream is being recorded from
// "startRecording" method
Recording recording = this.recordingManager.sessionsRecordings.get(session.getSessionId());
if (recording == null) {
recording = this.recordingManager.sessionsRecordingsStarting.get(session.getSessionId());
if (recording == null) {
log.error(
"Cannot start single stream recorder for stream {} in session {}. The recording {} cannot be found",
participant.getPublisherStreamId(), session.getSessionId(), recordingId);
return;
}
}
recordingId = recording.getId();
try {
@ -237,13 +246,8 @@ public class SingleStreamRecordingService extends RecordingService {
recorder.addRecordingListener(new EventListener<RecordingEvent>() {
@Override
public void onEvent(RecordingEvent event) {
activeRecorders
.get(session
.getSessionId())
.get(participant
.getPublisherStreamId())
.setStartTime(Long.parseLong(event
.getTimestampMillis()));
activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId())
.setStartTime(Long.parseLong(event.getTimestampMillis()));
log.info("Recording started event for stream {}", participant.getPublisherStreamId());
globalStartLatch.countDown();
}