openvidu-server: concurrent double initialization of single stream recording fix

pull/494/head
pabloFuente 2020-05-23 22:45:42 +02:00
parent 3374e511ee
commit f5171c9306
2 changed files with 92 additions and 57 deletions

View File

@ -17,6 +17,9 @@
package io.openvidu.server.core;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.google.gson.JsonObject;
import io.openvidu.server.kurento.endpoint.EndpointType;
@ -41,6 +44,12 @@ public class Participant {
private final String METADATA_SEPARATOR = "%/%";
/**
* This lock protects the initialization of a RecorderEndpoint when INDIVIDUAL
* recording
*/
public Lock singleRecordingLock = new ReentrantLock();
public Participant(String finalUserId, String participantPrivatetId, String participantPublicId, String sessionId,
Token token, String clientMetadata, GeoLocation location, String platform, EndpointType endpointType,
Long createdAt) {

View File

@ -208,69 +208,95 @@ public class SingleStreamRecordingService extends RecordingService {
public void startRecorderEndpointForPublisherEndpoint(final Session session, String recordingId,
MediaProfileSpecType profile, final Participant participant, CountDownLatch globalStartLatch) {
log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(),
session.getSessionId());
if (recordingId == null) {
// Stream is being recorded because is a new publisher in an ongoing recorded
// session. If recordingId is defined is because Stream is being recorded from
// "startRecording" method
Recording recording = this.recordingManager.sessionsRecordings.get(session.getSessionId());
if (recording == null) {
recording = this.recordingManager.sessionsRecordingsStarting.get(session.getSessionId());
if (recording == null) {
log.error(
"Cannot start single stream recorder for stream {} in session {}. The recording {} cannot be found",
participant.getPublisherStreamId(), session.getSessionId(), recordingId);
return;
try {
if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) {
try {
if (this.activeRecorders.get(session.getSessionId())
.containsKey(participant.getPublisherStreamId())) {
log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning",
participant.getPublisherStreamId(), session.getSessionId());
return;
}
if (recordingId == null) {
// Stream is being recorded because is a new publisher in an ongoing recorded
// session. If recordingId is defined is because Stream is being recorded from
// "startRecording" method
Recording recording = this.recordingManager.sessionsRecordings.get(session.getSessionId());
if (recording == null) {
recording = this.recordingManager.sessionsRecordingsStarting.get(session.getSessionId());
if (recording == null) {
log.error(
"Cannot start single stream recorder for stream {} in session {}. The recording {} cannot be found",
participant.getPublisherStreamId(), session.getSessionId(), recordingId);
return;
}
}
recordingId = recording.getId();
try {
profile = generateMediaProfile(recording.getRecordingProperties(), participant);
} catch (OpenViduException e) {
log.error("Cannot start single stream recorder for stream {} in session {}: {}",
participant.getPublisherStreamId(), session.getSessionId(), e.getMessage());
return;
}
}
KurentoParticipant kurentoParticipant = (KurentoParticipant) participant;
MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline();
RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline,
"file://" + this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/"
+ participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build();
recorder.addRecordingListener(new EventListener<RecordingEvent>() {
@Override
public void onEvent(RecordingEvent event) {
activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId())
.setStartTime(Long.parseLong(event.getTimestampMillis()));
log.info("Recording started event for stream {}", participant.getPublisherStreamId());
globalStartLatch.countDown();
}
});
recorder.addErrorListener(new EventListener<ErrorEvent>() {
@Override
public void onEvent(ErrorEvent event) {
log.error(event.getErrorCode() + " " + event.getDescription());
}
});
RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder,
participant.getParticipantPublicId(), recordingId, participant.getPublisherStreamId(),
participant.getClientMetadata(), participant.getServerMetadata(),
kurentoParticipant.getPublisher().getMediaOptions().hasAudio(),
kurentoParticipant.getPublisher().getMediaOptions().hasVideo(),
kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo());
activeRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper);
storedRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper);
connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile);
wrapper.getRecorder().record();
} finally {
participant.singleRecordingLock.unlock();
}
} else {
log.error(
"Timeout waiting for individual recording lock to be available for participant {} of session {}",
participant.getParticipantPublicId(), session.getSessionId());
}
recordingId = recording.getId();
try {
profile = generateMediaProfile(recording.getRecordingProperties(), participant);
} catch (OpenViduException e) {
log.error("Cannot start single stream recorder for stream {} in session {}: {}",
participant.getPublisherStreamId(), session.getSessionId(), e.getMessage());
return;
}
} catch (InterruptedException e) {
log.error(
"InterruptedException waiting for individual recording lock to be available for participant {} of session {}",
participant.getParticipantPublicId(), session.getSessionId());
}
KurentoParticipant kurentoParticipant = (KurentoParticipant) participant;
MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline();
RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline,
"file://" + this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/"
+ participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build();
recorder.addRecordingListener(new EventListener<RecordingEvent>() {
@Override
public void onEvent(RecordingEvent event) {
activeRecorders.get(session.getSessionId()).get(participant.getPublisherStreamId())
.setStartTime(Long.parseLong(event.getTimestampMillis()));
log.info("Recording started event for stream {}", participant.getPublisherStreamId());
globalStartLatch.countDown();
}
});
recorder.addErrorListener(new EventListener<ErrorEvent>() {
@Override
public void onEvent(ErrorEvent event) {
log.error(event.getErrorCode() + " " + event.getDescription());
}
});
RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, participant.getParticipantPublicId(),
recordingId, participant.getPublisherStreamId(), participant.getClientMetadata(),
participant.getServerMetadata(), kurentoParticipant.getPublisher().getMediaOptions().hasAudio(),
kurentoParticipant.getPublisher().getMediaOptions().hasVideo(),
kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo());
activeRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper);
storedRecorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper);
connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile);
wrapper.getRecorder().record();
}
public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId,