openvidu-server: refactor kmsDisconnectionTime parameter

pull/550/head
pabloFuente 2020-10-05 13:58:13 +02:00
parent baa7e37c2c
commit 0ae577ce6e
7 changed files with 56 additions and 69 deletions

View File

@ -194,7 +194,7 @@ public class KurentoParticipant extends Participant {
return sdpResponse; return sdpResponse;
} }
public void unpublishMedia(EndReason reason, long kmsDisconnectionTime) { public void unpublishMedia(EndReason reason, Long kmsDisconnectionTime) {
log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(), log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(),
this.session.getSessionId()); this.session.getSessionId());
final MediaOptions mediaOptions = this.getPublisher().getMediaOptions(); final MediaOptions mediaOptions = this.getPublisher().getMediaOptions();
@ -347,7 +347,7 @@ public class KurentoParticipant extends Participant {
} }
} }
public void close(EndReason reason, boolean definitelyClosed, long kmsDisconnectionTime) { public void close(EndReason reason, boolean definitelyClosed, Long kmsDisconnectionTime) {
log.debug("PARTICIPANT {}: Closing user", this.getParticipantPublicId()); log.debug("PARTICIPANT {}: Closing user", this.getParticipantPublicId());
if (isClosed()) { if (isClosed()) {
log.warn("PARTICIPANT {}: Already closed", this.getParticipantPublicId()); log.warn("PARTICIPANT {}: Already closed", this.getParticipantPublicId());
@ -420,7 +420,7 @@ public class KurentoParticipant extends Participant {
session.sendMediaError(this.getParticipantPrivateId(), desc); session.sendMediaError(this.getParticipantPrivateId(), desc);
} }
private void releasePublisherEndpoint(EndReason reason, long kmsDisconnectionTime) { private void releasePublisherEndpoint(EndReason reason, Long kmsDisconnectionTime) {
if (publisher != null && publisher.getEndpoint() != null) { if (publisher != null && publisher.getEndpoint() != null) {
final ReadWriteLock closingLock = publisher.closingLock; final ReadWriteLock closingLock = publisher.closingLock;
try { try {
@ -443,7 +443,7 @@ public class KurentoParticipant extends Participant {
} }
} }
private void releasePublisherEndpointAux(EndReason reason, long kmsDisconnectionTime) { private void releasePublisherEndpointAux(EndReason reason, Long kmsDisconnectionTime) {
// Remove streamId from publisher's map // Remove streamId from publisher's map
this.session.publishedStreamIds.remove(this.getPublisherStreamId()); this.session.publishedStreamIds.remove(this.getPublisherStreamId());

View File

@ -122,7 +122,7 @@ public class KurentoSession extends Session {
log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId); log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId);
this.removeParticipant(participant, reason); this.removeParticipant(participant, reason);
participant.close(reason, true, 0); participant.close(reason, true, null);
} }
@Override @Override
@ -133,7 +133,7 @@ public class KurentoSession extends Session {
for (Participant participant : participants.values()) { for (Participant participant : participants.values()) {
((KurentoParticipant) participant).releaseAllFilters(); ((KurentoParticipant) participant).releaseAllFilters();
((KurentoParticipant) participant).close(reason, true, 0); ((KurentoParticipant) participant).close(reason, true, null);
} }
participants.clear(); participants.clear();
@ -280,7 +280,7 @@ public class KurentoSession extends Session {
return this.publishedStreamIds.get(streamId); return this.publishedStreamIds.get(streamId);
} }
public void restartStatusInKurento(long kmsDisconnectionTime) { public void restartStatusInKurento(Long kmsDisconnectionTime) {
log.info("Resetting process: resetting remote media objects for active session {}", this.sessionId); log.info("Resetting process: resetting remote media objects for active session {}", this.sessionId);

View File

@ -495,7 +495,7 @@ public class KurentoSessionManager extends SessionManager {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"Participant '" + participant.getParticipantPublicId() + "' is not streaming media"); "Participant '" + participant.getParticipantPublicId() + "' is not streaming media");
} }
kParticipant.unpublishMedia(reason, 0); kParticipant.unpublishMedia(reason, null);
session.cancelPublisher(participant, reason); session.cancelPublisher(participant, reason);
Set<Participant> participants = session.getParticipants(); Set<Participant> participants = session.getParticipants();

View File

@ -21,41 +21,34 @@ import org.kurento.client.RecorderEndpoint;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.server.kurento.core.KurentoParticipant;
public class RecorderEndpointWrapper { public class RecorderEndpointWrapper {
private RecorderEndpoint recorder; private RecorderEndpoint recorder;
private String connectionId; private KurentoParticipant kParticipant;
private String recordingId; private String recordingId;
private String streamId;
private String clientData;
private String serverData;
private boolean hasAudio;
private boolean hasVideo;
private String typeOfVideo;
private long startTime; private long startTime;
private long endTime; private long endTime;
private long size; private long size;
public RecorderEndpointWrapper(RecorderEndpoint recorder, String connectionId, String recordingId, String streamId, public RecorderEndpointWrapper(RecorderEndpoint recorder, KurentoParticipant kParticipant, String recordingId) {
String clientData, String serverData, boolean hasAudio, boolean hasVideo, String typeOfVideo) {
this.recorder = recorder; this.recorder = recorder;
this.connectionId = connectionId; this.kParticipant = kParticipant;
this.recordingId = recordingId; this.recordingId = recordingId;
this.streamId = streamId;
this.clientData = clientData;
this.serverData = serverData;
this.hasAudio = hasAudio;
this.hasVideo = hasVideo;
this.typeOfVideo = typeOfVideo;
} }
public RecorderEndpoint getRecorder() { public RecorderEndpoint getRecorder() {
return recorder; return recorder;
} }
public KurentoParticipant getParticipant() {
return this.kParticipant;
}
public String getConnectionId() { public String getConnectionId() {
return connectionId; return kParticipant.getParticipantPublicId();
} }
public String getRecordingId() { public String getRecordingId() {
@ -63,15 +56,15 @@ public class RecorderEndpointWrapper {
} }
public String getStreamId() { public String getStreamId() {
return streamId; return kParticipant.getPublisherStreamId();
} }
public String getClientData() { public String getClientData() {
return clientData; return kParticipant.getClientMetadata();
} }
public String getServerData() { public String getServerData() {
return serverData; return kParticipant.getServerMetadata();
} }
public long getStartTime() { public long getStartTime() {
@ -99,31 +92,31 @@ public class RecorderEndpointWrapper {
} }
public boolean hasAudio() { public boolean hasAudio() {
return hasAudio; return kParticipant.getPublisher().getMediaOptions().hasAudio();
} }
public boolean hasVideo() { public boolean hasVideo() {
return hasVideo; return kParticipant.getPublisher().getMediaOptions().hasVideo();
} }
public String getTypeOfVideo() { public String getTypeOfVideo() {
return typeOfVideo; return kParticipant.getPublisher().getMediaOptions().getTypeOfVideo();
} }
public JsonObject toJson() { public JsonObject toJson() {
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
json.addProperty("connectionId", this.connectionId); json.addProperty("connectionId", this.getConnectionId());
json.addProperty("streamId", this.streamId); json.addProperty("streamId", this.getStreamId());
json.addProperty("clientData", this.clientData); json.addProperty("clientData", this.getClientData());
json.addProperty("serverData", this.serverData); json.addProperty("serverData", this.getServerData());
json.addProperty("startTime", this.startTime); json.addProperty("startTime", this.getStartTime());
json.addProperty("endTime", this.endTime); json.addProperty("endTime", this.getEndTime());
json.addProperty("duration", this.endTime - this.startTime); json.addProperty("duration", this.getEndTime() - this.getStartTime());
json.addProperty("size", this.size); json.addProperty("size", this.getSize());
json.addProperty("hasAudio", this.hasAudio); json.addProperty("hasAudio", this.hasAudio());
json.addProperty("hasVideo", this.hasVideo); json.addProperty("hasVideo", this.hasVideo());
if (this.hasVideo) { if (this.hasVideo()) {
json.addProperty("typeOfVideo", this.typeOfVideo); json.addProperty("typeOfVideo", this.getTypeOfVideo());
} }
return json; return json;
} }

View File

@ -109,11 +109,11 @@ public class ComposedRecordingService extends RecordingService {
if (recording.hasVideo()) { if (recording.hasVideo()) {
return this.stopRecordingWithVideo(session, recording, reason); return this.stopRecordingWithVideo(session, recording, reason);
} else { } else {
return this.stopRecordingAudioOnly(session, recording, reason, 0); return this.stopRecordingAudioOnly(session, recording, reason, null);
} }
} }
public Recording stopRecording(Session session, Recording recording, EndReason reason, long kmsDisconnectionTime) { public Recording stopRecording(Session session, Recording recording, EndReason reason, Long kmsDisconnectionTime) {
if (recording.hasVideo()) { if (recording.hasVideo()) {
return this.stopRecordingWithVideo(session, recording, reason); return this.stopRecordingWithVideo(session, recording, reason);
} else { } else {
@ -322,7 +322,7 @@ public class ComposedRecordingService extends RecordingService {
} }
private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason, private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason,
long kmsDisconnectionTime) { Long kmsDisconnectionTime) {
log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(), log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(),
recording.getSessionId(), reason); recording.getSessionId(), reason);

View File

@ -67,7 +67,6 @@ import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionEventsHandler;
import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
@ -351,7 +350,7 @@ public class RecordingManager {
return recording; return recording;
} }
public Recording forceStopRecording(Session session, EndReason reason, long kmsDisconnectionTime) { public Recording forceStopRecording(Session session, EndReason reason, Long kmsDisconnectionTime) {
Recording recording; Recording recording;
recording = this.sessionsRecordings.get(session.getSessionId()); recording = this.sessionsRecordings.get(session.getSessionId());
switch (recording.getOutputMode()) { switch (recording.getOutputMode()) {
@ -409,8 +408,8 @@ public class RecordingManager {
return; return;
} }
this.singleStreamRecordingService.startRecorderEndpointForPublisherEndpoint(session, recording.getId(), this.singleStreamRecordingService.startRecorderEndpointForPublisherEndpoint(recording.getId(), profile,
profile, participant, startedCountDown); participant, startedCountDown);
} else if (RecordingUtils.IS_COMPOSED(recording.getOutputMode()) && !recording.hasVideo()) { } else if (RecordingUtils.IS_COMPOSED(recording.getOutputMode()) && !recording.hasVideo()) {
// Connect this stream to existing Composite recorder // Connect this stream to existing Composite recorder
log.info("Joining PublisherEndpoint to existing Composite in session {} for new stream of participant {}", log.info("Joining PublisherEndpoint to existing Composite in session {} for new stream of participant {}",
@ -419,7 +418,7 @@ public class RecordingManager {
} }
} }
public void stopOneIndividualStreamRecording(KurentoSession session, String streamId, long kmsDisconnectionTime) { public void stopOneIndividualStreamRecording(Session session, String streamId, Long kmsDisconnectionTime) {
Recording recording = this.sessionsRecordings.get(session.getSessionId()); Recording recording = this.sessionsRecordings.get(session.getSessionId());
if (recording == null) { if (recording == null) {
recording = this.sessionsRecordingsStarting.get(session.getSessionId()); recording = this.sessionsRecordingsStarting.get(session.getSessionId());

View File

@ -116,7 +116,7 @@ public class SingleStreamRecordingService extends RecordingService {
recordingStartedCountdown.countDown(); recordingStartedCountdown.countDown();
continue; continue;
} }
this.startRecorderEndpointForPublisherEndpoint(session, recording.getId(), profile, p, this.startRecorderEndpointForPublisherEndpoint(recording.getId(), profile, p,
recordingStartedCountdown); recordingStartedCountdown);
} }
} }
@ -141,10 +141,10 @@ public class SingleStreamRecordingService extends RecordingService {
@Override @Override
public Recording stopRecording(Session session, Recording recording, EndReason reason) { public Recording stopRecording(Session session, Recording recording, EndReason reason) {
return this.stopRecording(session, recording, reason, 0); return this.stopRecording(session, recording, reason, null);
} }
public Recording stopRecording(Session session, Recording recording, EndReason reason, long kmsDisconnectionTime) { public Recording stopRecording(Session session, Recording recording, EndReason reason, Long kmsDisconnectionTime) {
log.info("Stopping individual ({}) recording {} of session {}. Reason: {}", log.info("Stopping individual ({}) recording {} of session {}. Reason: {}",
recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly",
recording.getId(), recording.getSessionId(), reason); recording.getId(), recording.getSessionId(), reason);
@ -205,18 +205,18 @@ public class SingleStreamRecordingService extends RecordingService {
return finalRecordingArray[0]; return finalRecordingArray[0];
} }
public void startRecorderEndpointForPublisherEndpoint(final Session session, final String recordingId, public void startRecorderEndpointForPublisherEndpoint(final String recordingId, MediaProfileSpecType profile,
MediaProfileSpecType profile, final Participant participant, CountDownLatch globalStartLatch) { final Participant participant, CountDownLatch globalStartLatch) {
log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(), log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(),
session.getSessionId()); participant.getSessionId());
try { try {
if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) { if (participant.singleRecordingLock.tryLock(15, TimeUnit.SECONDS)) {
try { try {
if (this.activeRecorders.get(recordingId).containsKey(participant.getPublisherStreamId())) { if (this.activeRecorders.get(recordingId).containsKey(participant.getPublisherStreamId())) {
log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning", log.warn("Concurrent initialization of RecorderEndpoint for stream {} of session {}. Returning",
participant.getPublisherStreamId(), session.getSessionId()); participant.getPublisherStreamId(), participant.getSessionId());
return; return;
} }
@ -244,13 +244,8 @@ public class SingleStreamRecordingService extends RecordingService {
} }
}); });
RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, kurentoParticipant,
participant.getParticipantPublicId(), recordingId, participant.getPublisherStreamId(), recordingId);
participant.getClientMetadata(), participant.getServerMetadata(),
kurentoParticipant.getPublisher().getMediaOptions().hasAudio(),
kurentoParticipant.getPublisher().getMediaOptions().hasVideo(),
kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo());
activeRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper); activeRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper);
storedRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper); storedRecorders.get(recordingId).put(participant.getPublisherStreamId(), wrapper);
@ -262,13 +257,13 @@ public class SingleStreamRecordingService extends RecordingService {
} }
} else { } else {
log.error( log.error(
"Timeout waiting for individual recording lock to be available for participant {} of session {}", "Timeout waiting for individual recording lock to be available to start stream recording for participant {} of session {}",
participant.getParticipantPublicId(), session.getSessionId()); participant.getParticipantPublicId(), participant.getSessionId());
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error( log.error(
"InterruptedException waiting for individual recording lock to be available for participant {} of session {}", "InterruptedException waiting for individual recording lock to be available to start stream recording for participant {} of session {}",
participant.getParticipantPublicId(), session.getSessionId()); participant.getParticipantPublicId(), participant.getSessionId());
} }
} }