openvidu-server: refactor single stream recorder

pull/546/head
pabloFuente 2020-09-18 15:04:15 +02:00
parent 23e965f9f4
commit 70831114fe
3 changed files with 21 additions and 37 deletions

View File

@ -183,7 +183,7 @@ public class KurentoParticipant extends Participant {
if (this.openviduConfig.isRecordingModuleEnabled()
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
this.recordingManager.startOneIndividualStreamRecording(session, null, null, this);
this.recordingManager.startOneIndividualStreamRecording(session, this);
}
if (!silent) {

View File

@ -370,8 +370,7 @@ public class RecordingManager {
return recording;
}
public void startOneIndividualStreamRecording(Session session, String recordingId, MediaProfileSpecType profile,
Participant participant) {
public void startOneIndividualStreamRecording(Session session, Participant participant) {
Recording recording = this.sessionsRecordings.get(session.getSessionId());
if (recording == null) {
recording = this.sessionsRecordingsStarting.get(session.getSessionId());
@ -386,13 +385,24 @@ public class RecordingManager {
log.info("Starting new RecorderEndpoint in session {} for new stream of participant {}",
session.getSessionId(), participant.getParticipantPublicId());
final CountDownLatch startedCountDown = new CountDownLatch(1);
this.singleStreamRecordingService.startRecorderEndpointForPublisherEndpoint(session, recordingId, profile,
MediaProfileSpecType profile = null;
try {
profile = this.singleStreamRecordingService.generateMediaProfile(recording.getRecordingProperties(),
participant);
} catch (OpenViduException e) {
log.error("Cannot start single stream recorder for stream {} in session {}: {}",
participant.getPublisherStreamId(), session.getSessionId(), e.getMessage());
return;
}
this.singleStreamRecordingService.startRecorderEndpointForPublisherEndpoint(session, recording.getId(), profile,
participant, startedCountDown);
} else if (RecordingUtils.IS_COMPOSED(recording.getOutputMode()) && !recording.hasVideo()) {
// Connect this stream to existing Composite recorder
log.info("Joining PublisherEndpoint to existing Composite in session {} for new stream of participant {}",
session.getSessionId(), participant.getParticipantPublicId());
this.composedRecordingService.joinPublisherEndpointToComposite(session, recordingId, participant);
this.composedRecordingService.joinPublisherEndpointToComposite(session, recording.getId(), participant);
}
}
@ -470,7 +480,8 @@ public class RecordingManager {
}
if (Status.stopped.equals(recording.getStatus())) {
// Recording is being downloaded from remote host or being uploaded
log.warn("Recording {} status is \"stopped\". Cancelling possible ongoing download process", recording.getId());
log.warn("Recording {} status is \"stopped\". Cancelling possible ongoing download process",
recording.getId());
this.recordingDownloader.cancelDownload(recording.getId());
}

View File

@ -116,7 +116,7 @@ public class SingleStreamRecordingService extends RecordingService {
recordingStartedCountdown.countDown();
continue;
}
this.startRecorderEndpointForPublisherEndpoint(session, recordingId, profile, p,
this.startRecorderEndpointForPublisherEndpoint(session, recording.getId(), profile, p,
recordingStartedCountdown);
}
}
@ -205,7 +205,7 @@ public class SingleStreamRecordingService extends RecordingService {
return finalRecordingArray[0];
}
public void startRecorderEndpointForPublisherEndpoint(final Session session, String recordingId,
public void startRecorderEndpointForPublisherEndpoint(final Session session, final String recordingId,
MediaProfileSpecType profile, final Participant participant, CountDownLatch globalStartLatch) {
log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(),
@ -220,31 +220,6 @@ public class SingleStreamRecordingService extends RecordingService {
return;
}
if (recordingId == null) {
// Stream is being recorded because is a new publisher in an ongoing recorded
// session. If recordingId is defined is because Stream is being recorded from
// "startRecording" method
Recording recording = this.recordingManager.sessionsRecordings.get(session.getSessionId());
if (recording == null) {
recording = this.recordingManager.sessionsRecordingsStarting.get(session.getSessionId());
if (recording == null) {
log.error(
"Cannot start single stream recorder for stream {} in session {}. The recording {} cannot be found",
participant.getPublisherStreamId(), session.getSessionId(), recordingId);
return;
}
}
recordingId = recording.getId();
try {
profile = generateMediaProfile(recording.getRecordingProperties(), participant);
} catch (OpenViduException e) {
log.error("Cannot start single stream recorder for stream {} in session {}: {}",
participant.getPublisherStreamId(), session.getSessionId(), e.getMessage());
return;
}
}
KurentoParticipant kurentoParticipant = (KurentoParticipant) participant;
MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline();
@ -252,12 +227,10 @@ public class SingleStreamRecordingService extends RecordingService {
"file://" + openviduConfig.getOpenViduRemoteRecordingPath() + recordingId + "/"
+ participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build();
final String finalRecordingId = recordingId;
recorder.addRecordingListener(new EventListener<RecordingEvent>() {
@Override
public void onEvent(RecordingEvent event) {
activeRecorders.get(finalRecordingId).get(participant.getPublisherStreamId())
activeRecorders.get(recordingId).get(participant.getPublisherStreamId())
.setStartTime(Long.parseLong(event.getTimestampMillis()));
log.info("Recording started event for stream {}", participant.getPublisherStreamId());
globalStartLatch.countDown();
@ -333,7 +306,7 @@ public class SingleStreamRecordingService extends RecordingService {
}
}
private MediaProfileSpecType generateMediaProfile(RecordingProperties properties, Participant participant)
MediaProfileSpecType generateMediaProfile(RecordingProperties properties, Participant participant)
throws OpenViduException {
KurentoParticipant kParticipant = (KurentoParticipant) participant;