From ba2abde8a8f9842e4e416c3caa56315ad283dd74 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Tue, 22 Oct 2019 15:31:13 +0200 Subject: [PATCH] openvidu-server: refactoring to allow Media Node status --- .../io/openvidu/server/OpenViduServer.java | 8 +++++ .../kurento/core/KurentoSessionManager.java | 2 +- .../kurento/kms/FixedOneKmsManager.java | 6 ++-- .../io/openvidu/server/kurento/kms/Kms.java | 16 ++-------- .../server/kurento/kms/KmsManager.java | 29 +++++++++++++------ .../server/kurento/kms/KmsProperties.java | 2 +- .../server/utils/MediaNodeStatusManager.java | 15 ++++++++++ .../utils/MediaNodeStatusManagerDummy.java | 29 +++++++++++++++++++ 8 files changed, 79 insertions(+), 28 deletions(-) create mode 100644 openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManager.java create mode 100644 openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManagerDummy.java diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index 142bbd8d..2b169291 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -61,6 +61,8 @@ import io.openvidu.server.rpc.RpcNotificationService; import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.GeoLocationByIpDummy; +import io.openvidu.server.utils.MediaNodeStatusManager; +import io.openvidu.server.utils.MediaNodeStatusManagerDummy; import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.QuarantineKillerDummy; import io.openvidu.server.webhook.CDRLoggerWebhook; @@ -186,6 +188,12 @@ public class OpenViduServer implements JsonRpcConfigurer { return new QuarantineKillerDummy(); } + @Bean + @ConditionalOnMissingBean + public MediaNodeStatusManager mediaNodeStatusManager() { + return new MediaNodeStatusManagerDummy(); + } + @Override public void registerJsonRpcHandlers(JsonRpcHandlerRegistry registry) { registry.addHandler(rpcHandler().withPingWatchdog(true).withInterceptors(new HttpHandshakeInterceptor()), diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 5ac12482..b2d7a3a3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -91,7 +91,7 @@ public class KurentoSessionManager extends SessionManager { Kms lessLoadedKms = null; try { - lessLoadedKms = this.kmsManager.getLessLoadedAndNoQuarantinedKms(); + lessLoadedKms = this.kmsManager.getLessLoadedAndRunningKms(); } catch (NoSuchElementException e) { // Restore session not active this.cleanCollections(sessionId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index fc30b3bb..9898ca75 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -31,9 +31,10 @@ public class FixedOneKmsManager extends KmsManager { KmsProperties firstProps = kmsProperties.get(0); KurentoClient kClient = null; Kms kms = new Kms(firstProps, loadManager); - this.addKms(kms); try { kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId())); + this.addKms(kms); + kms.setKurentoClient(kClient); } catch (KurentoException e) { log.error("KMS in {} is not reachable by OpenVidu Server", firstProps.getUri()); if (kClient != null) { @@ -41,9 +42,6 @@ public class FixedOneKmsManager extends KmsManager { } throw new Exception(); } - - kms.setKurentoClient(kClient); - return Arrays.asList(kms); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index e4bd65ae..fe2dab24 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -51,10 +51,9 @@ public class Kms { private static final Logger log = LoggerFactory.getLogger(Kms.class); - private String id; + private String id; // Dynamic ID private String uri; private String ip; - private boolean quarantined; private KurentoClient client; private LoadManager loadManager; @@ -67,7 +66,6 @@ public class Kms { public Kms(KmsProperties props, LoadManager loadManager) { this.id = props.getId(); this.uri = props.getUri(); - this.quarantined = false; String parsedUri = uri.replaceAll("^ws://", "http://").replaceAll("^wss://", "https://"); URL url = null; @@ -97,14 +95,6 @@ public class Kms { return ip; } - public synchronized boolean isQuarantined() { - return this.quarantined; - } - - public synchronized void setQuarantined(boolean quarantined) { - this.quarantined = quarantined; - } - public KurentoClient getKurentoClient() { return this.client; } @@ -156,9 +146,9 @@ public class Kms { public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("id", this.id); - json.addProperty("uri", this.uri); json.addProperty("ip", this.ip); - json.addProperty("quarantined", this.quarantined); + json.addProperty("uri", this.uri); + final boolean connected = this.isKurentoClientConnected(); json.addProperty("connected", connected); json.addProperty("connectionTime", this.getTimeOfKurentoClientConnection()); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 3409511c..549f7963 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -38,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired; import com.google.gson.JsonObject; import io.openvidu.server.config.OpenviduConfig; +import io.openvidu.server.utils.MediaNodeStatusManager; public abstract class KmsManager { @@ -75,7 +76,6 @@ public abstract class KmsManager { json.addProperty("load", this.load); return json; } - } protected static final Logger log = LoggerFactory.getLogger(KmsManager.class); @@ -86,8 +86,13 @@ public abstract class KmsManager { @Autowired protected LoadManager loadManager; + @Autowired + protected MediaNodeStatusManager mediaNodeStatusManager; + final protected Map kmss = new ConcurrentHashMap<>(); + protected Map forceKmsUrisToHaveKmsIds; + public synchronized void addKms(Kms kms) { this.kmss.put(kms.getId(), kms); } @@ -100,9 +105,9 @@ public abstract class KmsManager { return Collections.min(getKmsLoads()).kms; } - public synchronized Kms getLessLoadedAndNoQuarantinedKms() throws NoSuchElementException { - List kmsLoads = getKmsLoads().stream().filter(kmsLoad -> !kmsLoad.kms.isQuarantined()) - .collect(Collectors.toList()); + public synchronized Kms getLessLoadedAndRunningKms() throws NoSuchElementException { + List kmsLoads = getKmsLoads().stream() + .filter(kmsLoad -> mediaNodeStatusManager.isRunning(kmsLoad.kms.getId())).collect(Collectors.toList()); return Collections.min(kmsLoads).kms; } @@ -139,7 +144,7 @@ public abstract class KmsManager { return kmsLoads; } - protected KurentoConnectionListener generateKurentoConnectionListener(String kmsId) { + protected KurentoConnectionListener generateKurentoConnectionListener(final String kmsId) { return new KurentoConnectionListener() { @Override @@ -183,6 +188,7 @@ public abstract class KmsManager { final Kms kms = kmss.get(kmsId); kms.setKurentoClientConnected(true); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); + mediaNodeStatusManager.setStatus(kmsId, "running"); log.warn("Kurento Client is now connected to KMS {} with uri {}", kmsId, kms.getUri()); } }; @@ -191,21 +197,26 @@ public abstract class KmsManager { public abstract List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure, boolean sendMediaNodeAddedEvent) throws Exception; + public LoadManager getLoadManager() { + return this.loadManager; + } + @PostConstruct - protected void postConstruct() { + protected List postConstruct() { try { List kmsProps = new ArrayList<>(); - String kmsId; for (String kmsUri : this.openviduConfig.getKmsUris()) { - kmsId = "KMS-" + RandomStringUtils.randomAlphanumeric(6).toUpperCase(); + String kmsId = forceKmsUrisToHaveKmsIds != null ? forceKmsUrisToHaveKmsIds.get(kmsUri) + : "KMS-" + RandomStringUtils.randomAlphanumeric(6).toUpperCase(); kmsProps.add(new KmsProperties(kmsId, kmsUri)); } - this.initializeKurentoClients(kmsProps, true, false); + return this.initializeKurentoClients(kmsProps, true, false); } catch (Exception e) { // Some KMS wasn't reachable log.error("Shutting down OpenVidu Server"); System.exit(1); } + return null; } @PreDestroy diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsProperties.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsProperties.java index 7230de74..6bdfa20d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsProperties.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsProperties.java @@ -19,7 +19,7 @@ package io.openvidu.server.kurento.kms; public class KmsProperties { - private String id; + private String id; // Dynamic ID private String uri; public KmsProperties(String id, String uri) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManager.java b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManager.java new file mode 100644 index 00000000..298128a0 --- /dev/null +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManager.java @@ -0,0 +1,15 @@ +package io.openvidu.server.utils; + +public interface MediaNodeStatusManager { + + public boolean isPending(String mediaNodeId); + + public boolean isRunning(String mediaNodeId); + + public boolean isShuttingDown(String mediaNodeId); + + public boolean isWaitingIdleToShuttingDown(String mediaNodeId); + + public void setStatus(String mediaNodeId, String status); + +} diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManagerDummy.java b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManagerDummy.java new file mode 100644 index 00000000..9b298327 --- /dev/null +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManagerDummy.java @@ -0,0 +1,29 @@ +package io.openvidu.server.utils; + +public class MediaNodeStatusManagerDummy implements MediaNodeStatusManager { + + @Override + public boolean isPending(String mediaNodeId) { + return false; + } + + @Override + public boolean isRunning(String mediaNodeId) { + return true; + } + + @Override + public boolean isShuttingDown(String mediaNodeId) { + return false; + } + + @Override + public boolean isWaitingIdleToShuttingDown(String mediaNodeId) { + return false; + } + + @Override + public void setStatus(String mediaNodeId, String status) { + } + +}