openvidu-server: KmsManager refactoring. Kms toJson extended

pull/375/head
pabloFuente 2019-06-18 15:37:39 +02:00
parent d07d2055ea
commit 4eae276d15
12 changed files with 179 additions and 154 deletions

View File

@ -20,11 +20,9 @@ package io.openvidu.server;
import java.io.IOException; import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.kurento.jsonrpc.JsonUtils;
import org.kurento.jsonrpc.internal.server.config.JsonRpcConfiguration; import org.kurento.jsonrpc.internal.server.config.JsonRpcConfiguration;
import org.kurento.jsonrpc.server.JsonRpcConfigurer; import org.kurento.jsonrpc.server.JsonRpcConfigurer;
import org.kurento.jsonrpc.server.JsonRpcHandlerRegistry; import org.kurento.jsonrpc.server.JsonRpcHandlerRegistry;
@ -38,11 +36,6 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.core.env.Environment;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.OpenViduException.Code;
@ -62,6 +55,7 @@ import io.openvidu.server.kurento.core.KurentoSessionManager;
import io.openvidu.server.kurento.kms.DummyLoadManager; import io.openvidu.server.kurento.kms.DummyLoadManager;
import io.openvidu.server.kurento.kms.FixedOneKmsManager; import io.openvidu.server.kurento.kms.FixedOneKmsManager;
import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.kurento.kms.LoadManager;
import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.recording.service.RecordingManager;
import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.rpc.RpcHandler;
import io.openvidu.server.rpc.RpcNotificationService; import io.openvidu.server.rpc.RpcNotificationService;
@ -81,7 +75,7 @@ public class OpenViduServer implements JsonRpcConfigurer {
private static final Logger log = LoggerFactory.getLogger(OpenViduServer.class); private static final Logger log = LoggerFactory.getLogger(OpenViduServer.class);
@Autowired @Autowired
private Environment env; OpenviduConfig openviduConfig;
public static final String KMSS_URIS_PROPERTY = "kms.uris"; public static final String KMSS_URIS_PROPERTY = "kms.uris";
@ -92,19 +86,18 @@ public class OpenViduServer implements JsonRpcConfigurer {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public KmsManager kmsManager() { public KmsManager kmsManager() {
JsonParser parser = new JsonParser(); if (openviduConfig.getKmsUris().isEmpty()) {
String uris = env.getProperty(KMSS_URIS_PROPERTY);
JsonElement elem = parser.parse(uris);
JsonArray kmsUris = elem.getAsJsonArray();
List<String> kmsWsUris = JsonUtils.toStringList(kmsUris);
if (kmsWsUris.isEmpty()) {
throw new IllegalArgumentException(KMSS_URIS_PROPERTY + " should contain at least one kms url"); throw new IllegalArgumentException(KMSS_URIS_PROPERTY + " should contain at least one kms url");
} }
String firstKmsWsUri = openviduConfig.getKmsUris().get(0);
String firstKmsWsUri = kmsWsUris.get(0);
log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri); log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri);
return new FixedOneKmsManager(firstKmsWsUri, new DummyLoadManager()); return new FixedOneKmsManager();
}
@Bean
@ConditionalOnMissingBean
public LoadManager loadManager() {
return new DummyLoadManager();
} }
@Bean @Bean
@ -149,12 +142,6 @@ public class OpenViduServer implements JsonRpcConfigurer {
return new TokenGeneratorDefault(); return new TokenGeneratorDefault();
} }
@Bean
@ConditionalOnMissingBean
public OpenviduConfig openviduConfig() {
return new OpenviduConfig();
}
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public RecordingManager recordingManager() { public RecordingManager recordingManager() {
@ -162,8 +149,9 @@ public class OpenViduServer implements JsonRpcConfigurer {
} }
@Bean @Bean
@ConditionalOnMissingBean
public CoturnCredentialsService coturnCredentialsService() { public CoturnCredentialsService coturnCredentialsService() {
return new CoturnCredentialsServiceFactory(openviduConfig()).getCoturnCredentialsService(); return new CoturnCredentialsServiceFactory().getCoturnCredentialsService(openviduConfig.getSpringProfile());
} }
@Bean @Bean
@ -190,16 +178,14 @@ public class OpenViduServer implements JsonRpcConfigurer {
@PostConstruct @PostConstruct
public void init() throws MalformedURLException, InterruptedException { public void init() throws MalformedURLException, InterruptedException {
OpenviduConfig openviduConf = openviduConfig(); String publicUrl = this.openviduConfig.getOpenViduPublicUrl();
String publicUrl = openviduConf.getOpenViduPublicUrl();
String type = publicUrl; String type = publicUrl;
switch (publicUrl) { switch (publicUrl) {
case "docker": case "docker":
try { try {
String containerIp = getContainerIp(); String containerIp = getContainerIp();
OpenViduServer.wsUrl = "wss://" + containerIp + ":" + openviduConf.getServerPort(); OpenViduServer.wsUrl = "wss://" + containerIp + ":" + openviduConfig.getServerPort();
} catch (Exception e) { } catch (Exception e) {
log.error("Docker container IP was configured, but there was an error obtaining IP: " log.error("Docker container IP was configured, but there was an error obtaining IP: "
+ e.getClass().getName() + " " + e.getMessage()); + e.getClass().getName() + " " + e.getMessage());
@ -231,14 +217,14 @@ public class OpenViduServer implements JsonRpcConfigurer {
if (OpenViduServer.wsUrl == null) { if (OpenViduServer.wsUrl == null) {
type = "local"; type = "local";
OpenViduServer.wsUrl = "wss://localhost:" + openviduConf.getServerPort(); OpenViduServer.wsUrl = "wss://localhost:" + openviduConfig.getServerPort();
} }
if (OpenViduServer.wsUrl.endsWith("/")) { if (OpenViduServer.wsUrl.endsWith("/")) {
OpenViduServer.wsUrl = OpenViduServer.wsUrl.substring(0, OpenViduServer.wsUrl.length() - 1); OpenViduServer.wsUrl = OpenViduServer.wsUrl.substring(0, OpenViduServer.wsUrl.length() - 1);
} }
if (this.openviduConfig().isRecordingModuleEnabled()) { if (this.openviduConfig.isRecordingModuleEnabled()) {
try { try {
this.recordingManager().initializeRecordingManager(); this.recordingManager().initializeRecordingManager();
} catch (OpenViduException e) { } catch (OpenViduException e) {
@ -247,11 +233,11 @@ public class OpenViduServer implements JsonRpcConfigurer {
finalErrorMessage = "Error connecting to Docker daemon. Enabling OpenVidu recording module requires Docker"; finalErrorMessage = "Error connecting to Docker daemon. Enabling OpenVidu recording module requires Docker";
} else if (e.getCodeValue() == Code.RECORDING_PATH_NOT_VALID.getValue()) { } else if (e.getCodeValue() == Code.RECORDING_PATH_NOT_VALID.getValue()) {
finalErrorMessage = "Error initializing recording path \"" finalErrorMessage = "Error initializing recording path \""
+ this.openviduConfig().getOpenViduRecordingPath() + this.openviduConfig.getOpenViduRecordingPath()
+ "\" set with system property \"openvidu.recording.path\""; + "\" set with system property \"openvidu.recording.path\"";
} else if (e.getCodeValue() == Code.RECORDING_FILE_EMPTY_ERROR.getValue()) { } else if (e.getCodeValue() == Code.RECORDING_FILE_EMPTY_ERROR.getValue()) {
finalErrorMessage = "Error initializing recording custom layouts path \"" finalErrorMessage = "Error initializing recording custom layouts path \""
+ this.openviduConfig().getOpenviduRecordingCustomLayout() + this.openviduConfig.getOpenviduRecordingCustomLayout()
+ "\" set with system property \"openvidu.recording.custom-layout\""; + "\" set with system property \"openvidu.recording.custom-layout\"";
} }
log.error(finalErrorMessage + ". Shutting down OpenVidu Server"); log.error(finalErrorMessage + ". Shutting down OpenVidu Server");
@ -260,8 +246,8 @@ public class OpenViduServer implements JsonRpcConfigurer {
} }
String finalUrl = OpenViduServer.wsUrl.replaceFirst("wss://", "https://").replaceFirst("ws://", "http://"); String finalUrl = OpenViduServer.wsUrl.replaceFirst("wss://", "https://").replaceFirst("ws://", "http://");
openviduConf.setFinalUrl(finalUrl); openviduConfig.setFinalUrl(finalUrl);
httpUrl = openviduConf.getFinalUrl(); httpUrl = openviduConfig.getFinalUrl();
log.info("OpenVidu Server using " + type + " URL: [" + OpenViduServer.wsUrl + "]"); log.info("OpenVidu Server using " + type + " URL: [" + OpenViduServer.wsUrl + "]");
} }

View File

@ -17,11 +17,18 @@
package io.openvidu.server.config; package io.openvidu.server.config;
import java.util.List;
import org.kurento.jsonrpc.JsonUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.info.BuildProperties; import org.springframework.boot.info.BuildProperties;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import io.openvidu.java.client.OpenViduRole; import io.openvidu.java.client.OpenViduRole;
@Component @Component
@ -30,6 +37,11 @@ public class OpenviduConfig {
@Autowired @Autowired
BuildProperties buildProperties; BuildProperties buildProperties;
@Value("${kms.uris}")
private String kmsUris;
private List<String> kmsUrisList;
@Value("${openvidu.publicurl}") @Value("${openvidu.publicurl}")
private String openviduPublicUrl; // local, docker, [FINAL_URL] private String openviduPublicUrl; // local, docker, [FINAL_URL]
@ -95,6 +107,19 @@ public class OpenviduConfig {
private String finalUrl; private String finalUrl;
public List<String> getKmsUris() {
if (kmsUrisList == null) {
this.kmsUris = this.kmsUris.replaceAll("\\s", "");
JsonParser parser = new JsonParser();
JsonElement elem = parser.parse(this.kmsUris);
JsonArray kmsUris = elem.getAsJsonArray();
this.kmsUrisList = JsonUtils.toStringList(kmsUris);
return this.kmsUrisList;
} else {
return this.kmsUrisList;
}
}
public String getOpenViduPublicUrl() { public String getOpenViduPublicUrl() {
return this.openviduPublicUrl; return this.openviduPublicUrl;
} }

View File

@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.CommandExecutor;
public class BashCoturnCredentialsService extends CoturnCredentialsService { public class BashCoturnCredentialsService extends CoturnCredentialsService {
@ -31,8 +30,7 @@ public class BashCoturnCredentialsService extends CoturnCredentialsService {
private AtomicLong logCounter = new AtomicLong(0); private AtomicLong logCounter = new AtomicLong(0);
private final long LOG_LIMIT = 30; private final long LOG_LIMIT = 30;
public BashCoturnCredentialsService(OpenviduConfig openviduConfig) { public BashCoturnCredentialsService() {
super(openviduConfig);
try { try {
String response = CommandExecutor.execCommand("/bin/sh", "-c", String response = CommandExecutor.execCommand("/bin/sh", "-c",
"turnadmin -l -N " + this.coturnDatabaseString); "turnadmin -l -N " + this.coturnDatabaseString);

View File

@ -19,8 +19,11 @@ package io.openvidu.server.coturn;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
@ -28,6 +31,7 @@ public abstract class CoturnCredentialsService {
protected static final Logger log = LoggerFactory.getLogger(CoturnCredentialsService.class); protected static final Logger log = LoggerFactory.getLogger(CoturnCredentialsService.class);
@Autowired
protected OpenviduConfig openviduConfig; protected OpenviduConfig openviduConfig;
protected String coturnDatabaseString; protected String coturnDatabaseString;
@ -35,12 +39,6 @@ public abstract class CoturnCredentialsService {
protected AtomicBoolean coturnAvailable = new AtomicBoolean(false); protected AtomicBoolean coturnAvailable = new AtomicBoolean(false);
public CoturnCredentialsService(OpenviduConfig openviduConfig) {
this.openviduConfig = openviduConfig;
this.coturnDatabaseString = this.openviduConfig.getCoturnDatabaseString();
this.trimmedCoturnDatabaseString = this.coturnDatabaseString.replaceAll("^\"|\"$", "");
}
public abstract TurnCredentials createUser(); public abstract TurnCredentials createUser();
public abstract boolean deleteUser(String user); public abstract boolean deleteUser(String user);
@ -49,4 +47,10 @@ public abstract class CoturnCredentialsService {
return this.coturnAvailable.get(); return this.coturnAvailable.get();
} }
@PostConstruct
protected void initDatabse() {
this.coturnDatabaseString = this.openviduConfig.getCoturnDatabaseString();
this.trimmedCoturnDatabaseString = this.coturnDatabaseString.replaceAll("^\"|\"$", "");
}
} }

View File

@ -17,22 +17,14 @@
package io.openvidu.server.coturn; package io.openvidu.server.coturn;
import io.openvidu.server.config.OpenviduConfig;
public class CoturnCredentialsServiceFactory { public class CoturnCredentialsServiceFactory {
OpenviduConfig openviduConfig; public CoturnCredentialsService getCoturnCredentialsService(String springProfile) {
if (!"docker".equals(springProfile)) {
public CoturnCredentialsServiceFactory(OpenviduConfig openviduConfig) { return new BashCoturnCredentialsService();
this.openviduConfig = openviduConfig;
}
public CoturnCredentialsService getCoturnCredentialsService() {
if (!"docker".equals(openviduConfig.getSpringProfile())) {
return new BashCoturnCredentialsService(this.openviduConfig);
} else { } else {
// TODO: return other options // TODO: return other options
return new BashCoturnCredentialsService(this.openviduConfig); return new BashCoturnCredentialsService();
} }
} }

View File

@ -1,14 +1,7 @@
package io.openvidu.server.coturn; package io.openvidu.server.coturn;
import io.openvidu.server.config.OpenviduConfig;
public class DockerCoturnCredentialsService extends CoturnCredentialsService { public class DockerCoturnCredentialsService extends CoturnCredentialsService {
public DockerCoturnCredentialsService(OpenviduConfig openviduConfig) {
super(openviduConfig);
// TODO Auto-generated constructor stub
}
@Override @Override
public TurnCredentials createUser() { public TurnCredentials createUser() {
// TODO Auto-generated method stub // TODO Auto-generated method stub

View File

@ -20,13 +20,8 @@ package io.openvidu.server.kurento.kms;
public class DummyLoadManager implements LoadManager { public class DummyLoadManager implements LoadManager {
@Override @Override
public double calculateLoad(Kms kms) { public int calculateLoad(Kms kms) {
return 1; return 1;
} }
@Override
public boolean allowMoreElements(Kms kms) {
return true;
}
} }

View File

@ -18,17 +18,25 @@
package io.openvidu.server.kurento.kms; package io.openvidu.server.kurento.kms;
import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClient;
import org.kurento.commons.exception.KurentoException;
public class FixedOneKmsManager extends KmsManager { public class FixedOneKmsManager extends KmsManager {
String kmsWsUri; @Override
protected void initializeKurentoClients() {
public FixedOneKmsManager(String kmsWsUri, LoadManager loadManager) { final String kmsUri = this.openviduConfig.getKmsUris().get(0);
super(loadManager); KurentoClient kClient = null;
this.kmsWsUri = kmsWsUri; try {
kClient = KurentoClient.create(kmsUri, this.generateKurentoConnectionListener(kmsUri));
KurentoClient kClient = KurentoClient.create(kmsWsUri, this.generateKurentoConnectionListener(kmsWsUri)); } catch (KurentoException e) {
this.addKms(new Kms(kmsWsUri, kClient, loadManager)); log.error("KMS in {} is not reachable by OpenVidu Server", kmsUri);
log.error("Shutting down OpenVidu Server");
System.exit(1);
}
Kms kms = new Kms(kmsUri, kClient, loadManager);
kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
this.addKms(kms);
} }
} }

View File

@ -17,6 +17,8 @@
package io.openvidu.server.kurento.kms; package io.openvidu.server.kurento.kms;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -26,6 +28,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClient;
import org.kurento.client.ModuleInfo; import org.kurento.client.ModuleInfo;
import org.kurento.client.ServerInfo; import org.kurento.client.ServerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
@ -45,17 +49,30 @@ import io.openvidu.server.kurento.core.KurentoSession;
*/ */
public class Kms { public class Kms {
private static final Logger log = LoggerFactory.getLogger(Kms.class);
private String kmsUri; private String kmsUri;
private String ip;
private KurentoClient client; private KurentoClient client;
private LoadManager loadManager; private LoadManager loadManager;
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0);
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>(); private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>();
public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) { public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) {
this.kmsUri = kmsUri; this.kmsUri = kmsUri;
try {
String parsedUri = "http://" + kmsUri.replaceAll("^ws://", "").replaceAll("^wss://", "");
URL url = new URL(parsedUri);
this.ip = url.getHost();
} catch (MalformedURLException e) {
log.error("KMS uri {} is not a valid WebSocket endpoint", kmsUri);
}
this.client = client; this.client = client;
this.loadManager = loadManager; this.loadManager = loadManager;
} }
@ -64,6 +81,10 @@ public class Kms {
return kmsUri; return kmsUri;
} }
public String getIp() {
return ip;
}
public KurentoClient getKurentoClient() { public KurentoClient getKurentoClient() {
return this.client; return this.client;
} }
@ -73,7 +94,7 @@ public class Kms {
} }
public boolean allowMoreElements() { public boolean allowMoreElements() {
return loadManager.allowMoreElements(this); return true; // loadManager.allowMoreElements(this);
} }
public boolean isKurentoClientConnected() { public boolean isKurentoClientConnected() {
@ -84,6 +105,14 @@ public class Kms {
this.isKurentoClientConnected.set(isConnected); this.isKurentoClientConnected.set(isConnected);
} }
public long getTimeOfKurentoClientConnection() {
return this.timeOfKurentoClientConnection.get();
}
public void setTimeOfKurentoClientConnection(long time) {
this.timeOfKurentoClientConnection.set(time);
}
public long getTimeOfKurentoClientDisconnection() { public long getTimeOfKurentoClientDisconnection() {
return this.timeOfKurentoClientDisconnection.get(); return this.timeOfKurentoClientDisconnection.get();
} }
@ -108,6 +137,14 @@ public class Kms {
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
json.addProperty("id", this.kmsUri); json.addProperty("id", this.kmsUri);
json.addProperty("uri", this.kmsUri); json.addProperty("uri", this.kmsUri);
json.addProperty("ip", this.ip);
final boolean connected = this.isKurentoClientConnected();
json.addProperty("connected", connected);
json.addProperty("connectionTime", this.getTimeOfKurentoClientConnection());
if (!connected) {
json.addProperty("disconnectionTime", this.getTimeOfKurentoClientDisconnection());
}
return json; return json;
} }
@ -124,11 +161,16 @@ public class Kms {
} }
if (withExtraInfo) { if (withExtraInfo) {
json.addProperty("memory", this.client.getServerManager().getUsedMemory() / 1024);
if (json.get("connected").getAsBoolean()) {
JsonObject kurentoExtraInfo = new JsonObject();
kurentoExtraInfo.addProperty("memory", this.client.getServerManager().getUsedMemory() / 1024);
ServerInfo info = this.client.getServerManager().getInfo(); ServerInfo info = this.client.getServerManager().getInfo();
json.addProperty("version", info.getVersion()); kurentoExtraInfo.addProperty("version", info.getVersion());
json.addProperty("capabilities", info.getCapabilities().toString()); kurentoExtraInfo.addProperty("capabilities", info.getCapabilities().toString());
JsonArray modules = new JsonArray(); JsonArray modules = new JsonArray();
for (ModuleInfo moduleInfo : info.getModules()) { for (ModuleInfo moduleInfo : info.getModules()) {
@ -141,7 +183,10 @@ public class Kms {
moduleJson.add("factories", factories); moduleJson.add("factories", factories);
modules.add(moduleJson); modules.add(moduleJson);
} }
json.add("modules", modules); kurentoExtraInfo.add("modules", modules);
json.add("kurentoInfo", kurentoExtraInfo);
}
} }
return json; return json;

View File

@ -20,11 +20,12 @@ package io.openvidu.server.kurento.kms;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.kurento.client.KurentoConnectionListener; import org.kurento.client.KurentoConnectionListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -32,9 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.server.core.SessionManager; import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.core.KurentoSessionManager;
public abstract class KmsManager { public abstract class KmsManager {
@ -75,21 +74,16 @@ public abstract class KmsManager {
} }
@Autowired protected static final Logger log = LoggerFactory.getLogger(KmsManager.class);
protected SessionManager sessionManager;
@Autowired
protected OpenviduConfig openviduConfig;
@Autowired
protected LoadManager loadManager; protected LoadManager loadManager;
private static final Logger log = LoggerFactory.getLogger(KmsManager.class);
// Using KMS websocket uris as unique identifiers // Using KMS websocket uris as unique identifiers
protected Map<String, Kms> kmss = new ConcurrentHashMap<>(); final protected Map<String, Kms> kmss = new ConcurrentHashMap<>();
private Iterator<Kms> usageIterator = null;
public KmsManager(LoadManager loadManager) {
this.loadManager = loadManager;
}
public synchronized void addKms(Kms kms) { public synchronized void addKms(Kms kms) {
this.kmss.put(kms.getUri(), kms); this.kmss.put(kms.getUri(), kms);
@ -99,18 +93,6 @@ public abstract class KmsManager {
this.kmss.remove(kms.getUri()); this.kmss.remove(kms.getUri());
} }
/**
* Returns a {@link Kms} using a round-robin strategy.
*
* @param sessionInfo session's id
*/
public synchronized Kms getKmsRoundRobin() {
if (usageIterator == null || !usageIterator.hasNext()) {
usageIterator = kmss.values().iterator();
}
return usageIterator.next();
}
public synchronized Kms getLessLoadedKms() { public synchronized Kms getLessLoadedKms() {
return Collections.min(getKmsLoads()).kms; return Collections.min(getKmsLoads()).kms;
} }
@ -134,18 +116,6 @@ public abstract class KmsManager {
return kmsLoads; return kmsLoads;
} }
public boolean isKurentoClientConnectedToKms(Kms kms) {
return this.kmss.get(kms.getUri()).isKurentoClientConnected();
}
public void setKurentoClientConnectedToKms(String kmsUri, boolean isConnected) {
this.kmss.get(kmsUri).setKurentoClientConnected(isConnected);
}
public void setTimeOfKurentoClientDisconnection(String kmsUri, long time) {
this.kmss.get(kmsUri).setTimeOfKurentoClientDisconnection(time);
}
private List<KmsLoad> getKmsLoads() { private List<KmsLoad> getKmsLoads() {
ArrayList<KmsLoad> kmsLoads = new ArrayList<>(); ArrayList<KmsLoad> kmsLoads = new ArrayList<>();
for (Kms kms : kmss.values()) { for (Kms kms : kmss.values()) {
@ -160,51 +130,60 @@ public abstract class KmsManager {
return false; return false;
} }
protected KurentoConnectionListener generateKurentoConnectionListener(String kmsWsUri) { protected KurentoConnectionListener generateKurentoConnectionListener(String kmsUri) {
return new KurentoConnectionListener() { return new KurentoConnectionListener() {
@Override @Override
public void reconnected(boolean isReconnected) { public void reconnected(boolean isReconnected) {
((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, true); final Kms kms = kmss.get(kmsUri);
kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
if (!isReconnected) { if (!isReconnected) {
// Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints)
log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri); log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsUri);
log.warn("Updating all webrtc endpoints for active sessions"); log.warn("Updating all webrtc endpoints for active sessions");
final Kms kms = ((KurentoSessionManager) sessionManager).getKmsManager().kmss.get(kmsWsUri);
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
sessionManager.getSessions().forEach(s -> { kms.getKurentoSessions().forEach(kSession -> {
((KurentoSession) s).restartStatusInKurento(timeOfKurentoDisconnection); kSession.restartStatusInKurento(timeOfKurentoDisconnection);
}); });
kms.setTimeOfKurentoClientDisconnection(0); kms.setTimeOfKurentoClientDisconnection(0);
} else { } else {
// Same KMS. We may infer that openvidu-server/KMS connection has been lost, but // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but
// not the clients/KMS connections // not the clients/KMS connections
log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri); log.warn("Kurento Client reconnected to same KMS with uri {}", kmsUri);
} }
} }
@Override @Override
public void disconnected() { public void disconnected() {
((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, final Kms kms = kmss.get(kmsUri);
false); kms.setKurentoClientConnected(false);
((KurentoSessionManager) sessionManager).getKmsManager().setTimeOfKurentoClientDisconnection(kmsWsUri, kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis());
System.currentTimeMillis()); log.warn("Kurento Client disconnected from KMS with uri {}", kmsUri);
log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri);
} }
@Override @Override
public void connectionFailed() { public void connectionFailed() {
((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, final Kms kms = kmss.get(kmsUri);
false); kms.setKurentoClientConnected(false);
log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri); log.warn("Kurento Client failed connecting to KMS with uri {}", kmsUri);
} }
@Override @Override
public void connected() { public void connected() {
((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, true); final Kms kms = kmss.get(kmsUri);
log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
log.warn("Kurento Client is now connected to KMS with uri {}", kmsUri);
} }
}; };
} }
protected abstract void initializeKurentoClients();
@PostConstruct
protected void postConstruct() {
this.initializeKurentoClients();
}
} }

View File

@ -19,8 +19,8 @@ package io.openvidu.server.kurento.kms;
public interface LoadManager { public interface LoadManager {
public double calculateLoad(Kms kms); public int calculateLoad(Kms kms);
public boolean allowMoreElements(Kms kms); // public boolean allowMoreElements(Kms kms);
} }

View File

@ -29,7 +29,7 @@ openvidu.streams.video.min-recv-bandwidth: 300
openvidu.streams.video.max-send-bandwidth: 1000 openvidu.streams.video.max-send-bandwidth: 1000
openvidu.streams.video.min-send-bandwidth: 300 openvidu.streams.video.min-send-bandwidth: 300
kms.uris: ["ws://localhost:8888/kurento"] kms.uris: [\"ws://localhost:8888/kurento\"]
coturn.redis.ip: 127.0.0.1 coturn.redis.ip: 127.0.0.1
coturn.redis.dbname: 0 coturn.redis.dbname: 0