diff --git a/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java b/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java index a3874589..fb3ab580 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java +++ b/openvidu-server/src/main/java/io/openvidu/server/cdr/CallDetailRecord.java @@ -67,10 +67,10 @@ import io.openvidu.server.recording.service.RecordingManager; * - resolution string * - recordingLayout: string * - size: number - * - webrtcConnectionDestroyed.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "openviduServerStopped" + * - webrtcConnectionDestroyed.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "mediaServerDisconnect", "openviduServerStopped" * - participantLeft.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "openviduServerStopped" * - sessionDestroyed.reason: "lastParticipantLeft", "openviduServerStopped" - * - recordingStopped.reason: "recordingStoppedByServer", "lastParticipantLeft", "sessionClosedByServer", "automaticStop", "openviduServerStopped" + * - recordingStopped.reason: "recordingStoppedByServer", "lastParticipantLeft", "sessionClosedByServer", "automaticStop", "mediaServerDisconnect", "openviduServerStopped" * * [OPTIONAL_PROPERTIES]: * - receivingFrom: only if connection = "INBOUND" @@ -105,7 +105,9 @@ public class CallDetailRecord { public void recordSessionDestroyed(String sessionId, String reason) { CDREvent e = this.sessions.remove(sessionId); - this.log(new CDREventSession(e, RecordingManager.finalReason(reason))); + if (e != null) { + this.log(new CDREventSession(e, RecordingManager.finalReason(reason))); + } } public void recordParticipantJoined(Participant participant, String sessionId) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index f2215342..da6d01fe 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -414,6 +414,7 @@ public abstract class SessionManager { throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId + "' not found"); } if (session.isClosed()) { + this.closeSessionAndEmptyCollections(session, reason); throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "Session '" + sessionId + "' already closed"); } Set participants = getParticipants(sessionId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index e6134563..b85dd921 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -17,8 +17,6 @@ package io.openvidu.server.kurento.core; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -29,7 +27,6 @@ import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.Continuation; import org.kurento.client.ErrorEvent; import org.kurento.client.Filter; -import org.kurento.client.GenericMediaElement; import org.kurento.client.IceCandidate; import org.kurento.client.IceComponentState; import org.kurento.client.MediaElement; @@ -46,12 +43,12 @@ import com.google.gson.JsonObject; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; +import io.openvidu.java.client.OpenViduRole; import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.config.InfoHandler; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; -import io.openvidu.server.kurento.TrackType; import io.openvidu.server.kurento.endpoint.KmsEvent; import io.openvidu.server.kurento.endpoint.KmsMediaEvent; import io.openvidu.server.kurento.endpoint.MediaEndpoint; @@ -72,7 +69,6 @@ public class KurentoParticipant extends Participant { private boolean webParticipant = true; private final KurentoSession session; - private final MediaPipeline pipeline; private PublisherEndpoint publisher; private CountDownLatch endPointLatch = new CountDownLatch(1); @@ -80,9 +76,8 @@ public class KurentoParticipant extends Participant { private final ConcurrentMap filters = new ConcurrentHashMap<>(); private final ConcurrentMap subscribers = new ConcurrentHashMap(); - public KurentoParticipant(Participant participant, KurentoSession kurentoSession, MediaPipeline pipeline, - InfoHandler infoHandler, CallDetailRecord CDR, OpenviduConfig openviduConfig, - RecordingManager recordingManager) { + public KurentoParticipant(Participant participant, KurentoSession kurentoSession, InfoHandler infoHandler, + CallDetailRecord CDR, OpenviduConfig openviduConfig, RecordingManager recordingManager) { super(participant.getParticipantPrivateId(), participant.getParticipantPublicId(), participant.getToken(), participant.getClientMetadata(), participant.getLocation(), participant.getPlatform(), participant.getCreatedAt()); @@ -91,9 +86,11 @@ public class KurentoParticipant extends Participant { this.openviduConfig = openviduConfig; this.recordingManager = recordingManager; this.session = kurentoSession; - this.pipeline = pipeline; - this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(), pipeline, - this.openviduConfig); + + if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) { + this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(), + this.session.getPipeline(), this.openviduConfig); + } for (Participant other : session.getParticipants()) { if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())) { @@ -113,7 +110,7 @@ public class KurentoParticipant extends Participant { String publisherStreamId = this.getParticipantPublicId() + "_" + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" + RandomStringUtils.random(5, true, false).toUpperCase(); - this.publisher.getEndpoint().setName(publisherStreamId); + this.publisher.setStreamId(publisherStreamId); addEndpointListeners(this.publisher); // Remove streamId from publisher's map @@ -121,35 +118,10 @@ public class KurentoParticipant extends Participant { } - public void shapePublisherMedia(GenericMediaElement element, MediaType type) { - if (type == null) { - this.publisher.apply(element); - } else { - this.publisher.apply(element, type); - } - } - public synchronized Filter getFilterElement(String id) { return filters.get(id); } - /* - * public synchronized void addFilterElement(String id, Filter filter) { - * filters.put(id, filter); shapePublisherMedia(filter, null); } - * - * public synchronized void disableFilterelement(String filterID, boolean - * releaseElement) { Filter filter = getFilterElement(filterID); - * - * if (filter != null) { try { publisher.revert(filter, releaseElement); } catch - * (OpenViduException e) { // Ignore error } } } - * - * public synchronized void enableFilterelement(String filterID) { Filter filter - * = getFilterElement(filterID); - * - * if (filter != null) { try { publisher.apply(filter); } catch - * (OpenViduException e) { // Ignore exception if element is already used } } } - */ - public synchronized void removeFilterElement(String id) { Filter filter = getFilterElement(id); filters.remove(id); @@ -161,7 +133,7 @@ public class KurentoParticipant extends Participant { public synchronized void releaseAllFilters() { // Check this, mutable array? filters.forEach((s, filter) -> removeFilterElement(s)); - if (this.publisher.getFilter() != null) { + if (this.publisher != null && this.publisher.getFilter() != null) { this.publisher.revert(this.publisher.getFilter()); } } @@ -191,38 +163,6 @@ public class KurentoParticipant extends Participant { return session; } - public Set getAllConnectedSubscribedEndpoints() { - Set subscribedToSet = new HashSet<>(); - for (SubscriberEndpoint se : subscribers.values()) { - if (se.isConnectedToPublisher()) { - subscribedToSet.add(se); - } - } - return subscribedToSet; - } - - public Set getConnectedSubscribedEndpoints(PublisherEndpoint publisher) { - Set subscribedToSet = new HashSet<>(); - for (SubscriberEndpoint se : subscribers.values()) { - if (se.isConnectedToPublisher() && se.getPublisher().equals(publisher)) { - subscribedToSet.add(se); - } - } - return subscribedToSet; - } - - public String preparePublishConnection() { - log.info("PARTICIPANT {}: Request to publish video in room {} by " + "initiating connection from server", - this.getParticipantPublicId(), this.session.getSessionId()); - - String sdpOffer = this.getPublisher().preparePublishConnection(); - - log.trace("PARTICIPANT {}: Publishing SdpOffer is {}", this.getParticipantPublicId(), sdpOffer); - log.info("PARTICIPANT {}: Generated Sdp offer for publishing in room {}", this.getParticipantPublicId(), - this.session.getSessionId()); - return sdpOffer; - } - public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback, MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) { log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(), @@ -252,7 +192,7 @@ public class KurentoParticipant extends Participant { log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(), this.session.getSessionId()); releasePublisherEndpoint(reason); - this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), pipeline, + this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.getPipeline(), this.openviduConfig); log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)", this.getParticipantPublicId()); @@ -357,21 +297,13 @@ public class KurentoParticipant extends Participant { } } - public void mutePublishedMedia(TrackType trackType) { - this.getPublisher().mute(trackType); - } - - public void unmutePublishedMedia(TrackType trackType) { - this.getPublisher().unmute(trackType); - } - - public void close(String reason) { + public void close(String reason, boolean definitelyClosed) { log.debug("PARTICIPANT {}: Closing user", this.getParticipantPublicId()); if (isClosed()) { log.warn("PARTICIPANT {}: Already closed", this.getParticipantPublicId()); return; } - this.closed = true; + this.closed = definitelyClosed; for (String remoteParticipantName : subscribers.keySet()) { SubscriberEndpoint subscriber = this.subscribers.get(remoteParticipantName); if (subscriber != null && subscriber.getEndpoint() != null) { @@ -385,6 +317,7 @@ public class KurentoParticipant extends Participant { this.getParticipantPublicId(), remoteParticipantName); } } + this.subscribers.clear(); releasePublisherEndpoint(reason); } @@ -397,8 +330,8 @@ public class KurentoParticipant extends Participant { */ public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) { - SubscriberEndpoint sendingEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId, pipeline, - this.openviduConfig); + SubscriberEndpoint sendingEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId, + this.getPipeline(), this.openviduConfig); SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, sendingEndpoint); if (existingSendingEndpoint != null) { @@ -440,7 +373,7 @@ public class KurentoParticipant extends Participant { if (this.openviduConfig.isRecordingModuleEnabled() && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { this.recordingManager.stopOneIndividualStreamRecording(session.getSessionId(), - this.getPublisherStreamId()); + this.getPublisherStreamId(), false); } publisher.unregisterErrorListeners(); @@ -450,6 +383,7 @@ public class KurentoParticipant extends Participant { releaseElement(getParticipantPublicId(), publisher.getEndpoint()); this.streaming = false; publisher = null; + this.session.deregisterPublisher(); CDR.stopPublisher(this.getParticipantPublicId(), reason); @@ -596,12 +530,18 @@ public class KurentoParticipant extends Participant { } public MediaPipeline getPipeline() { - return this.pipeline; + return this.session.getPipeline(); } @Override public String getPublisherStreamId() { - return this.publisher.getEndpoint().getName(); + return this.publisher.getStreamId(); + } + + public void resetPublisherEndpoint() { + log.info("Reseting publisher endpoint for participant {}", this.getParticipantPublicId()); + this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), + this.session.getPipeline(), this.openviduConfig); } @Override diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 591e7e50..92d60280 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -33,8 +33,11 @@ import org.slf4j.LoggerFactory; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; +import io.openvidu.java.client.OpenViduRole; +import io.openvidu.java.client.Recording.OutputMode; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; +import io.openvidu.server.recording.Recording; /** * @author Pablo Fuente (pablofuenteperez@gmail.com) @@ -54,7 +57,6 @@ public class KurentoSession extends Session { private Object pipelineCreateLock = new Object(); private Object pipelineReleaseLock = new Object(); - private volatile boolean pipelineReleased = false; private boolean destroyKurentoClient; public final ConcurrentHashMap publishedStreamIds = new ConcurrentHashMap<>(); @@ -73,7 +75,7 @@ public class KurentoSession extends Session { checkClosed(); createPipeline(); - KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, getPipeline(), + KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, kurentoSessionHandler.getInfoHandler(), this.CDR, this.openviduConfig, this.recordingManager); participants.put(participant.getParticipantPrivateId(), kurentoParticipant); @@ -105,15 +107,12 @@ public class KurentoSession extends Session { } public void cancelPublisher(Participant participant, String reason) { - deregisterPublisher(); - - // cancel recv video from this publisher + // Cancel all subscribers for this publisher for (Participant subscriber : participants.values()) { if (participant.equals(subscriber)) { continue; } ((KurentoParticipant) subscriber).cancelReceivingMedia(participant.getParticipantPublicId(), reason); - } log.debug("SESSION {}: Unsubscribed other participants {} from the publisher {}", sessionId, @@ -134,11 +133,9 @@ public class KurentoSession extends Session { participant.releaseAllFilters(); log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId); - if (participant.isStreaming()) { - this.deregisterPublisher(); - } + this.removeParticipant(participant, reason); - participant.close(reason); + participant.close(reason, true); if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { CDR.recordParticipantLeft(participant, participant.getSession().getSessionId(), reason); @@ -151,12 +148,12 @@ public class KurentoSession extends Session { for (Participant participant : participants.values()) { ((KurentoParticipant) participant).releaseAllFilters(); - ((KurentoParticipant) participant).close(reason); + ((KurentoParticipant) participant).close(reason, true); } participants.clear(); - closePipeline(); + closePipeline(null); log.debug("Session {} closed", this.sessionId); @@ -248,23 +245,30 @@ public class KurentoSession extends Session { } } - private void closePipeline() { + private void closePipeline(Runnable callback) { synchronized (pipelineReleaseLock) { - if (pipeline == null || pipelineReleased) { + if (pipeline == null) { return; } getPipeline().release(new Continuation() { - @Override public void onSuccess(Void result) throws Exception { log.debug("SESSION {}: Released Pipeline", sessionId); - pipelineReleased = true; + pipeline = null; + pipelineLatch = new CountDownLatch(1); + if (callback != null) { + callback.run(); + } } @Override public void onError(Throwable cause) throws Exception { log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause); - pipelineReleased = true; + pipeline = null; + pipelineLatch = new CountDownLatch(1); + if (callback != null) { + callback.run(); + } } }); } @@ -274,4 +278,49 @@ public class KurentoSession extends Session { return this.publishedStreamIds.get(streamId); } + public void restartStatusInKurento() { + + log.info("Reseting remote media objects for active session {}", this.sessionId); + + // Stop recording if session is being recorded + if (recordingManager.sessionIsBeingRecorded(this.sessionId)) { + Recording stoppedRecording = this.recordingManager.forceStopRecording(this, "mediaServerDisconnect"); + if (OutputMode.COMPOSED.equals(stoppedRecording.getOutputMode()) && stoppedRecording.hasVideo()) { + recordingManager.getSessionManager().evictParticipant( + this.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID), null, null, + "EVICT_RECORDER"); + } + } + + // Close all MediaEndpoints of participants + this.getParticipants().forEach(p -> { + KurentoParticipant kParticipant = (KurentoParticipant) p; + final boolean wasStreaming = kParticipant.isStreaming(); + kParticipant.releaseAllFilters(); + kParticipant.close("mediaServerDisconnect", false); + if (wasStreaming) { + kurentoSessionHandler.onUnpublishMedia(kParticipant, this.getParticipants(), null, null, null, + "mediaServerDisconnect"); + } + }); + + // Release pipeline, create a new one and prepare new PublisherEndpoints for + // allowed users + this.closePipeline(() -> { + createPipeline(); + try { + if (!pipelineLatch.await(20, TimeUnit.SECONDS)) { + throw new Exception("MediaPipleine was not created in 20 seconds"); + } + getParticipants().forEach(p -> { + if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) { + ((KurentoParticipant) p).resetPublisherEndpoint(); + } + }); + } catch (Exception e) { + log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage()); + } + }); + } + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index b4ebba76..b4e56850 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -74,6 +74,8 @@ public class PublisherEndpoint extends MediaEndpoint { private Map elementsErrorSubscriptions = new HashMap(); + private String streamId; + public PublisherEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, OpenviduConfig openviduConfig) { super(web, owner, endpointName, pipeline, openviduConfig, log); @@ -136,7 +138,7 @@ public class PublisherEndpoint extends MediaEndpoint { public boolean removeParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) { if (!this.subscribersToFilterEvents.containsKey(eventType)) { - String streamId = this.getEndpoint().getName(); + String streamId = this.getStreamId(); log.error("Request to removeFilterEventListener to stream {} gone wrong: Filter {} has no listener added", streamId, eventType); throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND, @@ -164,16 +166,12 @@ public class PublisherEndpoint extends MediaEndpoint { * itself (after applying the intermediate media elements and the * {@link PassThrough}) to allow loopback of the media stream. * - * @param sdpType - * indicates the type of the sdpString (offer or answer) - * @param sdpString - * offer or answer from the remote peer - * @param doLoopback - * loopback flag - * @param loopbackAlternativeSrc - * alternative loopback source - * @param loopbackConnectionType - * how to connect the loopback source + * @param sdpType indicates the type of the sdpString (offer or + * answer) + * @param sdpString offer or answer from the remote peer + * @param doLoopback loopback flag + * @param loopbackAlternativeSrc alternative loopback source + * @param loopbackConnectionType how to connect the loopback source * @return the SDP response (the answer if processing an offer SDP, otherwise is * the updated offer generated previously by this endpoint) */ @@ -238,12 +236,10 @@ public class PublisherEndpoint extends MediaEndpoint { * is left ready for when the connections between elements will materialize and * the streaming begins. * - * @param shaper - * {@link MediaElement} that will be linked to the end of the chain - * (e.g. a filter) + * @param shaper {@link MediaElement} that will be linked to the end of the + * chain (e.g. a filter) * @return the element's id - * @throws OpenViduException - * if thrown, the media element was not added + * @throws OpenViduException if thrown, the media element was not added */ public String apply(GenericMediaElement shaper) throws OpenViduException { return apply(shaper, null); @@ -253,15 +249,12 @@ public class PublisherEndpoint extends MediaEndpoint { * Same as {@link #apply(MediaElement)}, can specify the media type that will be * streamed through the shaper element. * - * @param shaper - * {@link MediaElement} that will be linked to the end of the chain - * (e.g. a filter) - * @param type - * indicates which type of media will be connected to the shaper - * ({@link MediaType}), if null then the connection is mixed + * @param shaper {@link MediaElement} that will be linked to the end of the + * chain (e.g. a filter) + * @param type indicates which type of media will be connected to the shaper + * ({@link MediaType}), if null then the connection is mixed * @return the element's id - * @throws OpenViduException - * if thrown, the media element was not added + * @throws OpenViduException if thrown, the media element was not added */ public synchronized String apply(GenericMediaElement shaper, MediaType type) throws OpenViduException { String id = shaper.getId(); @@ -299,10 +292,8 @@ public class PublisherEndpoint extends MediaEndpoint { * object is released. If the chain is connected, both adjacent remaining * elements will be interconnected. * - * @param shaper - * {@link MediaElement} that will be removed from the chain - * @throws OpenViduException - * if thrown, the media element was not removed + * @param shaper {@link MediaElement} that will be removed from the chain + * @throws OpenViduException if thrown, the media element was not removed */ public synchronized void revert(MediaElement shaper) throws OpenViduException { revert(shaper, true); @@ -477,9 +468,9 @@ public class PublisherEndpoint extends MediaEndpoint { * * @param source * @param sink - * @param type - * if null, {@link #internalSinkConnect(MediaElement, MediaElement)} - * will be used instead + * @param type if null, + * {@link #internalSinkConnect(MediaElement, MediaElement)} will + * be used instead * @see #internalSinkConnect(MediaElement, MediaElement) */ private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type) { @@ -524,9 +515,9 @@ public class PublisherEndpoint extends MediaEndpoint { * * @param source * @param sink - * @param type - * if null, {@link #internalSinkConnect(MediaElement, MediaElement)} - * will be used instead + * @param type if null, + * {@link #internalSinkConnect(MediaElement, MediaElement)} will + * be used instead * @see #internalSinkConnect(MediaElement, MediaElement) */ private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type) { @@ -565,7 +556,7 @@ public class PublisherEndpoint extends MediaEndpoint { @Override public JsonObject toJson() { JsonObject json = super.toJson(); - json.addProperty("streamId", this.getEndpoint().getName()); + json.addProperty("streamId", this.getStreamId()); json.add("mediaOptions", this.mediaOptions.toJson()); return json; } @@ -584,4 +575,13 @@ public class PublisherEndpoint extends MediaEndpoint { return "{filter: " + ((this.filter != null) ? this.filter.getName() : "null") + ", listener: " + this.filterListeners.toString() + ", subscribers: " + this.subscribersToFilterEvents.toString() + "}"; } + + public void setStreamId(String publisherStreamId) { + this.streamId = publisherStreamId; + this.getEndpoint().setName(publisherStreamId); + } + + public String getStreamId() { + return this.streamId != null ? this.streamId : this.getEndpoint().getName(); + } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java index 771b1c34..2e375e09 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java @@ -78,7 +78,7 @@ public class SubscriberEndpoint extends MediaEndpoint { public JsonObject toJson() { JsonObject json = super.toJson(); try { - json.addProperty("streamId", this.publisher.getEndpoint().getName()); + json.addProperty("streamId", this.publisher.getStreamId()); } catch (NullPointerException ex) { json.addProperty("streamId", "NOT_FOUND"); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index 5090110e..a467bca7 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -17,15 +17,28 @@ package io.openvidu.server.kurento.kms; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.kurento.client.KurentoClient; import org.kurento.client.KurentoConnectionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import io.openvidu.server.core.SessionManager; +import io.openvidu.server.kurento.core.KurentoSession; public class FixedOneKmsManager extends KmsManager { private static final Logger log = LoggerFactory.getLogger(FixedOneKmsManager.class); + @Autowired + SessionManager sessionManager; + + public static final AtomicBoolean CONNECTED_TO_KMS = new AtomicBoolean(false); + public static final AtomicLong TIME_OF_DISCONNECTION = new AtomicLong(0); + public FixedOneKmsManager(String kmsWsUri) { this(kmsWsUri, 1); } @@ -36,21 +49,37 @@ public class FixedOneKmsManager extends KmsManager { @Override public void reconnected(boolean isReconnected) { - log.warn("Kurento Client reconnected ({}) to KMS with uri {}", isReconnected, kmsWsUri); + CONNECTED_TO_KMS.compareAndSet(false, true); + if (!isReconnected) { + // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) + log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri); + log.warn("Updating all webrtc endpoints for active sessions"); + sessionManager.getSessions().forEach(s -> { + ((KurentoSession) s).restartStatusInKurento(); + }); + } else { + // Same KMS. We can infer that openvidu-server/KMS connection has been lost, but + // not the clients/KMS connections + log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri); + } } @Override public void disconnected() { + CONNECTED_TO_KMS.compareAndSet(true, false); + TIME_OF_DISCONNECTION.set(System.currentTimeMillis()); log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri); } @Override public void connectionFailed() { + CONNECTED_TO_KMS.set(false); log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri); } @Override public void connected() { + CONNECTED_TO_KMS.compareAndSet(false, true); log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri); } }), kmsWsUri)); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java index 7112308d..61ab8943 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java @@ -38,6 +38,7 @@ import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; +import io.openvidu.server.kurento.kms.FixedOneKmsManager; public class CompositeWrapper { @@ -86,19 +87,27 @@ public class CompositeWrapper { this.recorderEndpoint.record(); } - public synchronized void stopCompositeRecording(CountDownLatch stopLatch) { - this.recorderEndpoint.addStoppedListener(new EventListener() { - @Override - public void onEvent(StoppedEvent event) { - endTime = System.currentTimeMillis(); - log.info("Recording stopped event for audio-only RecorderEndpoint of Composite in session {}", - session.getSessionId()); - recorderEndpoint.release(); - compositeToRecorderHubPort.release(); - stopLatch.countDown(); - } - }); - this.recorderEndpoint.stop(); + public synchronized void stopCompositeRecording(CountDownLatch stopLatch, boolean forceAfterKmsRestart) { + if (!forceAfterKmsRestart) { + this.recorderEndpoint.addStoppedListener(new EventListener() { + @Override + public void onEvent(StoppedEvent event) { + endTime = System.currentTimeMillis(); + log.info("Recording stopped event for audio-only RecorderEndpoint of Composite in session {}", + session.getSessionId()); + recorderEndpoint.release(); + compositeToRecorderHubPort.release(); + stopLatch.countDown(); + } + }); + this.recorderEndpoint.stop(); + } else { + endTime = FixedOneKmsManager.TIME_OF_DISCONNECTION.get(); + stopLatch.countDown(); + log.warn("Forcing composed audio-only recording stop after KMS restart in session {}", + this.session.getSessionId()); + } + } public void connectPublisherEndpoint(PublisherEndpoint endpoint) throws OpenViduException { diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index b95cc4c6..3cb630f2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -100,10 +100,14 @@ public class ComposedRecordingService extends RecordingService { @Override public Recording stopRecording(Session session, Recording recording, String reason) { + return this.stopRecording(session, recording, reason, false); + } + + public Recording stopRecording(Session session, Recording recording, String reason, boolean forceAfterKmsRestart) { if (recording.hasVideo()) { return this.stopRecordingWithVideo(session, recording, reason); } else { - return this.stopRecordingAudioOnly(session, recording, reason); + return this.stopRecordingAudioOnly(session, recording, reason, forceAfterKmsRestart); } } @@ -330,7 +334,8 @@ public class ComposedRecordingService extends RecordingService { return recording; } - private Recording stopRecordingAudioOnly(Session session, Recording recording, String reason) { + private Recording stopRecordingAudioOnly(Session session, Recording recording, String reason, + boolean forceAfterKmsRestart) { log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(), recording.getSessionId(), reason); @@ -349,7 +354,8 @@ public class ComposedRecordingService extends RecordingService { CompositeWrapper compositeWrapper = this.composites.remove(sessionId); final CountDownLatch stoppedCountDown = new CountDownLatch(1); - compositeWrapper.stopCompositeRecording(stoppedCountDown); + + compositeWrapper.stopCompositeRecording(stoppedCountDown, forceAfterKmsRestart); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index a4d131d4..0dba1300 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -114,6 +114,10 @@ public class RecordingManager { return this.sessionHandler; } + public SessionManager getSessionManager() { + return this.sessionManager; + } + public void initializeRecordingManager() throws OpenViduException { RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion(); @@ -207,6 +211,21 @@ public class RecordingManager { return recording; } + public Recording forceStopRecording(Session session, String reason) { + Recording recording; + recording = this.sessionsRecordings.get(session.getSessionId()); + switch (recording.getOutputMode()) { + case COMPOSED: + recording = this.composedRecordingService.stopRecording(session, recording, reason, true); + break; + case INDIVIDUAL: + recording = this.singleStreamRecordingService.stopRecording(session, recording, reason, true); + break; + } + this.abortAutomaticRecordingStopThread(session); + return recording; + } + public void startOneIndividualStreamRecording(Session session, String recordingId, MediaProfileSpecType profile, Participant participant) { Recording recording = this.sessionsRecordings.get(session.getSessionId()); @@ -230,7 +249,7 @@ public class RecordingManager { } } - public void stopOneIndividualStreamRecording(String sessionId, String streamId) { + public void stopOneIndividualStreamRecording(String sessionId, String streamId, boolean forceAfterKmsRestart) { Recording recording = this.sessionsRecordings.get(sessionId); if (recording == null) { log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId, @@ -241,7 +260,7 @@ public class RecordingManager { log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", sessionId, streamId); final CountDownLatch stoppedCountDown = new CountDownLatch(1); this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(sessionId, streamId, - stoppedCountDown); + stoppedCountDown, forceAfterKmsRestart); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { log.error("Error waiting for recorder endpoint of stream {} to stop in session {}", streamId, @@ -362,7 +381,7 @@ public class RecordingManager { sessionManager.closeSessionAndEmptyCollections(session, "automaticStop"); sessionManager.showTokens(); } else { - this.stopRecording(null, recordingId, "automaticStop"); + this.stopRecording(session, recordingId, "automaticStop"); } } else { // This code is reachable if there already was an automatic stop of a recording diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 9676c61b..f23d4bf4 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -56,6 +56,7 @@ import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; +import io.openvidu.server.kurento.kms.FixedOneKmsManager; import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.Recording; @@ -127,7 +128,10 @@ public class SingleStreamRecordingService extends RecordingService { @Override public Recording stopRecording(Session session, Recording recording, String reason) { + return this.stopRecording(session, recording, reason, false); + } + public Recording stopRecording(Session session, Recording recording, String reason, boolean forceAfterKmsRestart) { log.info("Stopping individual ({}) recording {} of session {}. Reason: {}", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.getId(), recording.getSessionId(), reason); @@ -137,7 +141,7 @@ public class SingleStreamRecordingService extends RecordingService { for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), - stoppedCountDown); + stoppedCountDown, forceAfterKmsRestart); } try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { @@ -219,10 +223,10 @@ public class SingleStreamRecordingService extends RecordingService { } public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, - CountDownLatch globalStopLatch) { + CountDownLatch globalStopLatch, boolean forceAfterKmsRestart) { log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); final RecorderEndpointWrapper finalWrapper = this.recorders.get(sessionId).remove(streamId); - if (finalWrapper != null) { + if (finalWrapper != null && !forceAfterKmsRestart) { finalWrapper.getRecorder().addStoppedListener(new EventListener() { @Override public void onEvent(StoppedEvent event) { @@ -235,7 +239,14 @@ public class SingleStreamRecordingService extends RecordingService { }); finalWrapper.getRecorder().stop(); } else { - log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId); + if (forceAfterKmsRestart) { + finalWrapper.setEndTime(FixedOneKmsManager.TIME_OF_DISCONNECTION.get()); + generateIndividualMetadataFile(finalWrapper); + log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId, + sessionId); + } else { + log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId); + } globalStopLatch.countDown(); } }