openvidu-server: random id in Kms

pull/375/head
pabloFuente 2019-07-05 12:56:18 +02:00
parent c8576b4358
commit 89432c52a9
3 changed files with 45 additions and 27 deletions

View File

@ -17,6 +17,7 @@
package io.openvidu.server.kurento.kms; package io.openvidu.server.kurento.kms;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClient;
@ -25,7 +26,7 @@ import org.kurento.commons.exception.KurentoException;
public class FixedOneKmsManager extends KmsManager { public class FixedOneKmsManager extends KmsManager {
@Override @Override
public void initializeKurentoClients(List<String> kmsUris) throws Exception { public List<Kms> initializeKurentoClients(List<String> kmsUris) throws Exception {
final String kmsUri = kmsUris.get(0); final String kmsUri = kmsUris.get(0);
KurentoClient kClient = null; KurentoClient kClient = null;
try { try {
@ -38,6 +39,8 @@ public class FixedOneKmsManager extends KmsManager {
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
this.addKms(kms); this.addKms(kms);
return Arrays.asList(kms);
} }
} }

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClient;
import org.kurento.client.ModuleInfo; import org.kurento.client.ModuleInfo;
import org.kurento.client.ServerInfo; import org.kurento.client.ServerInfo;
@ -51,7 +52,8 @@ public class Kms {
private static final Logger log = LoggerFactory.getLogger(Kms.class); private static final Logger log = LoggerFactory.getLogger(Kms.class);
private String kmsUri; private String id;
private String uri;
private String ip; private String ip;
private KurentoClient client; private KurentoClient client;
private LoadManager loadManager; private LoadManager loadManager;
@ -62,23 +64,28 @@ public class Kms {
private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>(); private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>();
public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) { public Kms(String uri, KurentoClient client, LoadManager loadManager) {
this.kmsUri = kmsUri; this.uri = uri;
this.id = "KMS-" + RandomStringUtils.randomAlphanumeric(6).toUpperCase();
try { try {
String parsedUri = "http://" + kmsUri.replaceAll("^ws://", "").replaceAll("^wss://", ""); String parsedUri = "http://" + uri.replaceAll("^ws://", "").replaceAll("^wss://", "");
URL url = new URL(parsedUri); URL url = new URL(parsedUri);
this.ip = url.getHost(); this.ip = url.getHost();
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
log.error("KMS uri {} is not a valid WebSocket endpoint", kmsUri); log.error("KMS uri {} is not a valid WebSocket endpoint", uri);
} }
this.client = client; this.client = client;
this.loadManager = loadManager; this.loadManager = loadManager;
} }
public String getId() {
return id;
}
public String getUri() { public String getUri() {
return kmsUri; return uri;
} }
public String getIp() { public String getIp() {
@ -135,8 +142,8 @@ public class Kms {
public JsonObject toJson() { public JsonObject toJson() {
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
json.addProperty("id", this.kmsUri); json.addProperty("id", this.id);
json.addProperty("uri", this.kmsUri); json.addProperty("uri", this.uri);
json.addProperty("ip", this.ip); json.addProperty("ip", this.ip);
final boolean connected = this.isKurentoClientConnected(); final boolean connected = this.isKurentoClientConnected();
json.addProperty("connected", connected); json.addProperty("connected", connected);

View File

@ -83,23 +83,31 @@ public abstract class KmsManager {
@Autowired @Autowired
protected LoadManager loadManager; protected LoadManager loadManager;
// Using KMS websocket uris as unique identifiers
final protected Map<String, Kms> kmss = new ConcurrentHashMap<>(); final protected Map<String, Kms> kmss = new ConcurrentHashMap<>();
public synchronized void addKms(Kms kms) { public synchronized void addKms(Kms kms) {
this.kmss.put(kms.getUri(), kms); this.kmss.put(kms.getId(), kms);
} }
public synchronized Kms removeKms(String kmsUri) { public synchronized Kms removeKms(String kmsId) {
return this.kmss.remove(kmsUri); return this.kmss.remove(kmsId);
} }
public synchronized Kms getLessLoadedKms() throws NoSuchElementException { public synchronized Kms getLessLoadedKms() throws NoSuchElementException {
return Collections.min(getKmsLoads()).kms; return Collections.min(getKmsLoads()).kms;
} }
public Kms getKms(String kmsUri) { public Kms getKms(String kmsId) {
return this.kmss.get(kmsUri); return this.kmss.get(kmsId);
}
public boolean kmsWithUriExists(String kmsUri) {
return this.kmss.values().stream().anyMatch(kms -> kms.getUri().equals(kmsUri));
}
public KmsLoad getKmsLoad(String kmsId) {
Kms kms = this.kmss.get(kmsId);
return new KmsLoad(kms, kms.getLoad());
} }
public Collection<Kms> getKmss() { public Collection<Kms> getKmss() {
@ -117,7 +125,7 @@ public abstract class KmsManager {
for (Kms kms : kmss.values()) { for (Kms kms : kmss.values()) {
double load = kms.getLoad(); double load = kms.getLoad();
kmsLoads.add(new KmsLoad(kms, load)); kmsLoads.add(new KmsLoad(kms, load));
log.trace("Calc load {} for kms: {}", load, kms.getUri()); log.trace("Calc load {} for kms {}", load, kms.getUri());
} }
return kmsLoads; return kmsLoads;
} }
@ -126,17 +134,17 @@ public abstract class KmsManager {
return false; return false;
} }
protected KurentoConnectionListener generateKurentoConnectionListener(String kmsUri) { protected KurentoConnectionListener generateKurentoConnectionListener(String kmsId) {
return new KurentoConnectionListener() { return new KurentoConnectionListener() {
@Override @Override
public void reconnected(boolean isReconnected) { public void reconnected(boolean isReconnected) {
final Kms kms = kmss.get(kmsUri); final Kms kms = kmss.get(kmsId);
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
if (!isReconnected) { if (!isReconnected) {
// Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints)
log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsUri); log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kms.getUri());
log.warn("Updating all webrtc endpoints for active sessions"); log.warn("Updating all webrtc endpoints for active sessions");
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
kms.getKurentoSessions().forEach(kSession -> { kms.getKurentoSessions().forEach(kSession -> {
@ -146,36 +154,36 @@ public abstract class KmsManager {
} else { } else {
// Same KMS. We may infer that openvidu-server/KMS connection has been lost, but // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but
// not the clients/KMS connections // not the clients/KMS connections
log.warn("Kurento Client reconnected to same KMS with uri {}", kmsUri); log.warn("Kurento Client reconnected to same KMS {} with uri {}", kmsId, kms.getUri());
} }
} }
@Override @Override
public void disconnected() { public void disconnected() {
final Kms kms = kmss.get(kmsUri); final Kms kms = kmss.get(kmsId);
kms.setKurentoClientConnected(false); kms.setKurentoClientConnected(false);
kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis());
log.warn("Kurento Client disconnected from KMS with uri {}", kmsUri); log.warn("Kurento Client disconnected from KMS {} with uri {}", kmsId, kms.getUri());
} }
@Override @Override
public void connectionFailed() { public void connectionFailed() {
final Kms kms = kmss.get(kmsUri); final Kms kms = kmss.get(kmsId);
kms.setKurentoClientConnected(false); kms.setKurentoClientConnected(false);
log.warn("Kurento Client failed connecting to KMS with uri {}", kmsUri); log.warn("Kurento Client failed connecting to KMS {} with uri {}", kmsId, kms.getUri());
} }
@Override @Override
public void connected() { public void connected() {
final Kms kms = kmss.get(kmsUri); final Kms kms = kmss.get(kmsId);
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
log.warn("Kurento Client is now connected to KMS with uri {}", kmsUri); log.warn("Kurento Client is now connected to KMS {} with uri {}", kmsId, kms.getUri());
} }
}; };
} }
public abstract void initializeKurentoClients(List<String> kmsUris) throws Exception; public abstract List<Kms> initializeKurentoClients(List<String> kmsUris) throws Exception;
@PostConstruct @PostConstruct
private void postConstruct() { private void postConstruct() {