openvidu-server: refactoring to allow Media Node status

pull/370/head
pabloFuente 2019-10-22 15:31:13 +02:00
parent 34cda28405
commit ba2abde8a8
8 changed files with 79 additions and 28 deletions

View File

@ -61,6 +61,8 @@ import io.openvidu.server.rpc.RpcNotificationService;
import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.CommandExecutor;
import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.GeoLocationByIp;
import io.openvidu.server.utils.GeoLocationByIpDummy; import io.openvidu.server.utils.GeoLocationByIpDummy;
import io.openvidu.server.utils.MediaNodeStatusManager;
import io.openvidu.server.utils.MediaNodeStatusManagerDummy;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.QuarantineKiller;
import io.openvidu.server.utils.QuarantineKillerDummy; import io.openvidu.server.utils.QuarantineKillerDummy;
import io.openvidu.server.webhook.CDRLoggerWebhook; import io.openvidu.server.webhook.CDRLoggerWebhook;
@ -186,6 +188,12 @@ public class OpenViduServer implements JsonRpcConfigurer {
return new QuarantineKillerDummy(); return new QuarantineKillerDummy();
} }
@Bean
@ConditionalOnMissingBean
public MediaNodeStatusManager mediaNodeStatusManager() {
return new MediaNodeStatusManagerDummy();
}
@Override @Override
public void registerJsonRpcHandlers(JsonRpcHandlerRegistry registry) { public void registerJsonRpcHandlers(JsonRpcHandlerRegistry registry) {
registry.addHandler(rpcHandler().withPingWatchdog(true).withInterceptors(new HttpHandshakeInterceptor()), registry.addHandler(rpcHandler().withPingWatchdog(true).withInterceptors(new HttpHandshakeInterceptor()),

View File

@ -91,7 +91,7 @@ public class KurentoSessionManager extends SessionManager {
Kms lessLoadedKms = null; Kms lessLoadedKms = null;
try { try {
lessLoadedKms = this.kmsManager.getLessLoadedAndNoQuarantinedKms(); lessLoadedKms = this.kmsManager.getLessLoadedAndRunningKms();
} catch (NoSuchElementException e) { } catch (NoSuchElementException e) {
// Restore session not active // Restore session not active
this.cleanCollections(sessionId); this.cleanCollections(sessionId);

View File

@ -31,9 +31,10 @@ public class FixedOneKmsManager extends KmsManager {
KmsProperties firstProps = kmsProperties.get(0); KmsProperties firstProps = kmsProperties.get(0);
KurentoClient kClient = null; KurentoClient kClient = null;
Kms kms = new Kms(firstProps, loadManager); Kms kms = new Kms(firstProps, loadManager);
this.addKms(kms);
try { try {
kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId())); kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId()));
this.addKms(kms);
kms.setKurentoClient(kClient);
} catch (KurentoException e) { } catch (KurentoException e) {
log.error("KMS in {} is not reachable by OpenVidu Server", firstProps.getUri()); log.error("KMS in {} is not reachable by OpenVidu Server", firstProps.getUri());
if (kClient != null) { if (kClient != null) {
@ -41,9 +42,6 @@ public class FixedOneKmsManager extends KmsManager {
} }
throw new Exception(); throw new Exception();
} }
kms.setKurentoClient(kClient);
return Arrays.asList(kms); return Arrays.asList(kms);
} }

View File

@ -51,10 +51,9 @@ public class Kms {
private static final Logger log = LoggerFactory.getLogger(Kms.class); private static final Logger log = LoggerFactory.getLogger(Kms.class);
private String id; private String id; // Dynamic ID
private String uri; private String uri;
private String ip; private String ip;
private boolean quarantined;
private KurentoClient client; private KurentoClient client;
private LoadManager loadManager; private LoadManager loadManager;
@ -67,7 +66,6 @@ public class Kms {
public Kms(KmsProperties props, LoadManager loadManager) { public Kms(KmsProperties props, LoadManager loadManager) {
this.id = props.getId(); this.id = props.getId();
this.uri = props.getUri(); this.uri = props.getUri();
this.quarantined = false;
String parsedUri = uri.replaceAll("^ws://", "http://").replaceAll("^wss://", "https://"); String parsedUri = uri.replaceAll("^ws://", "http://").replaceAll("^wss://", "https://");
URL url = null; URL url = null;
@ -97,14 +95,6 @@ public class Kms {
return ip; return ip;
} }
public synchronized boolean isQuarantined() {
return this.quarantined;
}
public synchronized void setQuarantined(boolean quarantined) {
this.quarantined = quarantined;
}
public KurentoClient getKurentoClient() { public KurentoClient getKurentoClient() {
return this.client; return this.client;
} }
@ -156,9 +146,9 @@ public class Kms {
public JsonObject toJson() { public JsonObject toJson() {
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
json.addProperty("id", this.id); json.addProperty("id", this.id);
json.addProperty("uri", this.uri);
json.addProperty("ip", this.ip); json.addProperty("ip", this.ip);
json.addProperty("quarantined", this.quarantined); json.addProperty("uri", this.uri);
final boolean connected = this.isKurentoClientConnected(); final boolean connected = this.isKurentoClientConnected();
json.addProperty("connected", connected); json.addProperty("connected", connected);
json.addProperty("connectionTime", this.getTimeOfKurentoClientConnection()); json.addProperty("connectionTime", this.getTimeOfKurentoClientConnection());

View File

@ -38,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.utils.MediaNodeStatusManager;
public abstract class KmsManager { public abstract class KmsManager {
@ -75,7 +76,6 @@ public abstract class KmsManager {
json.addProperty("load", this.load); json.addProperty("load", this.load);
return json; return json;
} }
} }
protected static final Logger log = LoggerFactory.getLogger(KmsManager.class); protected static final Logger log = LoggerFactory.getLogger(KmsManager.class);
@ -86,8 +86,13 @@ public abstract class KmsManager {
@Autowired @Autowired
protected LoadManager loadManager; protected LoadManager loadManager;
@Autowired
protected MediaNodeStatusManager mediaNodeStatusManager;
final protected Map<String, Kms> kmss = new ConcurrentHashMap<>(); final protected Map<String, Kms> kmss = new ConcurrentHashMap<>();
protected Map<String, String> forceKmsUrisToHaveKmsIds;
public synchronized void addKms(Kms kms) { public synchronized void addKms(Kms kms) {
this.kmss.put(kms.getId(), kms); this.kmss.put(kms.getId(), kms);
} }
@ -100,9 +105,9 @@ public abstract class KmsManager {
return Collections.min(getKmsLoads()).kms; return Collections.min(getKmsLoads()).kms;
} }
public synchronized Kms getLessLoadedAndNoQuarantinedKms() throws NoSuchElementException { public synchronized Kms getLessLoadedAndRunningKms() throws NoSuchElementException {
List<KmsLoad> kmsLoads = getKmsLoads().stream().filter(kmsLoad -> !kmsLoad.kms.isQuarantined()) List<KmsLoad> kmsLoads = getKmsLoads().stream()
.collect(Collectors.toList()); .filter(kmsLoad -> mediaNodeStatusManager.isRunning(kmsLoad.kms.getId())).collect(Collectors.toList());
return Collections.min(kmsLoads).kms; return Collections.min(kmsLoads).kms;
} }
@ -139,7 +144,7 @@ public abstract class KmsManager {
return kmsLoads; return kmsLoads;
} }
protected KurentoConnectionListener generateKurentoConnectionListener(String kmsId) { protected KurentoConnectionListener generateKurentoConnectionListener(final String kmsId) {
return new KurentoConnectionListener() { return new KurentoConnectionListener() {
@Override @Override
@ -183,6 +188,7 @@ public abstract class KmsManager {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
mediaNodeStatusManager.setStatus(kmsId, "running");
log.warn("Kurento Client is now connected to KMS {} with uri {}", kmsId, kms.getUri()); log.warn("Kurento Client is now connected to KMS {} with uri {}", kmsId, kms.getUri());
} }
}; };
@ -191,21 +197,26 @@ public abstract class KmsManager {
public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure, public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure,
boolean sendMediaNodeAddedEvent) throws Exception; boolean sendMediaNodeAddedEvent) throws Exception;
public LoadManager getLoadManager() {
return this.loadManager;
}
@PostConstruct @PostConstruct
protected void postConstruct() { protected List<Kms> postConstruct() {
try { try {
List<KmsProperties> kmsProps = new ArrayList<>(); List<KmsProperties> kmsProps = new ArrayList<>();
String kmsId;
for (String kmsUri : this.openviduConfig.getKmsUris()) { for (String kmsUri : this.openviduConfig.getKmsUris()) {
kmsId = "KMS-" + RandomStringUtils.randomAlphanumeric(6).toUpperCase(); String kmsId = forceKmsUrisToHaveKmsIds != null ? forceKmsUrisToHaveKmsIds.get(kmsUri)
: "KMS-" + RandomStringUtils.randomAlphanumeric(6).toUpperCase();
kmsProps.add(new KmsProperties(kmsId, kmsUri)); kmsProps.add(new KmsProperties(kmsId, kmsUri));
} }
this.initializeKurentoClients(kmsProps, true, false); return this.initializeKurentoClients(kmsProps, true, false);
} catch (Exception e) { } catch (Exception e) {
// Some KMS wasn't reachable // Some KMS wasn't reachable
log.error("Shutting down OpenVidu Server"); log.error("Shutting down OpenVidu Server");
System.exit(1); System.exit(1);
} }
return null;
} }
@PreDestroy @PreDestroy

View File

@ -19,7 +19,7 @@ package io.openvidu.server.kurento.kms;
public class KmsProperties { public class KmsProperties {
private String id; private String id; // Dynamic ID
private String uri; private String uri;
public KmsProperties(String id, String uri) { public KmsProperties(String id, String uri) {

View File

@ -0,0 +1,15 @@
package io.openvidu.server.utils;
public interface MediaNodeStatusManager {
public boolean isPending(String mediaNodeId);
public boolean isRunning(String mediaNodeId);
public boolean isShuttingDown(String mediaNodeId);
public boolean isWaitingIdleToShuttingDown(String mediaNodeId);
public void setStatus(String mediaNodeId, String status);
}

View File

@ -0,0 +1,29 @@
package io.openvidu.server.utils;
public class MediaNodeStatusManagerDummy implements MediaNodeStatusManager {
@Override
public boolean isPending(String mediaNodeId) {
return false;
}
@Override
public boolean isRunning(String mediaNodeId) {
return true;
}
@Override
public boolean isShuttingDown(String mediaNodeId) {
return false;
}
@Override
public boolean isWaitingIdleToShuttingDown(String mediaNodeId) {
return false;
}
@Override
public void setStatus(String mediaNodeId, String status) {
}
}