openvidu-server: fixed kmsDisconnectionTime for recordings

pull/375/head
pabloFuente 2019-06-12 18:29:41 +02:00
parent 2daf1c7f34
commit 5cb59193ee
8 changed files with 39 additions and 29 deletions

View File

@ -187,10 +187,10 @@ public class KurentoParticipant extends Participant {
return sdpResponse; return sdpResponse;
} }
public void unpublishMedia(EndReason reason) { public void unpublishMedia(EndReason reason, long kmsDisconnectionTime) {
log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(), log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(),
this.session.getSessionId()); this.session.getSessionId());
releasePublisherEndpoint(reason); releasePublisherEndpoint(reason, kmsDisconnectionTime);
this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.getPipeline(), this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.getPipeline(),
this.openviduConfig); this.openviduConfig);
log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)", log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)",
@ -298,7 +298,7 @@ public class KurentoParticipant extends Participant {
} }
} }
public void close(EndReason reason, boolean definitelyClosed) { public void close(EndReason reason, boolean definitelyClosed, long kmsDisconnectionTime) {
log.debug("PARTICIPANT {}: Closing user", this.getParticipantPublicId()); log.debug("PARTICIPANT {}: Closing user", this.getParticipantPublicId());
if (isClosed()) { if (isClosed()) {
log.warn("PARTICIPANT {}: Already closed", this.getParticipantPublicId()); log.warn("PARTICIPANT {}: Already closed", this.getParticipantPublicId());
@ -319,7 +319,7 @@ public class KurentoParticipant extends Participant {
} }
} }
this.subscribers.clear(); this.subscribers.clear();
releasePublisherEndpoint(reason); releasePublisherEndpoint(reason, kmsDisconnectionTime);
} }
/** /**
@ -364,7 +364,7 @@ public class KurentoParticipant extends Participant {
session.sendMediaError(this.getParticipantPrivateId(), desc); session.sendMediaError(this.getParticipantPrivateId(), desc);
} }
private void releasePublisherEndpoint(EndReason reason) { private void releasePublisherEndpoint(EndReason reason, long kmsDisconnectionTime) {
if (publisher != null && publisher.getEndpoint() != null) { if (publisher != null && publisher.getEndpoint() != null) {
// Remove streamId from publisher's map // Remove streamId from publisher's map
@ -372,7 +372,7 @@ public class KurentoParticipant extends Participant {
if (this.openviduConfig.isRecordingModuleEnabled() if (this.openviduConfig.isRecordingModuleEnabled()
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId()); this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(), kmsDisconnectionTime);
} }
publisher.unregisterErrorListeners(); publisher.unregisterErrorListeners();

View File

@ -137,7 +137,7 @@ public class KurentoSession extends Session {
log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId); log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId);
this.removeParticipant(participant, reason); this.removeParticipant(participant, reason);
participant.close(reason, true); participant.close(reason, true, 0);
} }
@Override @Override
@ -146,7 +146,7 @@ public class KurentoSession extends Session {
for (Participant participant : participants.values()) { for (Participant participant : participants.values()) {
((KurentoParticipant) participant).releaseAllFilters(); ((KurentoParticipant) participant).releaseAllFilters();
((KurentoParticipant) participant).close(reason, true); ((KurentoParticipant) participant).close(reason, true, 0);
} }
participants.clear(); participants.clear();
@ -288,13 +288,13 @@ public class KurentoSession extends Session {
return this.publishedStreamIds.get(streamId); return this.publishedStreamIds.get(streamId);
} }
public void restartStatusInKurento() { public void restartStatusInKurento(long kmsDisconnectionTime) {
log.info("Reseting process: reseting remote media objects for active session {}", this.sessionId); log.info("Reseting process: reseting remote media objects for active session {}", this.sessionId);
// Stop recording if session is being recorded // Stop recording if session is being recorded
if (recordingManager.sessionIsBeingRecorded(this.sessionId)) { if (recordingManager.sessionIsBeingRecorded(this.sessionId)) {
this.recordingManager.forceStopRecording(this, EndReason.mediaServerDisconnect); this.recordingManager.forceStopRecording(this, EndReason.mediaServerDisconnect, kmsDisconnectionTime);
} }
// Close all MediaEndpoints of participants // Close all MediaEndpoints of participants
@ -302,7 +302,7 @@ public class KurentoSession extends Session {
KurentoParticipant kParticipant = (KurentoParticipant) p; KurentoParticipant kParticipant = (KurentoParticipant) p;
final boolean wasStreaming = kParticipant.isStreaming(); final boolean wasStreaming = kParticipant.isStreaming();
kParticipant.releaseAllFilters(); kParticipant.releaseAllFilters();
kParticipant.close(EndReason.mediaServerDisconnect, false); kParticipant.close(EndReason.mediaServerDisconnect, false, kmsDisconnectionTime);
if (wasStreaming) { if (wasStreaming) {
kurentoSessionHandler.onUnpublishMedia(kParticipant, this.getParticipants(), null, null, null, kurentoSessionHandler.onUnpublishMedia(kParticipant, this.getParticipants(), null, null, null,
EndReason.mediaServerDisconnect); EndReason.mediaServerDisconnect);

View File

@ -339,7 +339,7 @@ public class KurentoSessionManager extends SessionManager {
throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE,
"Participant '" + participant.getParticipantPublicId() + "' is not streaming media"); "Participant '" + participant.getParticipantPublicId() + "' is not streaming media");
} }
kParticipant.unpublishMedia(reason); kParticipant.unpublishMedia(reason, 0);
session.cancelPublisher(participant, reason); session.cancelPublisher(participant, reason);
Set<Participant> participants = session.getParticipants(); Set<Participant> participants = session.getParticipants();

View File

@ -41,9 +41,12 @@ public class FixedOneKmsManager extends KmsManager {
// Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints)
log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri); log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri);
log.warn("Updating all webrtc endpoints for active sessions"); log.warn("Updating all webrtc endpoints for active sessions");
final Kms kms = ((KurentoSessionManager) sessionManager).getKmsManager().kmss.get(kmsWsUri);
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
sessionManager.getSessions().forEach(s -> { sessionManager.getSessions().forEach(s -> {
((KurentoSession) s).restartStatusInKurento(); ((KurentoSession) s).restartStatusInKurento(timeOfKurentoDisconnection);
}); });
kms.setTimeOfKurentoClientDisconnection(0);
} else { } else {
// Same KMS. We may infer that openvidu-server/KMS connection has been lost, but // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but
// not the clients/KMS connections // not the clients/KMS connections

View File

@ -117,10 +117,6 @@ public abstract class KmsManager {
return this.kmss.get(kms.getUri()).isKurentoClientConnected(); return this.kmss.get(kms.getUri()).isKurentoClientConnected();
} }
public long getTimeOfKurentoClientDisconnection(Kms kms) {
return this.kmss.get(kms.getUri()).getTimeOfKurentoClientDisconnection();
}
public void setKurentoClientConnectedToKms(String kmsUri, boolean isConnected) { public void setKurentoClientConnectedToKms(String kmsUri, boolean isConnected) {
this.kmss.get(kmsUri).setKurentoClientConnected(isConnected); this.kmss.get(kmsUri).setKurentoClientConnected(isConnected);
} }

View File

@ -94,7 +94,15 @@ public class ComposedRecordingService extends RecordingService {
if (recording.hasVideo()) { if (recording.hasVideo()) {
return this.stopRecordingWithVideo(session, recording, reason); return this.stopRecordingWithVideo(session, recording, reason);
} else { } else {
return this.stopRecordingAudioOnly(session, recording, reason); return this.stopRecordingAudioOnly(session, recording, reason, 0);
}
}
public Recording stopRecording(Session session, Recording recording, EndReason reason, long kmsDisconnectionTime) {
if (recording.hasVideo()) {
return this.stopRecordingWithVideo(session, recording, reason);
} else {
return this.stopRecordingAudioOnly(session, recording, reason, kmsDisconnectionTime);
} }
} }
@ -323,7 +331,8 @@ public class ComposedRecordingService extends RecordingService {
return recording; return recording;
} }
private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason) { private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason,
long kmsDisconnectionTime) {
log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(), log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(),
recording.getSessionId(), reason); recording.getSessionId(), reason);
@ -341,7 +350,7 @@ public class ComposedRecordingService extends RecordingService {
CompositeWrapper compositeWrapper = this.composites.remove(sessionId); CompositeWrapper compositeWrapper = this.composites.remove(sessionId);
final CountDownLatch stoppedCountDown = new CountDownLatch(1); final CountDownLatch stoppedCountDown = new CountDownLatch(1);
compositeWrapper.stopCompositeRecording(stoppedCountDown, ((KurentoSession)session).getKms().getTimeOfKurentoClientDisconnection()); compositeWrapper.stopCompositeRecording(stoppedCountDown, kmsDisconnectionTime);
try { try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {

View File

@ -213,12 +213,12 @@ public class RecordingManager {
return recording; return recording;
} }
public Recording forceStopRecording(Session session, EndReason reason) { public Recording forceStopRecording(Session session, EndReason reason, long kmsDisconnectionTime) {
Recording recording; Recording recording;
recording = this.sessionsRecordings.get(session.getSessionId()); recording = this.sessionsRecordings.get(session.getSessionId());
switch (recording.getOutputMode()) { switch (recording.getOutputMode()) {
case COMPOSED: case COMPOSED:
recording = this.composedRecordingService.stopRecording(session, recording, reason); recording = this.composedRecordingService.stopRecording(session, recording, reason, kmsDisconnectionTime);
if (recording.hasVideo()) { if (recording.hasVideo()) {
// Evict the recorder participant if composed recording with video // Evict the recorder participant if composed recording with video
this.sessionManager.evictParticipant( this.sessionManager.evictParticipant(
@ -227,7 +227,7 @@ public class RecordingManager {
} }
break; break;
case INDIVIDUAL: case INDIVIDUAL:
recording = this.singleStreamRecordingService.stopRecording(session, recording, reason); recording = this.singleStreamRecordingService.stopRecording(session, recording, reason, kmsDisconnectionTime);
break; break;
} }
this.abortAutomaticRecordingStopThread(session); this.abortAutomaticRecordingStopThread(session);
@ -257,7 +257,7 @@ public class RecordingManager {
} }
} }
public void stopOneIndividualStreamRecording(KurentoSession session, String streamId) { public void stopOneIndividualStreamRecording(KurentoSession session, String streamId, long kmsDisconnectionTime) {
Recording recording = this.sessionsRecordings.get(session.getSessionId()); Recording recording = this.sessionsRecordings.get(session.getSessionId());
if (recording == null) { if (recording == null) {
log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId, log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId,
@ -269,7 +269,7 @@ public class RecordingManager {
streamId); streamId);
final CountDownLatch stoppedCountDown = new CountDownLatch(1); final CountDownLatch stoppedCountDown = new CountDownLatch(1);
this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(session.getSessionId(), streamId, this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(session.getSessionId(), streamId,
stoppedCountDown, session.getKms().getTimeOfKurentoClientDisconnection()); stoppedCountDown, kmsDisconnectionTime);
try { try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
log.error("Error waiting for recorder endpoint of stream {} to stop in session {}", streamId, log.error("Error waiting for recorder endpoint of stream {} to stop in session {}", streamId,

View File

@ -56,7 +56,6 @@ import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Participant; import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session; import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.RecorderEndpointWrapper;
import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.Recording;
@ -127,6 +126,10 @@ public class SingleStreamRecordingService extends RecordingService {
@Override @Override
public Recording stopRecording(Session session, Recording recording, EndReason reason) { public Recording stopRecording(Session session, Recording recording, EndReason reason) {
return this.stopRecording(session, recording, reason, 0);
}
public Recording stopRecording(Session session, Recording recording, EndReason reason, long kmsDisconnectionTime) {
log.info("Stopping individual ({}) recording {} of session {}. Reason: {}", log.info("Stopping individual ({}) recording {} of session {}. Reason: {}",
recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly",
recording.getId(), recording.getSessionId(), reason); recording.getId(), recording.getSessionId(), reason);
@ -134,11 +137,9 @@ public class SingleStreamRecordingService extends RecordingService {
final int numberOfActiveRecorders = recorders.get(recording.getSessionId()).size(); final int numberOfActiveRecorders = recorders.get(recording.getSessionId()).size();
final CountDownLatch stoppedCountDown = new CountDownLatch(numberOfActiveRecorders); final CountDownLatch stoppedCountDown = new CountDownLatch(numberOfActiveRecorders);
final long timeOfKurentoClientDisconnection = ((KurentoSession) session).getKms()
.getTimeOfKurentoClientDisconnection();
for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) {
this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(),
stoppedCountDown, timeOfKurentoClientDisconnection); stoppedCountDown, kmsDisconnectionTime);
} }
try { try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
@ -237,6 +238,7 @@ public class SingleStreamRecordingService extends RecordingService {
finalWrapper.getRecorder().stop(); finalWrapper.getRecorder().stop();
} else { } else {
if (kmsDisconnectionTime != 0) { if (kmsDisconnectionTime != 0) {
// Stopping recorder endpoint because of a KMS disconnection
finalWrapper.setEndTime(kmsDisconnectionTime); finalWrapper.setEndTime(kmsDisconnectionTime);
generateIndividualMetadataFile(finalWrapper); generateIndividualMetadataFile(finalWrapper);
log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId, log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId,