openvidu-server: refactoring for multiple KMS support

pull/375/head
pabloFuente 2019-06-10 17:03:41 +02:00
parent 0b22c295ee
commit 883d686542
25 changed files with 308 additions and 530 deletions

View File

@ -56,12 +56,13 @@ import io.openvidu.server.core.TokenGenerator;
import io.openvidu.server.core.TokenGeneratorDefault;
import io.openvidu.server.coturn.CoturnCredentialsService;
import io.openvidu.server.coturn.CoturnCredentialsServiceFactory;
import io.openvidu.server.kurento.AutodiscoveryKurentoClientProvider;
import io.openvidu.server.kurento.KurentoClientProvider;
import io.openvidu.server.kurento.core.KurentoParticipantEndpointConfig;
import io.openvidu.server.kurento.core.KurentoSessionEventsHandler;
import io.openvidu.server.kurento.core.KurentoSessionManager;
import io.openvidu.server.kurento.kms.DummyLoadManager;
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.kurento.kms.LoadManager;
import io.openvidu.server.recording.service.RecordingManager;
import io.openvidu.server.rpc.RpcHandler;
import io.openvidu.server.rpc.RpcNotificationService;
@ -91,8 +92,7 @@ public class OpenViduServer implements JsonRpcConfigurer {
@Bean
@ConditionalOnMissingBean
public KurentoClientProvider kmsManager() {
public KmsManager kmsManager() {
JsonParser parser = new JsonParser();
String uris = env.getProperty(KMSS_URIS_PROPERTY);
JsonElement elem = parser.parse(uris);
@ -104,14 +104,8 @@ public class OpenViduServer implements JsonRpcConfigurer {
}
String firstKmsWsUri = kmsWsUris.get(0);
if (firstKmsWsUri.equals("autodiscovery")) {
log.info("Using autodiscovery rules to locate KMS on every pipeline");
return new AutodiscoveryKurentoClientProvider();
} else {
log.info("Configuring OpenVidu Server to use first of the following kmss: " + kmsWsUris);
return new FixedOneKmsManager(firstKmsWsUri);
}
log.info("OpenVidu Server using one KMS: {}", kmsWsUris);
return new FixedOneKmsManager(firstKmsWsUri);
}
@Bean
@ -156,6 +150,12 @@ public class OpenViduServer implements JsonRpcConfigurer {
return new TokenGeneratorDefault();
}
@Bean
@ConditionalOnMissingBean
public LoadManager loadManager() {
return new DummyLoadManager();
}
@Bean
@ConditionalOnMissingBean
public OpenviduConfig openviduConfig() {
@ -275,12 +275,8 @@ public class OpenViduServer implements JsonRpcConfigurer {
@EventListener(ApplicationReadyEvent.class)
public void whenReady() {
final String NEW_LINE = System.lineSeparator();
String str = NEW_LINE +
NEW_LINE + " ACCESS IP " +
NEW_LINE + "-------------------------" +
NEW_LINE + httpUrl +
NEW_LINE + "-------------------------" +
NEW_LINE;
String str = NEW_LINE + NEW_LINE + " ACCESS IP " + NEW_LINE + "-------------------------"
+ NEW_LINE + httpUrl + NEW_LINE + "-------------------------" + NEW_LINE;
log.info(str);
}

View File

@ -19,7 +19,7 @@ package io.openvidu.server.core;
import com.google.gson.JsonObject;
import io.openvidu.server.kurento.KurentoFilter;
import io.openvidu.server.kurento.endpoint.KurentoFilter;
public class MediaOptions {

View File

@ -40,8 +40,8 @@ import io.openvidu.java.client.OpenViduRole;
import io.openvidu.server.cdr.CallDetailRecord;
import io.openvidu.server.config.InfoHandler;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.kurento.KurentoFilter;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.endpoint.KurentoFilter;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.rpc.RpcNotificationService;

View File

@ -1,40 +0,0 @@
/*
* (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.openvidu.server.kurento;
import org.kurento.client.KurentoClient;
import org.kurento.client.Properties;
import io.openvidu.client.OpenViduException;
public class AutodiscoveryKurentoClientProvider implements KurentoClientProvider {
private static final int ROOM_PIPELINE_LOAD_POINTS = 50;
@Override
public KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException {
return KurentoClient.create(Properties.of("loadPoints", ROOM_PIPELINE_LOAD_POINTS));
}
@Override
public boolean destroyWhenUnused() {
return true;
}
}

View File

@ -1,46 +0,0 @@
/*
* (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.openvidu.server.kurento;
import org.kurento.client.KurentoClient;
import io.openvidu.client.OpenViduException;
/**
* This service interface was designed so that the room manager could obtain a {@link KurentoClient}
* instance at any time, without requiring knowledge about the placement of the media server
* instances. It is left for the developer to provide an implementation for this API.
*
* @author Pablo Fuente (pablofuenteperez@gmail.com)
*/
public interface KurentoClientProvider {
/**
* Obtains a {@link KurentoClient} instance given the custom session bean. Normally, it'd be
* called during a room's instantiation.
*
* @param sessionInfo
* custom information object required by the implementors of this interface
* @return the {@link KurentoClient} instance
* @throws OpenViduException
* in case there is an error obtaining a {@link KurentoClient} instance
*/
KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException;
boolean destroyWhenUnused();
}

View File

@ -1,54 +0,0 @@
/*
* (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.openvidu.server.kurento;
/**
* Implementation of the session info interface, contains a participant's
* private id and the session's id.
*
* @author Pablo Fuente (pablofuenteperez@gmail.com)
*
*/
public class OpenViduKurentoClientSessionInfo implements KurentoClientSessionInfo {
private String participantPrivateId;
private String sessionId;
public OpenViduKurentoClientSessionInfo(String participantPrivateId, String roomName) {
super();
this.participantPrivateId = participantPrivateId;
this.sessionId = roomName;
}
public String getParticipantPrivateId() {
return participantPrivateId;
}
public void setParticipantPrivateId(String participantPrivateId) {
this.participantPrivateId = participantPrivateId;
}
@Override
public String getRoomName() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
}

View File

@ -21,7 +21,7 @@ import org.kurento.client.MediaElement;
import org.kurento.client.MediaType;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.kurento.KurentoFilter;
import io.openvidu.server.kurento.endpoint.KurentoFilter;
public class KurentoMediaOptions extends MediaOptions {

View File

@ -372,8 +372,7 @@ public class KurentoParticipant extends Participant {
if (this.openviduConfig.isRecordingModuleEnabled()
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
this.recordingManager.stopOneIndividualStreamRecording(session.getSessionId(),
this.getPublisherStreamId(), false);
this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId());
}
publisher.unregisterErrorListeners();

View File

@ -25,7 +25,6 @@ import org.kurento.client.Continuation;
import org.kurento.client.ErrorEvent;
import org.kurento.client.EventListener;
import org.kurento.client.IceCandidate;
import org.kurento.client.KurentoClient;
import org.kurento.client.MediaPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,6 +37,7 @@ import io.openvidu.java.client.Recording.OutputMode;
import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.recording.Recording;
/**
@ -51,7 +51,7 @@ public class KurentoSession extends Session {
private MediaPipeline pipeline;
private CountDownLatch pipelineLatch = new CountDownLatch(1);
private KurentoClient kurentoClient;
private Kms kms;
private KurentoSessionEventsHandler kurentoSessionHandler;
private KurentoParticipantEndpointConfig kurentoEndpointConfig;
@ -63,11 +63,10 @@ public class KurentoSession extends Session {
public final ConcurrentHashMap<String, String> publishedStreamIds = new ConcurrentHashMap<>();
public KurentoSession(Session sessionNotActive, KurentoClient kurentoClient,
KurentoSessionEventsHandler kurentoSessionHandler, KurentoParticipantEndpointConfig kurentoEndpointConfig,
boolean destroyKurentoClient) {
public KurentoSession(Session sessionNotActive, Kms kms, KurentoSessionEventsHandler kurentoSessionHandler,
KurentoParticipantEndpointConfig kurentoEndpointConfig, boolean destroyKurentoClient) {
super(sessionNotActive);
this.kurentoClient = kurentoClient;
this.kms = kms;
this.destroyKurentoClient = destroyKurentoClient;
this.kurentoSessionHandler = kurentoSessionHandler;
this.kurentoEndpointConfig = kurentoEndpointConfig;
@ -158,7 +157,7 @@ public class KurentoSession extends Session {
log.debug("Session {} closed", this.sessionId);
if (destroyKurentoClient) {
kurentoClient.destroy();
kms.getKurentoClient().destroy();
}
this.closed = true;
@ -192,6 +191,10 @@ public class KurentoSession extends Session {
}
}
public Kms getKms() {
return this.kms;
}
public MediaPipeline getPipeline() {
try {
pipelineLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS);
@ -208,7 +211,7 @@ public class KurentoSession extends Session {
}
log.info("SESSION {}: Creating MediaPipeline", sessionId);
try {
kurentoClient.createMediaPipeline(new Continuation<MediaPipeline>() {
kms.getKurentoClient().createMediaPipeline(new Continuation<MediaPipeline>() {
@Override
public void onSuccess(MediaPipeline result) throws Exception {
pipeline = result;

View File

@ -24,7 +24,6 @@ import java.util.Set;
import org.kurento.client.GenericMediaElement;
import org.kurento.client.IceCandidate;
import org.kurento.client.KurentoClient;
import org.kurento.client.ListenerSubscription;
import org.kurento.jsonrpc.Props;
import org.kurento.jsonrpc.message.Request;
@ -50,12 +49,11 @@ import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.KurentoClientProvider;
import io.openvidu.server.kurento.KurentoClientSessionInfo;
import io.openvidu.server.kurento.KurentoFilter;
import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo;
import io.openvidu.server.kurento.endpoint.KurentoFilter;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.endpoint.SdpType;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.rpc.RpcHandler;
import io.openvidu.server.utils.JsonUtils;
@ -64,7 +62,7 @@ public class KurentoSessionManager extends SessionManager {
private static final Logger log = LoggerFactory.getLogger(KurentoSessionManager.class);
@Autowired
private KurentoClientProvider kcProvider;
private KmsManager kmsManager;
@Autowired
private KurentoSessionEventsHandler kurentoSessionEventsHandler;
@ -72,18 +70,14 @@ public class KurentoSessionManager extends SessionManager {
@Autowired
private KurentoParticipantEndpointConfig kurentoEndpointConfig;
private KurentoClient kurentoClient;
@Override
public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) {
Set<Participant> existingParticipants = null;
try {
KurentoClientSessionInfo kcSessionInfo = new OpenViduKurentoClientSessionInfo(
participant.getParticipantPrivateId(), sessionId);
KurentoSession kSession = (KurentoSession) sessions.get(sessionId);
if (kSession == null && kcSessionInfo != null) {
if (kSession == null) {
// First user connecting to the session
Session sessionNotActive = sessionsNotActive.remove(sessionId);
@ -96,20 +90,18 @@ public class KurentoSessionManager extends SessionManager {
openviduConfig, recordingManager);
}
createSession(sessionNotActive, kcSessionInfo);
}
kSession = (KurentoSession) sessions.get(sessionId);
if (kSession == null) {
log.warn("Session '{}' not found");
throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId
+ "' was not found, must be created before '" + sessionId + "' can join");
Kms lessLoadedKms = this.kmsManager.getLessLoadedKms();
log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), lessLoadedKms.getLoad());
kSession = createSession(sessionNotActive, lessLoadedKms);
}
if (kSession.isClosed()) {
log.warn("'{}' is trying to join session '{}' but it is closing", participant.getParticipantPublicId(),
sessionId);
throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "'" + participant.getParticipantPublicId()
+ "' is trying to join session '" + sessionId + "' but it is closing");
}
existingParticipants = getParticipants(sessionId);
kSession.join(participant);
} catch (OpenViduException e) {
@ -492,38 +484,29 @@ public class KurentoSessionManager extends SessionManager {
}
/**
* Creates a session if it doesn't already exist. The session's id will be
* indicated by the session info bean.
* Creates a session with the already existing not-active session in the
* indicated KMS, if it doesn't already exist
*
* @param kcSessionInfo bean that will be passed to the
* {@link KurentoClientProvider} in order to obtain the
* {@link KurentoClient} that will be used by the room
* @throws OpenViduException in case of error while creating the session
*/
public void createSession(Session sessionNotActive, KurentoClientSessionInfo kcSessionInfo)
throws OpenViduException {
String sessionId = kcSessionInfo.getRoomName();
KurentoSession session = (KurentoSession) sessions.get(sessionId);
public KurentoSession createSession(Session sessionNotActive, Kms kms) throws OpenViduException {
KurentoSession session = (KurentoSession) sessions.get(sessionNotActive.getSessionId());
if (session != null) {
throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE,
"Session '" + sessionId + "' already exists");
"Session '" + session.getSessionId() + "' already exists");
}
this.kurentoClient = kcProvider.getKurentoClient(kcSessionInfo);
session = new KurentoSession(sessionNotActive, kurentoClient, kurentoSessionEventsHandler,
kurentoEndpointConfig, kcProvider.destroyWhenUnused());
session = new KurentoSession(sessionNotActive, kms, kurentoSessionEventsHandler, kurentoEndpointConfig,
kmsManager.destroyWhenUnused());
KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(sessionId, session);
KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(session.getSessionId(), session);
if (oldSession != null) {
log.warn("Session '{}' has just been created by another thread", sessionId);
return;
log.warn("Session '{}' has just been created by another thread", session.getSessionId());
return oldSession;
}
String kcName = "[NAME NOT AVAILABLE]";
if (kurentoClient.getServerManager() != null) {
kcName = kurentoClient.getServerManager().getName();
}
log.warn("No session '{}' exists yet. Created one using KurentoClient '{}'.", sessionId, kcName);
log.warn("No session '{}' exists yet. Created one on KMS '{}'", session.getSessionId(), kms.getUri());
sessionEventsHandler.onSessionCreated(session);
return session;
}
@Override
@ -852,6 +835,10 @@ public class KurentoSessionManager extends SessionManager {
return ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId);
}
public KmsManager getKmsManager() {
return this.kmsManager;
}
private void applyFilterInPublisher(KurentoParticipant kParticipant, KurentoFilter filter)
throws OpenViduException {
GenericMediaElement.Builder builder = new GenericMediaElement.Builder(kParticipant.getPipeline(),

View File

@ -15,7 +15,7 @@
*
*/
package io.openvidu.server.kurento;
package io.openvidu.server.kurento.endpoint;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

View File

@ -38,7 +38,6 @@ import org.kurento.client.WebRtcEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

View File

@ -46,7 +46,6 @@ import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.MediaOptions;
import io.openvidu.server.kurento.TrackType;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.utils.JsonUtils;

View File

@ -15,8 +15,8 @@
*
*/
package io.openvidu.server.kurento;
package io.openvidu.server.kurento.endpoint;
public enum TrackType {
ALL, VIDEO, AUDIO;
ALL, VIDEO, AUDIO;
}

View File

@ -15,19 +15,18 @@
*
*/
package io.openvidu.server.kurento;
package io.openvidu.server.kurento.kms;
import org.kurento.client.KurentoClient;
public class DummyLoadManager implements LoadManager {
@Override
public double calculateLoad(Kms kms) {
return 1;
}
@Override
public boolean allowMoreElements(Kms kms) {
return true;
}
/**
* Interface for beans holding information required to obtain a {@link KurentoClient}.
*
* @author <a href="mailto:rvlad@naevatec.com">Radu Tom Vlad</a>
*
*/
public interface KurentoClientSessionInfo {
/**
* @return the room's name (or id) for whom a {@link KurentoClient} will be needed
*/
public String getRoomName();
}

View File

@ -17,72 +17,62 @@
package io.openvidu.server.kurento.kms;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.kurento.client.KurentoClient;
import org.kurento.client.KurentoConnectionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.core.KurentoSessionManager;
public class FixedOneKmsManager extends KmsManager {
private static final Logger log = LoggerFactory.getLogger(FixedOneKmsManager.class);
@Autowired
SessionManager sessionManager;
public static final AtomicBoolean CONNECTED_TO_KMS = new AtomicBoolean(false);
public static final AtomicLong TIME_OF_DISCONNECTION = new AtomicLong(0);
public FixedOneKmsManager(String kmsWsUri) {
this(kmsWsUri, 1);
KurentoClient kClient = KurentoClient.create(kmsWsUri, new KurentoConnectionListener() {
@Override
public void reconnected(boolean isReconnected) {
((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, true);
if (!isReconnected) {
// Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints)
log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri);
log.warn("Updating all webrtc endpoints for active sessions");
sessionManager.getSessions().forEach(s -> {
((KurentoSession) s).restartStatusInKurento();
});
} else {
// Same KMS. We may infer that openvidu-server/KMS connection has been lost, but
// not the clients/KMS connections
log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri);
}
}
@Override
public void disconnected() {
((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri,
false);
((KurentoSessionManager) sessionManager).getKmsManager().setTimeOfKurentoClientDisconnection(kmsWsUri,
System.currentTimeMillis());
log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri);
}
@Override
public void connectionFailed() {
((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri,
false);
log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri);
}
@Override
public void connected() {
((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, true);
log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri);
}
});
this.addKms(new Kms(kmsWsUri, kClient, loadManager));
}
public FixedOneKmsManager(String kmsWsUri, int numKmss) {
for (int i = 0; i < numKmss; i++) {
this.addKms(new Kms(KurentoClient.create(kmsWsUri, new KurentoConnectionListener() {
@Override
public void reconnected(boolean isReconnected) {
CONNECTED_TO_KMS.compareAndSet(false, true);
if (!isReconnected) {
// Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints)
log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri);
log.warn("Updating all webrtc endpoints for active sessions");
sessionManager.getSessions().forEach(s -> {
((KurentoSession) s).restartStatusInKurento();
});
} else {
// Same KMS. We can infer that openvidu-server/KMS connection has been lost, but
// not the clients/KMS connections
log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri);
}
}
@Override
public void disconnected() {
CONNECTED_TO_KMS.compareAndSet(true, false);
TIME_OF_DISCONNECTION.set(System.currentTimeMillis());
log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri);
}
@Override
public void connectionFailed() {
CONNECTED_TO_KMS.set(false);
log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri);
}
@Override
public void connected() {
CONNECTED_TO_KMS.compareAndSet(false, true);
log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri);
}
}), kmsWsUri));
}
}
}

View File

@ -17,36 +17,67 @@
package io.openvidu.server.kurento.kms;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.kurento.client.KurentoClient;
/**
* Abstraction of a KMS instance: an object of this class corresponds to a KMS
* process running somewhere.
*
* It is uniquely identified by the KMS ws URI endpoint. It encapsulates the
* WebSocket client to communicate openvidu-server Java process with it and has
* a specific LoadManager service to calculate the load of this KMS based on
* different measures
*
* @author Pablo Fuente (pablofuenteperez@gmail.com)
*/
public class Kms {
private LoadManager loadManager = new MaxWebRtcLoadManager(10000);
private KurentoClient client;
private String kmsUri;
private String kmsUri;
private KurentoClient client;
private LoadManager loadManager;
public Kms(KurentoClient client, String kmsUri) {
this.client = client;
this.kmsUri = kmsUri;
}
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
public void setLoadManager(LoadManager loadManager) {
this.loadManager = loadManager;
}
public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) {
this.kmsUri = kmsUri;
this.client = client;
this.loadManager = loadManager;
}
public double getLoad() {
return loadManager.calculateLoad(this);
}
public String getUri() {
return kmsUri;
}
public boolean allowMoreElements() {
return loadManager.allowMoreElements(this);
}
public KurentoClient getKurentoClient() {
return this.client;
}
public String getUri() {
return kmsUri;
}
public double getLoad() {
return loadManager.calculateLoad(this);
}
public boolean allowMoreElements() {
return loadManager.allowMoreElements(this);
}
public boolean isKurentoClientConnected() {
return this.isKurentoClientConnected.get();
}
public void setKurentoClientConnected(boolean isConnected) {
this.isKurentoClientConnected.set(isConnected);
}
public long getTimeOfKurentoClientDisconnection() {
return this.timeOfKurentoClientDisconnection.get();
}
public void setTimeOfKurentoClientDisconnection(long time) {
this.timeOfKurentoClientDisconnection.set(time);
}
public KurentoClient getKurentoClient() {
return this.client;
}
}

View File

@ -21,105 +21,129 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.kurento.client.KurentoClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.kurento.KurentoClientProvider;
import io.openvidu.server.kurento.KurentoClientSessionInfo;
import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo;
import io.openvidu.server.core.SessionManager;
public abstract class KmsManager implements KurentoClientProvider {
@Service
public abstract class KmsManager {
public static class KmsLoad implements Comparable<KmsLoad> {
public class KmsLoad implements Comparable<KmsLoad> {
private Kms kms;
private double load;
private Kms kms;
private double load;
public KmsLoad(Kms kms, double load) {
this.kms = kms;
this.load = load;
}
public KmsLoad(Kms kms, double load) {
this.kms = kms;
this.load = load;
}
public Kms getKms() {
return kms;
}
public Kms getKms() {
return kms;
}
public double getLoad() {
return load;
}
public double getLoad() {
return load;
}
@Override
public int compareTo(KmsLoad o) {
return Double.compare(this.load, o.load);
}
}
@Override
public int compareTo(KmsLoad o) {
return Double.compare(this.load, o.load);
}
}
private final Logger log = LoggerFactory.getLogger(KmsManager.class);
@Autowired
protected SessionManager sessionManager;
private List<Kms> kmss = new ArrayList<Kms>();
private Iterator<Kms> usageIterator = null;
@Autowired
protected LoadManager loadManager;
@Override
public KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException {
if (!(sessionInfo instanceof OpenViduKurentoClientSessionInfo)) {
throw new OpenViduException(Code.GENERIC_ERROR_CODE, "Unkown session info bean type (expected "
+ OpenViduKurentoClientSessionInfo.class.getName() + ")");
}
return getKms((OpenViduKurentoClientSessionInfo) sessionInfo).getKurentoClient();
}
private final Logger log = LoggerFactory.getLogger(KmsManager.class);
/**
* Returns a {@link Kms} using a round-robin strategy.
*
* @param sessionInfo
* session's id
*/
public synchronized Kms getKms(OpenViduKurentoClientSessionInfo sessionInfo) {
if (usageIterator == null || !usageIterator.hasNext()) {
usageIterator = kmss.iterator();
}
return usageIterator.next();
}
// Using KMS websocket uris as unique identifiers
private Map<String, Kms> kmss = new ConcurrentHashMap<>();
public synchronized void addKms(Kms kms) {
this.kmss.add(kms);
}
private Iterator<Kms> usageIterator = null;
public synchronized Kms getLessLoadedKms() {
return Collections.min(getKmsLoads()).kms;
}
public synchronized void addKms(Kms kms) {
this.kmss.put(kms.getUri(), kms);
}
public synchronized Kms getNextLessLoadedKms() {
List<KmsLoad> sortedLoads = getKmssSortedByLoad();
if (sortedLoads.size() > 1) {
return sortedLoads.get(1).kms;
} else {
return sortedLoads.get(0).kms;
}
}
public synchronized void removeKms(Kms kms) {
this.kmss.remove(kms.getUri());
}
public synchronized List<KmsLoad> getKmssSortedByLoad() {
List<KmsLoad> kmsLoads = getKmsLoads();
Collections.sort(kmsLoads);
return kmsLoads;
}
public synchronized Kms getKms(String sessionId) {
if (usageIterator == null || !usageIterator.hasNext()) {
usageIterator = kmss.values().iterator();
}
return usageIterator.next();
}
private List<KmsLoad> getKmsLoads() {
ArrayList<KmsLoad> kmsLoads = new ArrayList<>();
for (Kms kms : kmss) {
double load = kms.getLoad();
kmsLoads.add(new KmsLoad(kms, load));
log.trace("Calc load {} for kms: {}", load, kms.getUri());
}
return kmsLoads;
}
/**
* Returns a {@link Kms} using a round-robin strategy.
*
* @param sessionInfo session's id
*/
public synchronized Kms getKmsRoundRobin() {
if (usageIterator == null || !usageIterator.hasNext()) {
usageIterator = kmss.values().iterator();
}
return usageIterator.next();
}
@Override
public boolean destroyWhenUnused() {
return false;
}
public synchronized Kms getLessLoadedKms() {
return Collections.min(getKmsLoads()).kms;
}
public synchronized Kms getNextLessLoadedKms() {
List<KmsLoad> sortedLoads = getKmssSortedByLoad();
if (sortedLoads.size() > 1) {
return sortedLoads.get(1).kms;
} else {
return sortedLoads.get(0).kms;
}
}
public synchronized List<KmsLoad> getKmssSortedByLoad() {
List<KmsLoad> kmsLoads = getKmsLoads();
Collections.sort(kmsLoads);
return kmsLoads;
}
public boolean isKurentoClientConnectedToKms(Kms kms) {
return this.kmss.get(kms.getUri()).isKurentoClientConnected();
}
public long getTimeOfKurentoClientDisconnection(Kms kms) {
return this.kmss.get(kms.getUri()).getTimeOfKurentoClientDisconnection();
}
public void setKurentoClientConnectedToKms(String kmsUri, boolean isConnected) {
this.kmss.get(kmsUri).setKurentoClientConnected(isConnected);
}
public void setTimeOfKurentoClientDisconnection(String kmsUri, long time) {
this.kmss.get(kmsUri).setTimeOfKurentoClientDisconnection(time);
}
private List<KmsLoad> getKmsLoads() {
ArrayList<KmsLoad> kmsLoads = new ArrayList<>();
for (Kms kms : kmss.values()) {
double load = kms.getLoad();
kmsLoads.add(new KmsLoad(kms, load));
log.trace("Calc load {} for kms: {}", load, kms.getUri());
}
return kmsLoads;
}
public boolean destroyWhenUnused() {
return false;
}
}

View File

@ -17,6 +17,9 @@
package io.openvidu.server.kurento.kms;
import org.springframework.stereotype.Service;
@Service
public interface LoadManager {
double calculateLoad(Kms kms);

View File

@ -1,55 +0,0 @@
/*
* (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.openvidu.server.kurento.kms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MaxWebRtcLoadManager implements LoadManager {
private static final Logger log = LoggerFactory.getLogger(MaxWebRtcLoadManager.class);
private int maxWebRtcPerKms;
public MaxWebRtcLoadManager(int maxWebRtcPerKms) {
this.maxWebRtcPerKms = maxWebRtcPerKms;
}
@Override
public double calculateLoad(Kms kms) {
int numWebRtcs = countWebRtcEndpoints(kms);
if (numWebRtcs > maxWebRtcPerKms) {
return 1;
} else {
return numWebRtcs / (double) maxWebRtcPerKms;
}
}
@Override
public boolean allowMoreElements(Kms kms) {
return countWebRtcEndpoints(kms) < maxWebRtcPerKms;
}
private synchronized int countWebRtcEndpoints(Kms kms) {
try {
return kms.getKurentoClient().getServerManager().getPipelines().size();
} catch (Throwable e) {
log.warn("Error counting KurentoClient pipelines", e);
return 0;
}
}
}

View File

@ -38,7 +38,6 @@ import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
public class CompositeWrapper {
@ -87,8 +86,8 @@ public class CompositeWrapper {
this.recorderEndpoint.record();
}
public synchronized void stopCompositeRecording(CountDownLatch stopLatch, boolean forceAfterKmsRestart) {
if (!forceAfterKmsRestart) {
public synchronized void stopCompositeRecording(CountDownLatch stopLatch, Long timeOfKmsDisconnection) {
if (timeOfKmsDisconnection == 0) {
this.recorderEndpoint.addStoppedListener(new EventListener<StoppedEvent>() {
@Override
public void onEvent(StoppedEvent event) {
@ -102,7 +101,7 @@ public class CompositeWrapper {
});
this.recorderEndpoint.stop();
} else {
endTime = FixedOneKmsManager.TIME_OF_DISCONNECTION.get();
endTime = timeOfKmsDisconnection;
stopLatch.countDown();
log.warn("Forcing composed audio-only recording stop after KMS restart in session {}",
this.session.getSessionId());

View File

@ -93,15 +93,10 @@ public class ComposedRecordingService extends RecordingService {
@Override
public Recording stopRecording(Session session, Recording recording, EndReason reason) {
return this.stopRecording(session, recording, reason, false);
}
public Recording stopRecording(Session session, Recording recording, EndReason reason,
boolean forceAfterKmsRestart) {
if (recording.hasVideo()) {
return this.stopRecordingWithVideo(session, recording, reason);
} else {
return this.stopRecordingAudioOnly(session, recording, reason, forceAfterKmsRestart);
return this.stopRecordingAudioOnly(session, recording, reason);
}
}
@ -331,8 +326,7 @@ public class ComposedRecordingService extends RecordingService {
return recording;
}
private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason,
boolean forceAfterKmsRestart) {
private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason) {
log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(),
recording.getSessionId(), reason);
@ -349,10 +343,9 @@ public class ComposedRecordingService extends RecordingService {
}
CompositeWrapper compositeWrapper = this.composites.remove(sessionId);
final CountDownLatch stoppedCountDown = new CountDownLatch(1);
compositeWrapper.stopCompositeRecording(stoppedCountDown, ((KurentoSession)session).getKms().getTimeOfKurentoClientDisconnection());
compositeWrapper.stopCompositeRecording(stoppedCountDown, forceAfterKmsRestart);
try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
recording.setStatus(io.openvidu.java.client.Recording.Status.failed);

View File

@ -62,9 +62,8 @@ import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionEventsHandler;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.KurentoClientProvider;
import io.openvidu.server.kurento.KurentoClientSessionInfo;
import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.utils.CustomFileManager;
import io.openvidu.server.utils.DockerManager;
@ -89,7 +88,7 @@ public class RecordingManager {
protected OpenviduConfig openviduConfig;
@Autowired
private KurentoClientProvider kcProvider;
private KmsManager kmsManager;
protected Map<String, Recording> startingRecordings = new ConcurrentHashMap<>();
protected Map<String, Recording> startedRecordings = new ConcurrentHashMap<>();
@ -221,10 +220,10 @@ public class RecordingManager {
recording = this.sessionsRecordings.get(session.getSessionId());
switch (recording.getOutputMode()) {
case COMPOSED:
recording = this.composedRecordingService.stopRecording(session, recording, reason, true);
recording = this.composedRecordingService.stopRecording(session, recording, reason);
break;
case INDIVIDUAL:
recording = this.singleStreamRecordingService.stopRecording(session, recording, reason, true);
recording = this.singleStreamRecordingService.stopRecording(session, recording, reason);
break;
}
this.abortAutomaticRecordingStopThread(session);
@ -254,22 +253,23 @@ public class RecordingManager {
}
}
public void stopOneIndividualStreamRecording(String sessionId, String streamId, boolean forceAfterKmsRestart) {
Recording recording = this.sessionsRecordings.get(sessionId);
public void stopOneIndividualStreamRecording(KurentoSession session, String streamId) {
Recording recording = this.sessionsRecordings.get(session.getSessionId());
if (recording == null) {
log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId,
sessionId);
session.getSessionId());
}
if (io.openvidu.java.client.Recording.OutputMode.INDIVIDUAL.equals(recording.getOutputMode())) {
// Stop specific RecorderEndpoint for this stream
log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", sessionId, streamId);
log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", session.getSessionId(),
streamId);
final CountDownLatch stoppedCountDown = new CountDownLatch(1);
this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(sessionId, streamId,
stoppedCountDown, forceAfterKmsRestart);
this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(session.getSessionId(), streamId,
stoppedCountDown, session.getKms().getTimeOfKurentoClientDisconnection());
try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
log.error("Error waiting for recorder endpoint of stream {} to stop in session {}", streamId,
sessionId);
session.getSessionId());
}
} catch (InterruptedException e) {
log.error("Exception while waiting for state change", e);
@ -277,9 +277,9 @@ public class RecordingManager {
} else if (io.openvidu.java.client.Recording.OutputMode.COMPOSED.equals(recording.getOutputMode())
&& !recording.hasVideo()) {
// Disconnect this stream from existing Composite recorder
log.info("Removing PublisherEndpoint from Composite in session {} for stream of participant {}", sessionId,
streamId);
this.composedRecordingService.removePublisherEndpointFromComposite(sessionId, streamId);
log.info("Removing PublisherEndpoint from Composite in session {} for stream of participant {}",
session.getSessionId(), streamId);
this.composedRecordingService.removePublisherEndpointFromComposite(session.getSessionId(), streamId);
}
}
@ -536,9 +536,7 @@ public class RecordingManager {
final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH.webm";
// Check Kurento Media Server write permissions in recording path
KurentoClientSessionInfo kcSessionInfo = new OpenViduKurentoClientSessionInfo("TEST_RECORDING_PATH",
"TEST_RECORDING_PATH");
MediaPipeline pipeline = this.kcProvider.getKurentoClient(kcSessionInfo).createMediaPipeline();
MediaPipeline pipeline = this.kmsManager.getLessLoadedKms().getKurentoClient().createMediaPipeline();
RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, "file://" + testFilePath).build();
final AtomicBoolean kurentoRecorderError = new AtomicBoolean(false);

View File

@ -56,8 +56,8 @@ import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.endpoint.PublisherEndpoint;
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
import io.openvidu.server.recording.RecorderEndpointWrapper;
import io.openvidu.server.recording.Recording;
@ -129,11 +129,6 @@ public class SingleStreamRecordingService extends RecordingService {
@Override
public Recording stopRecording(Session session, Recording recording, EndReason reason) {
return this.stopRecording(session, recording, reason, false);
}
public Recording stopRecording(Session session, Recording recording, EndReason reason,
boolean forceAfterKmsRestart) {
log.info("Stopping individual ({}) recording {} of session {}. Reason: {}",
recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly",
recording.getId(), recording.getSessionId(), reason);
@ -141,9 +136,11 @@ public class SingleStreamRecordingService extends RecordingService {
final int numberOfActiveRecorders = recorders.get(recording.getSessionId()).size();
final CountDownLatch stoppedCountDown = new CountDownLatch(numberOfActiveRecorders);
final long timeOfKurentoClientDisconnection = ((KurentoSession) session).getKms()
.getTimeOfKurentoClientDisconnection();
for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) {
this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(),
stoppedCountDown, forceAfterKmsRestart);
stoppedCountDown, timeOfKurentoClientDisconnection);
}
try {
if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) {
@ -225,10 +222,10 @@ public class SingleStreamRecordingService extends RecordingService {
}
public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId,
CountDownLatch globalStopLatch, boolean forceAfterKmsRestart) {
CountDownLatch globalStopLatch, Long kmsDisconnectionTime) {
log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId);
final RecorderEndpointWrapper finalWrapper = this.recorders.get(sessionId).remove(streamId);
if (finalWrapper != null && !forceAfterKmsRestart) {
if (finalWrapper != null && kmsDisconnectionTime == 0) {
finalWrapper.getRecorder().addStoppedListener(new EventListener<StoppedEvent>() {
@Override
public void onEvent(StoppedEvent event) {
@ -241,8 +238,8 @@ public class SingleStreamRecordingService extends RecordingService {
});
finalWrapper.getRecorder().stop();
} else {
if (forceAfterKmsRestart) {
finalWrapper.setEndTime(FixedOneKmsManager.TIME_OF_DISCONNECTION.get());
if (kmsDisconnectionTime != 0) {
finalWrapper.setEndTime(kmsDisconnectionTime);
generateIndividualMetadataFile(finalWrapper);
log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId,
sessionId);

View File

@ -16,41 +16,9 @@
package io.openvidu.server.test.core;
import static org.junit.matchers.JUnitMatchers.containsString;
import static org.junit.matchers.JUnitMatchers.hasItem;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -62,9 +30,7 @@ import org.kurento.client.ErrorEvent;
import org.kurento.client.EventListener;
import org.kurento.client.FaceOverlayFilter;
import org.kurento.client.HubPort;
import org.kurento.client.IceCandidate;
import org.kurento.client.KurentoClient;
import org.kurento.client.MediaElement;
import org.kurento.client.MediaPipeline;
import org.kurento.client.MediaType;
import org.kurento.client.Mixer;
@ -75,25 +41,15 @@ import org.kurento.client.ServerManager;
import org.kurento.client.WebRtcEndpoint;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.context.ConfigurableApplicationContext;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.server.OpenViduServer;
import io.openvidu.server.core.Participant;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.core.Token;
import io.openvidu.server.kurento.KurentoClientProvider;
import io.openvidu.server.kurento.KurentoClientSessionInfo;
import io.openvidu.server.kurento.core.KurentoSessionEventsHandler;
import io.openvidu.server.kurento.core.KurentoSessionManager;
import io.openvidu.server.kurento.kms.KmsManager;
/**
* Tests for {@link RoomManager} when using mocked {@link KurentoClient} resources.
@ -124,7 +80,7 @@ public class RoomManagerTest {
private SessionManager manager;
@Mock
private KurentoClientProvider kcProvider;
private KmsManager kcProvider;
@Mock
private KurentoSessionEventsHandler roomHandler;