openvidu-server: reconnection to KMS

pull/255/head
pabloFuente 2019-02-22 10:41:31 +01:00
parent 93b2aebcc7
commit 5e23d1fd16
11 changed files with 232 additions and 166 deletions

View File

@ -67,10 +67,10 @@ import io.openvidu.server.recording.service.RecordingManager;
* - resolution string
* - recordingLayout: string
* - size: number
* - webrtcConnectionDestroyed.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "openviduServerStopped"
* - webrtcConnectionDestroyed.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "mediaServerDisconnect", "openviduServerStopped"
* - participantLeft.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "openviduServerStopped"
* - sessionDestroyed.reason: "lastParticipantLeft", "openviduServerStopped"
* - recordingStopped.reason: "recordingStoppedByServer", "lastParticipantLeft", "sessionClosedByServer", "automaticStop", "openviduServerStopped"
* - recordingStopped.reason: "recordingStoppedByServer", "lastParticipantLeft", "sessionClosedByServer", "automaticStop", "mediaServerDisconnect", "openviduServerStopped"
*
* [OPTIONAL_PROPERTIES]:
* - receivingFrom: only if connection = "INBOUND"
@ -105,7 +105,9 @@ public class CallDetailRecord {
public void recordSessionDestroyed(String sessionId, String reason) {
CDREvent e = this.sessions.remove(sessionId);
this.log(new CDREventSession(e, RecordingManager.finalReason(reason)));
if (e != null) {
this.log(new CDREventSession(e, RecordingManager.finalReason(reason)));
}
}
public void recordParticipantJoined(Participant participant, String sessionId) {

View File

@ -414,6 +414,7 @@ public abstract class SessionManager {
throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId + "' not found");
}
if (session.isClosed()) {
this.closeSessionAndEmptyCollections(session, reason);
throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "Session '" + sessionId + "' already closed");
}
Set<Participant> participants = getParticipants(sessionId);

View File

