From 89432c52a9572281250aef21d5789b8a40bfc806 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Fri, 5 Jul 2019 12:56:18 +0200 Subject: [PATCH] openvidu-server: random id in Kms --- .../kurento/kms/FixedOneKmsManager.java | 5 ++- .../io/openvidu/server/kurento/kms/Kms.java | 23 ++++++---- .../server/kurento/kms/KmsManager.java | 44 +++++++++++-------- 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index f2b58843..0ffb8d39 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -17,6 +17,7 @@ package io.openvidu.server.kurento.kms; +import java.util.Arrays; import java.util.List; import org.kurento.client.KurentoClient; @@ -25,7 +26,7 @@ import org.kurento.commons.exception.KurentoException; public class FixedOneKmsManager extends KmsManager { @Override - public void initializeKurentoClients(List kmsUris) throws Exception { + public List initializeKurentoClients(List kmsUris) throws Exception { final String kmsUri = kmsUris.get(0); KurentoClient kClient = null; try { @@ -38,6 +39,8 @@ public class FixedOneKmsManager extends KmsManager { kms.setKurentoClientConnected(true); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); this.addKms(kms); + + return Arrays.asList(kms); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index 25834f7e..ef192869 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.KurentoClient; import org.kurento.client.ModuleInfo; import org.kurento.client.ServerInfo; @@ -51,7 +52,8 @@ public class Kms { private static final Logger log = LoggerFactory.getLogger(Kms.class); - private String kmsUri; + private String id; + private String uri; private String ip; private KurentoClient client; private LoadManager loadManager; @@ -62,23 +64,28 @@ public class Kms { private Map kurentoSessions = new ConcurrentHashMap<>(); - public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) { - this.kmsUri = kmsUri; + public Kms(String uri, KurentoClient client, LoadManager loadManager) { + this.uri = uri; + this.id = "KMS-" + RandomStringUtils.randomAlphanumeric(6).toUpperCase(); try { - String parsedUri = "http://" + kmsUri.replaceAll("^ws://", "").replaceAll("^wss://", ""); + String parsedUri = "http://" + uri.replaceAll("^ws://", "").replaceAll("^wss://", ""); URL url = new URL(parsedUri); this.ip = url.getHost(); } catch (MalformedURLException e) { - log.error("KMS uri {} is not a valid WebSocket endpoint", kmsUri); + log.error("KMS uri {} is not a valid WebSocket endpoint", uri); } this.client = client; this.loadManager = loadManager; } + public String getId() { + return id; + } + public String getUri() { - return kmsUri; + return uri; } public String getIp() { @@ -135,8 +142,8 @@ public class Kms { public JsonObject toJson() { JsonObject json = new JsonObject(); - json.addProperty("id", this.kmsUri); - json.addProperty("uri", this.kmsUri); + json.addProperty("id", this.id); + json.addProperty("uri", this.uri); json.addProperty("ip", this.ip); final boolean connected = this.isKurentoClientConnected(); json.addProperty("connected", connected); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index 2196bf4d..ea521fed 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -83,23 +83,31 @@ public abstract class KmsManager { @Autowired protected LoadManager loadManager; - // Using KMS websocket uris as unique identifiers final protected Map kmss = new ConcurrentHashMap<>(); public synchronized void addKms(Kms kms) { - this.kmss.put(kms.getUri(), kms); + this.kmss.put(kms.getId(), kms); } - public synchronized Kms removeKms(String kmsUri) { - return this.kmss.remove(kmsUri); + public synchronized Kms removeKms(String kmsId) { + return this.kmss.remove(kmsId); } public synchronized Kms getLessLoadedKms() throws NoSuchElementException { return Collections.min(getKmsLoads()).kms; } - public Kms getKms(String kmsUri) { - return this.kmss.get(kmsUri); + public Kms getKms(String kmsId) { + return this.kmss.get(kmsId); + } + + public boolean kmsWithUriExists(String kmsUri) { + return this.kmss.values().stream().anyMatch(kms -> kms.getUri().equals(kmsUri)); + } + + public KmsLoad getKmsLoad(String kmsId) { + Kms kms = this.kmss.get(kmsId); + return new KmsLoad(kms, kms.getLoad()); } public Collection getKmss() { @@ -117,7 +125,7 @@ public abstract class KmsManager { for (Kms kms : kmss.values()) { double load = kms.getLoad(); kmsLoads.add(new KmsLoad(kms, load)); - log.trace("Calc load {} for kms: {}", load, kms.getUri()); + log.trace("Calc load {} for kms {}", load, kms.getUri()); } return kmsLoads; } @@ -126,17 +134,17 @@ public abstract class KmsManager { return false; } - protected KurentoConnectionListener generateKurentoConnectionListener(String kmsUri) { + protected KurentoConnectionListener generateKurentoConnectionListener(String kmsId) { return new KurentoConnectionListener() { @Override public void reconnected(boolean isReconnected) { - final Kms kms = kmss.get(kmsUri); + final Kms kms = kmss.get(kmsId); kms.setKurentoClientConnected(true); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); if (!isReconnected) { // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) - log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsUri); + log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kms.getUri()); log.warn("Updating all webrtc endpoints for active sessions"); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); kms.getKurentoSessions().forEach(kSession -> { @@ -146,36 +154,36 @@ public abstract class KmsManager { } else { // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but // not the clients/KMS connections - log.warn("Kurento Client reconnected to same KMS with uri {}", kmsUri); + log.warn("Kurento Client reconnected to same KMS {} with uri {}", kmsId, kms.getUri()); } } @Override public void disconnected() { - final Kms kms = kmss.get(kmsUri); + final Kms kms = kmss.get(kmsId); kms.setKurentoClientConnected(false); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); - log.warn("Kurento Client disconnected from KMS with uri {}", kmsUri); + log.warn("Kurento Client disconnected from KMS {} with uri {}", kmsId, kms.getUri()); } @Override public void connectionFailed() { - final Kms kms = kmss.get(kmsUri); + final Kms kms = kmss.get(kmsId); kms.setKurentoClientConnected(false); - log.warn("Kurento Client failed connecting to KMS with uri {}", kmsUri); + log.warn("Kurento Client failed connecting to KMS {} with uri {}", kmsId, kms.getUri()); } @Override public void connected() { - final Kms kms = kmss.get(kmsUri); + final Kms kms = kmss.get(kmsId); kms.setKurentoClientConnected(true); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); - log.warn("Kurento Client is now connected to KMS with uri {}", kmsUri); + log.warn("Kurento Client is now connected to KMS {} with uri {}", kmsId, kms.getUri()); } }; } - public abstract void initializeKurentoClients(List kmsUris) throws Exception; + public abstract List initializeKurentoClients(List kmsUris) throws Exception; @PostConstruct private void postConstruct() {