@ -17,8 +17,6 @@
package io.openvidu.server.kurento.core;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@ -29,7 +27,6 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.Continuation;
import org.kurento.client.ErrorEvent;
import org.kurento.client.Filter;
import org.kurento.client.GenericMediaElement;
import org.kurento.client.IceCandidate;
import org.kurento.client.IceComponentState;
import org.kurento.client.MediaElement;
@ -46,12 +43,12 @@ import com.google.gson.JsonObject;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.java.client.OpenViduRole;
import io.openvidu.server.cdr.CallDetailRecord;
import io.openvidu.server.config.InfoHandler;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.kurento.TrackType;
import io.openvidu.server.kurento.endpoint.KmsEvent;
import io.openvidu.server.kurento.endpoint.KmsMediaEvent;
import io.openvidu.server.kurento.endpoint.MediaEndpoint;
@ -72,7 +69,6 @@ public class KurentoParticipant extends Participant {
private boolean webParticipant = true;
private final KurentoSession session;
private final MediaPipeline pipeline;
private PublisherEndpoint publisher;
private CountDownLatch endPointLatch = new CountDownLatch(1);
@ -80,9 +76,8 @@ public class KurentoParticipant extends Participant {
private final ConcurrentMap<String, Filter> filters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, SubscriberEndpoint> subscribers = new ConcurrentHashMap<String, SubscriberEndpoint>();
public KurentoParticipant(Participant participant, KurentoSession kurentoSession, MediaPipeline pipeline,
InfoHandler infoHandler, CallDetailRecord CDR, OpenviduConfig openviduConfig,
RecordingManager recordingManager) {
public KurentoParticipant(Participant participant, KurentoSession kurentoSession, InfoHandler infoHandler,
CallDetailRecord CDR, OpenviduConfig openviduConfig, RecordingManager recordingManager) {
super(participant.getParticipantPrivateId(), participant.getParticipantPublicId(), participant.getToken(),
participant.getClientMetadata(), participant.getLocation(), participant.getPlatform(),
participant.getCreatedAt());
@ -91,9 +86,11 @@ public class KurentoParticipant extends Participant {
this.openviduConfig = openviduConfig;
this.recordingManager = recordingManager;
this.session = kurentoSession;
this.pipeline = pipeline;
this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(), pipeline,
this.openviduConfig);
if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) {
this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(),
this.session.getPipeline(), this.openviduConfig);
}
for (Participant other : session.getParticipants()) {
if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())) {
@ -113,7 +110,7 @@ public class KurentoParticipant extends Participant {
String publisherStreamId = this.getParticipantPublicId() + "_"
+ (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_"
+ RandomStringUtils.random(5, true, false).toUpperCase();
this.publisher.getEndpoint().setName(publisherStreamId);
this.publisher.setStreamId(publisherStreamId);
addEndpointListeners(this.publisher);
// Remove streamId from publisher's map
@ -121,35 +118,10 @@ public class KurentoParticipant extends Participant {
}
public void shapePublisherMedia(GenericMediaElement element, MediaType type) {
if (type == null) {
this.publisher.apply(element);
} else {
this.publisher.apply(element, type);
}
}
public synchronized Filter getFilterElement(String id) {
return filters.get(id);
}
/*
* public synchronized void addFilterElement(String id, Filter filter) {
* filters.put(id, filter); shapePublisherMedia(filter, null); }
*
* public synchronized void disableFilterelement(String filterID, boolean
* releaseElement) { Filter filter = getFilterElement(filterID);
*
* if (filter != null) { try { publisher.revert(filter, releaseElement); } catch
* (OpenViduException e) { // Ignore error } } }
*
* public synchronized void enableFilterelement(String filterID) { Filter filter
* = getFilterElement(filterID);
*
* if (filter != null) { try { publisher.apply(filter); } catch
* (OpenViduException e) { // Ignore exception if element is already used } } }
*/
public synchronized void removeFilterElement(String id) {
Filter filter = getFilterElement(id);
filters.remove(id);
@ -161,7 +133,7 @@ public class KurentoParticipant extends Participant {
public synchronized void releaseAllFilters() {
// Check this, mutable array?
filters.forEach((s, filter) -> removeFilterElement(s));
if (this.publisher.getFilter() != null) {
if (this.publisher != null && this.publisher.getFilter() != null) {
this.publisher.revert(this.publisher.getFilter());
}
}
@ -191,38 +163,6 @@ public class KurentoParticipant extends Participant {
return session;
}
public Set<SubscriberEndpoint> getAllConnectedSubscribedEndpoints() {
Set<SubscriberEndpoint> subscribedToSet = new HashSet<>();
for (SubscriberEndpoint se : subscribers.values()) {
if (se.isConnectedToPublisher()) {
subscribedToSet.add(se);
}
}
return subscribedToSet;
}
public Set<SubscriberEndpoint> getConnectedSubscribedEndpoints(PublisherEndpoint publisher) {
Set<SubscriberEndpoint> subscribedToSet = new HashSet<>();
for (SubscriberEndpoint se : subscribers.values()) {
if (se.isConnectedToPublisher() && se.getPublisher().equals(publisher)) {
subscribedToSet.add(se);
}
}
return subscribedToSet;
}
public String preparePublishConnection() {
log.info("PARTICIPANT {}: Request to publish video in room {} by " + "initiating connection from server",
this.getParticipantPublicId(), this.session.getSessionId());
String sdpOffer = this.getPublisher().preparePublishConnection();
log.trace("PARTICIPANT {}: Publishing SdpOffer is {}", this.getParticipantPublicId(), sdpOffer);
log.info("PARTICIPANT {}: Generated Sdp offer for publishing in room {}", this.getParticipantPublicId(),
this.session.getSessionId());
return sdpOffer;
}
public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback,
MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) {
log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(),
@ -252,7 +192,7 @@ public class KurentoParticipant extends Participant {
log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(),
this.session.getSessionId());
releasePublisherEndpoint(reason);
this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), pipeline,
this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.getPipeline(),
this.openviduConfig);
log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)",
this.getParticipantPublicId());
@ -357,21 +297,13 @@ public class KurentoParticipant extends Participant {
}
}
public void mutePublishedMedia(TrackType trackType) {
this.getPublisher().mute(trackType);
}
public void unmutePublishedMedia(TrackType trackType) {
this.getPublisher().unmute(trackType);
}
public void close(String reason) {
public void close(String reason, boolean definitelyClosed) {
log.debug("PARTICIPANT {}: Closing user", this.getParticipantPublicId());
if (isClosed()) {
log.warn("PARTICIPANT {}: Already closed", this.getParticipantPublicId());
return;
}
this.closed = true;
this.closed = definitelyClosed;
for (String remoteParticipantName : subscribers.keySet()) {
SubscriberEndpoint subscriber = this.subscribers.get(remoteParticipantName);
if (subscriber != null && subscriber.getEndpoint() != null) {
@ -385,6 +317,7 @@ public class KurentoParticipant extends Participant {
this.getParticipantPublicId(), remoteParticipantName);
}
}
this.subscribers.clear();
releasePublisherEndpoint(reason);
}
@ -397,8 +330,8 @@ public class KurentoParticipant extends Participant {
*/
public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) {
SubscriberEndpoint sendingEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId, pipeline,
this.openviduConfig);
SubscriberEndpoint sendingEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId,
this.getPipeline(), this.openviduConfig);
SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, sendingEndpoint);
if (existingSendingEndpoint != null) {
@ -440,7 +373,7 @@ public class KurentoParticipant extends Participant {
if (this.openviduConfig.isRecordingModuleEnabled()
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
this.recordingManager.stopOneIndividualStreamRecording(session.getSessionId(),
this.getPublisherStreamId());
this.getPublisherStreamId(), false);
}
publisher.unregisterErrorListeners();
@ -450,6 +383,7 @@ public class KurentoParticipant extends Participant {
releaseElement(getParticipantPublicId(), publisher.getEndpoint());
this.streaming = false;
publisher = null;
this.session.deregisterPublisher();
CDR.stopPublisher(this.getParticipantPublicId(), reason);
@ -596,12 +530,18 @@ public class KurentoParticipant extends Participant {
}
public MediaPipeline getPipeline() {
return this.pipeline;
return this.session.getPipeline();
}
@Override
public String getPublisherStreamId() {
return this.publisher.getEndpoint().getName();
return this.publisher.getStreamId();
}
public void resetPublisherEndpoint() {
log.info("Reseting publisher endpoint for participant {}", this.getParticipantPublicId());
this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(),
this.session.getPipeline(), this.openviduConfig);
}
@Override

View File

@ -33,8 +33,11 @@ import org.slf4j.LoggerFactory;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.java.client.OpenViduRole;
import io.openvidu.java.client.Recording.OutputMode;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.recording.Recording;
/**
* @author Pablo Fuente (pablofuenteperez@gmail.com)
@ -54,7 +57,6 @@ public class KurentoSession extends Session {
private Object pipelineCreateLock = new Object();
private Object pipelineReleaseLock = new Object();
private volatile boolean pipelineReleased = false;
private boolean destroyKurentoClient;
public final ConcurrentHashMap<String, String> publishedStreamIds = new ConcurrentHashMap<>();
@ -73,7 +75,7 @@ public class KurentoSession extends Session {
checkClosed();
createPipeline();
KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, getPipeline(),
KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this,
kurentoSessionHandler.getInfoHandler(), this.CDR, this.openviduConfig, this.recordingManager);
participants.put(participant.getParticipantPrivateId(), kurentoParticipant);
@ -105,15 +107,12 @@ public class KurentoSession extends Session {
}
public void cancelPublisher(Participant participant, String reason) {
deregisterPublisher();
// cancel recv video from this publisher
// Cancel all subscribers for this publisher
for (Participant subscriber : participants.values()) {
if (participant.equals(subscriber)) {
continue;
}
((KurentoParticipant) subscriber).cancelReceivingMedia(participant.getParticipantPublicId(), reason);
}
log.debug("SESSION {}: Unsubscribed other participants {} from the publisher {}", sessionId,
@ -134,11 +133,9 @@ public class KurentoSession extends Session {
participant.releaseAllFilters();
log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId);
if (participant.isStreaming()) {
this.deregisterPublisher();
}
this.removeParticipant(participant, reason);
participant.close(reason);
participant.close(reason, true);
if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) {
CDR.recordParticipantLeft(participant, participant.getSession().getSessionId(), reason);
@ -151,12 +148,12 @@ public class KurentoSession extends Session {
for (Participant participant : participants.values()) {
((KurentoParticipant) participant).releaseAllFilters();
((KurentoParticipant) participant).close(reason);
((KurentoParticipant) participant).close(reason, true);
}
participants.clear();
closePipeline();
closePipeline(null);
log.debug("Session {} closed", this.sessionId);
@ -248,23 +245,30 @@ public class KurentoSession extends Session {
}
}
private void closePipeline() {
private void closePipeline(Runnable callback) {
synchronized (pipelineReleaseLock) {
if (pipeline == null || pipelineReleased) {
if (pipeline == null) {
return;
}
getPipeline().release(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
log.debug("SESSION {}: Released Pipeline", sessionId);
pipelineReleased = true;
pipeline = null;
pipelineLatch = new CountDownLatch(1);
if (callback != null) {
callback.run();
}
}
@Override
public void onError(Throwable cause) throws Exception {
log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause);
pipelineReleased = true;
pipeline = null;
pipelineLatch = new CountDownLatch(1);
if (callback != null) {
callback.run();
}
}
});
}
@ -274,4 +278,49 @@ public class KurentoSession extends Session {
return this.publishedStreamIds.get(streamId);
}
public void restartStatusInKurento() {
log.info("Reseting remote media objects for active session {}", this.sessionId);
// Stop recording if session is being recorded
if (recordingManager.sessionIsBeingRecorded(this.sessionId)) {
Recording stoppedRecording = this.recordingManager.forceStopRecording(this, "mediaServerDisconnect");
if (OutputMode.COMPOSED.equals(stoppedRecording.getOutputMode()) && stoppedRecording.hasVideo()) {
recordingManager.getSessionManager().evictParticipant(
this.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID), null, null,
"EVICT_RECORDER");
}
}
// Close all MediaEndpoints of participants
this.getParticipants().forEach(p -> {
KurentoParticipant kParticipant = (KurentoParticipant) p;
final boolean wasStreaming = kParticipant.isStreaming();
kParticipant.releaseAllFilters();
kParticipant.close("mediaServerDisconnect", false);
if (wasStreaming) {
kurentoSessionHandler.onUnpublishMedia(kParticipant, this.getParticipants(), null, null, null,
"mediaServerDisconnect");
}
});
// Release pipeline, create a new one and prepare new PublisherEndpoints for
// allowed users
this.closePipeline(() -> {
createPipeline();
try {
if (!pipelineLatch.await(20, TimeUnit.SECONDS)) {
throw new Exception("MediaPipleine was not created in 20 seconds");
}
getParticipants().forEach(p -> {
if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) {
((KurentoParticipant) p).resetPublisherEndpoint();
}
});
} catch (Exception e) {
log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage());
}
});
}
}

View File

@ -74,6 +74,8 @@ public class PublisherEndpoint extends MediaEndpoint {
private Map<String, ListenerSubscription> elementsErrorSubscriptions = new HashMap<String, ListenerSubscription>();
private String streamId;
public PublisherEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline,
OpenviduConfig openviduConfig) {
super(web, owner, endpointName, pipeline, openviduConfig, log);
@ -136,7 +138,7 @@ public class PublisherEndpoint extends MediaEndpoint {
public boolean removeParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) {
if (!this.subscribersToFilterEvents.containsKey(eventType)) {
String streamId = this.getEndpoint().getName();
String streamId = this.getStreamId();
log.error("Request to removeFilterEventListener to stream {} gone wrong: Filter {} has no listener added",
streamId, eventType);
throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND,
@ -164,16 +166,12 @@ public class PublisherEndpoint extends MediaEndpoint {
* itself (after applying the intermediate media elements and the
* {@link PassThrough}) to allow loopback of the media stream.
*
* @param sdpType
* indicates the type of the sdpString (offer or answer)
* @param sdpString
* offer or answer from the remote peer
* @param doLoopback
* loopback flag
* @param loopbackAlternativeSrc
* alternative loopback source
* @param loopbackConnectionType
* how to connect the loopback source
* @param sdpType indicates the type of the sdpString (offer or
* answer)
* @param sdpString offer or answer from the remote peer
* @param doLoopback loopback flag
* @param loopbackAlternativeSrc alternative loopback source
* @param loopbackConnectionType how to connect the loopback source
* @return the SDP response (the answer if processing an offer SDP, otherwise is
* the updated offer generated previously by this endpoint)
*/
@ -238,12 +236,10 @@ public class PublisherEndpoint extends MediaEndpoint {
* is left ready for when the connections between elements will materialize and
* the streaming begins.
*
* @param shaper
* {@link MediaElement} that will be linked to the end of the chain
* (e.g. a filter)
* @param shaper {@link MediaElement} that will be linked to the end of the
* chain (e.g. a filter)
* @return the element's id
* @throws OpenViduException
* if thrown, the media element was not added
* @throws OpenViduException if thrown, the media element was not added
*/
public String apply(GenericMediaElement shaper) throws OpenViduException {
return apply(shaper, null);
@ -253,15 +249,12 @@ public class PublisherEndpoint extends MediaEndpoint {
* Same as {@link #apply(MediaElement)}, can specify the media type that will be
* streamed through the shaper element.
*
* @param shaper
* {@link MediaElement} that will be linked to the end of the chain
* (e.g. a filter)
* @param type
* indicates which type of media will be connected to the shaper
* ({@link MediaType}), if null then the connection is mixed
* @param shaper {@link MediaElement} that will be linked to the end of the
* chain (e.g. a filter)
* @param type indicates which type of media will be connected to the shaper
* ({@link MediaType}), if null then the connection is mixed
* @return the element's id
* @throws OpenViduException
* if thrown, the media element was not added
* @throws OpenViduException if thrown, the media element was not added
*/
public synchronized String apply(GenericMediaElement shaper, MediaType type) throws OpenViduException {
String id = shaper.getId();
@ -299,10 +292,8 @@ public class PublisherEndpoint extends MediaEndpoint {
* object is released. If the chain is connected, both adjacent remaining
* elements will be interconnected.
*
* @param shaper
* {@link MediaElement} that will be removed from the chain
* @throws OpenViduException
* if thrown, the media element was not removed
* @param shaper {@link MediaElement} that will be removed from the chain
* @throws OpenViduException if thrown, the media element was not removed
*/
public synchronized void revert(MediaElement shaper) throws OpenViduException {
revert(shaper, true);
@ -477,9 +468,9 @@ public class PublisherEndpoint extends MediaEndpoint {
*
* @param source
* @param sink
* @param type
* if null, {@link #internalSinkConnect(MediaElement, MediaElement)}
* will be used instead
* @param type if null,
* {@link #internalSinkConnect(MediaElement, MediaElement)} will
* be used instead
* @see #internalSinkConnect(MediaElement, MediaElement)
*/
private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type) {
@ -524,9 +515,9 @@ public class PublisherEndpoint extends MediaEndpoint {
*
* @param source
* @param sink
* @param type
* if null, {@link #internalSinkConnect(MediaElement, MediaElement)}
* will be used instead
* @param type if null,
* {@link #internalSinkConnect(MediaElement, MediaElement)} will
* be used instead
* @see #internalSinkConnect(MediaElement, MediaElement)
*/
private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type) {
@ -565,7 +556,7 @@ public class PublisherEndpoint extends MediaEndpoint {
@Override
public JsonObject toJson() {
JsonObject json = super.toJson();
json.addProperty("streamId", this.getEndpoint().getName());
json.addProperty("streamId", this.getStreamId());
json.add("mediaOptions", this.mediaOptions.toJson());
return json;
}
@ -584,4 +575,13 @@ public class PublisherEndpoint extends MediaEndpoint {
return "{filter: " + ((this.filter != null) ? this.filter.getName() : "null") + ", listener: "
+ this.filterListeners.toString() + ", subscribers: " + this.subscribersToFilterEvents.toString() + "}";
}
public void setStreamId(String publisherStreamId) {
this.streamId = publisherStreamId;
this.getEndpoint().setName(publisherStreamId);
}
public String getStreamId() {
return this.streamId != null ? this.streamId : this.getEndpoint().getName();
}
}

View File

@ -78,7 +78,7 @@ public class SubscriberEndpoint extends MediaEndpoint {
public JsonObject toJson() {
JsonObject json = super.toJson();
try {
json.addProperty("streamId", this.publisher.getEndpoint().getName());
json.addProperty("streamId", this.publisher.getStreamId());
} catch (NullPointerException ex) {
json.addProperty("streamId", "NOT_FOUND");
}

View File

@ -17,15 +17,28 @@
package io.openvidu.server.kurento.kms;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.kurento.client.KurentoClient;
import org.kurento.client.KurentoConnectionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession;
public class FixedOneKmsManager extends KmsManager {
private static final Logger log = LoggerFactory.getLogger(FixedOneKmsManager.class);
@Autowired
SessionManager sessionManager;
public static final AtomicBoolean CONNECTED_TO_KMS = new AtomicBoolean(false);
public static final AtomicLong TIME_OF_DISCONNECTION = new AtomicLong(0);
public FixedOneKmsManager(String kmsWsUri) {
this(kmsWsUri, 1);
}
@ -36,21 +49,37 @@ public class FixedOneKmsManager extends KmsManager {
@Override
public void reconnected(boolean isReconnected) {
log.warn("Kurento Client reconnected ({}) to KMS with uri {}", isReconnected, kmsWsUri);
CONNECTED_TO_KMS.compareAndSet(false, true);
if (!isReconnected) {
// Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints)
log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri);
log.warn("Updating all webrtc endpoints for active sessions");
sessionManager.getSessions().forEach(s -> {
((KurentoSession) s).restartStatusInKurento();
});
} else {
// Same KMS. We can infer that openvidu-server/KMS connection has been lost, but
// not the clients/KMS connections
log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri);
}
}
@Override
public void disconnected() {
CONNECTED_TO_KMS.compareAndSet(true, false);
TIME_OF_DISCONNECTION.set(System.currentTimeMillis());
log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri);
}
@Override
public void connectionFailed() {
CONNECTED_TO_KMS.set(false);
log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri);
}
@Override
public void connected() {
CONNECTED_TO_KMS.compareAndSet(false, true);
log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri);
}
}), kmsWsUri));

View File

@ -38,6 +38,7 @@ import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
public class CompositeWrapper {
@ -86,19 +87,27 @@ public class CompositeWrapper {
this.recorderEndpoint.record();
}
public synchronized void stopCompositeRecording(CountDownLatch stopLatch) {
this.recorderEndpoint.addStoppedListener(new EventListener<StoppedEvent>() {
@Override
public void onEvent(StoppedEvent event) {
endTime = System.currentTimeMillis();
log.info("Recording stopped event for audio-only RecorderEndpoint of Composite in session {}",
session.getSessionId());
recorderEndpoint.release();
compositeToRecorderHubPort.release();
stopLatch.countDown();
}
});
this.recorderEndpoint.stop();
public synchronized void stopCompositeRecording(CountDownLatch stopLatch, boolean forceAfterKmsRestart) {
if (!forceAfterKmsRestart) {
this.recorderEndpoint.addStoppedListener(new EventListener<StoppedEvent>() {
@Override
public void onEvent(StoppedEvent event) {
endTime = System.currentTimeMillis();
log.info("Recording stopped event for audio-only RecorderEndpoint of Composite in session {}",
session.getSessionId());
recorderEndpoint.release();
compositeToRecorderHubPort.release();
stopLatch.countDown();
}
});
this.recorderEndpoint.stop();
} else {
endTime = FixedOneKmsManager.TIME_OF_DISCONNECTION.get();
stopLatch.countDown();
log.warn("Forcing composed audio-only recording stop after KMS restart in session {}",
this.session.getSessionId());
}
}
public void connectPublisherEndpoint(PublisherEndpoint endpoint) throws OpenViduException {

View File

@ -100,10 +100,14 @@ public class ComposedRecordingService extends RecordingService {
@Override
public Recording stopRecording(Session session, Recording recording, String reason) {
return this.stopRecording(session, recording, reason, false);
}
public Recording stopRecording(Session session, Recording recording, String reason, boolean forceAfterKmsRestart) {
if (recording.hasVideo()) {
return this.stopRecordingWithVideo(session, recording, reason);
} else {
return this.stopRecordingAudioOnly(session, recording, reason);
return this.stopRecordingAudioOnly(session, recording, reason, forceAfterKmsRestart);
}
}
@ -330,7 +334,8 @@ public class ComposedRecordingService extends RecordingService {
return recording;
}
private Recording stopRecordingAudioOnly(Session session, Recording recording, String reason) {
private Recording stopRecordingAudioOnly(Session session, Recording recording, String reason,
boolean forceAfterKmsRestart) {
log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(),
recording.getSessionId(), reason);
@ -349,7 +354,8 @@ public class ComposedRecordingService extends RecordingService {
CompositeWrapper compositeWrapper = this.composites.remove(sessionId);
final CountDownLatch stoppedCountDown = new CountDownLatch(1);
compositeWrapper.stopCompositeRecording(stoppedCountDown);
compositeWrapper.stopCompositeRecording(stoppedCountDown, forceAfterKmsRestart);
try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);

View File

@ -114,6 +114,10 @@ public class RecordingManager {
return this.sessionHandler;
}
public SessionManager getSessionManager() {
return this.sessionManager;
}
public void initializeRecordingManager() throws OpenViduException {
RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion();
@ -207,6 +211,21 @@ public class RecordingManager {
return recording;
}
public Recording forceStopRecording(Session session, String reason) {
Recording recording;
recording = this.sessionsRecordings.get(session.getSessionId());
switch (recording.getOutputMode()) {
case COMPOSED:
recording = this.composedRecordingService.stopRecording(session, recording, reason, true);
break;
case INDIVIDUAL:
recording = this.singleStreamRecordingService.stopRecording(session, recording, reason, true);
break;
}
this.abortAutomaticRecordingStopThread(session);
return recording;
}
public void startOneIndividualStreamRecording(Session session, String recordingId, MediaProfileSpecType profile,
Participant participant) {
Recording recording = this.sessionsRecordings.get(session.getSessionId());
@ -230,7 +249,7 @@ public class RecordingManager {
}
}
public void stopOneIndividualStreamRecording(String sessionId, String streamId) {
public void stopOneIndividualStreamRecording(String sessionId, String streamId, boolean forceAfterKmsRestart) {
Recording recording = this.sessionsRecordings.get(sessionId);
if (recording == null) {
log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId,
@ -241,7 +260,7 @@ public class RecordingManager {
log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", sessionId, streamId);
final CountDownLatch stoppedCountDown = new CountDownLatch(1);
this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(sessionId, streamId,
stoppedCountDown);
stoppedCountDown, forceAfterKmsRestart);
try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
log.error("Error waiting for recorder endpoint of stream {} to stop in session {}", streamId,
@ -362,7 +381,7 @@ public class RecordingManager {
sessionManager.closeSessionAndEmptyCollections(session, "automaticStop");
sessionManager.showTokens();
} else {
this.stopRecording(null, recordingId, "automaticStop");
this.stopRecording(session, recordingId, "automaticStop");
}
} else {
// This code is reachable if there already was an automatic stop of a recording

View File

@ -56,6 +56,7 @@ import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
import io.openvidu.server.recording.RecorderEndpointWrapper;
import io.openvidu.server.recording.Recording;
@ -127,7 +128,10 @@ public class SingleStreamRecordingService extends RecordingService {
@Override
public Recording stopRecording(Session session, Recording recording, String reason) {
return this.stopRecording(session, recording, reason, false);
}
public Recording stopRecording(Session session, Recording recording, String reason, boolean forceAfterKmsRestart) {
log.info("Stopping individual ({}) recording {} of session {}. Reason: {}",
recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly",
recording.getId(), recording.getSessionId(), reason);
@ -137,7 +141,7 @@ public class SingleStreamRecordingService extends RecordingService {
for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) {
this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(),
stoppedCountDown);
stoppedCountDown, forceAfterKmsRestart);
}
try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
@ -219,10 +223,10 @@ public class SingleStreamRecordingService extends RecordingService {
}
public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId,
CountDownLatch globalStopLatch) {
CountDownLatch globalStopLatch, boolean forceAfterKmsRestart) {
log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId);
final RecorderEndpointWrapper finalWrapper = this.recorders.get(sessionId).remove(streamId);
if (finalWrapper != null) {
if (finalWrapper != null && !forceAfterKmsRestart) {
finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() {
@Override
public void onEvent(StoppedEvent event) {
@ -235,7 +239,14 @@ public class SingleStreamRecordingService extends RecordingService {
});
finalWrapper.getRecorder().stop();
} else {
log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId);
if (forceAfterKmsRestart) {
finalWrapper.setEndTime(FixedOneKmsManager.TIME_OF_DISCONNECTION.get());
generateIndividualMetadataFile(finalWrapper);
log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId,
sessionId);
} else {
log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId);
}
globalStopLatch.countDown();
}
}