From 9d975d3a178ec550d97616a53f38732404df8841 Mon Sep 17 00:00:00 2001 From: pabloFuente Date: Mon, 21 Mar 2022 13:49:30 +0100 Subject: [PATCH] openvidu-server: MediaNodeManager --- .../io/openvidu/server/OpenViduServer.java | 25 ++++++----------- .../openvidu/server/core/SessionManager.java | 6 ++-- .../kurento/kms/FixedOneKmsManager.java | 10 +++---- .../io/openvidu/server/kurento/kms/Kms.java | 18 ++++++++---- .../server/kurento/kms/KmsManager.java | 28 +++++++------------ ...atusManager.java => MediaNodeManager.java} | 10 ++++++- ...rDummy.java => MediaNodeManagerDummy.java} | 16 ++++++++++- .../server/utils/QuarantineKiller.java | 7 ----- .../server/utils/QuarantineKillerDummy.java | 9 ------ .../config/IntegrationTestConfiguration.java | 7 +++-- 10 files changed, 68 insertions(+), 68 deletions(-) rename openvidu-server/src/main/java/io/openvidu/server/utils/{MediaNodeStatusManager.java => MediaNodeManager.java} (51%) rename openvidu-server/src/main/java/io/openvidu/server/utils/{MediaNodeStatusManagerDummy.java => MediaNodeManagerDummy.java} (55%) delete mode 100644 openvidu-server/src/main/java/io/openvidu/server/utils/QuarantineKiller.java delete mode 100644 openvidu-server/src/main/java/io/openvidu/server/utils/QuarantineKillerDummy.java diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index 5c73fbe1..9cac76ec 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; -import io.openvidu.server.core.TokenRegister; import org.bouncycastle.util.Arrays; import org.kurento.jsonrpc.internal.server.config.JsonRpcConfiguration; import org.kurento.jsonrpc.server.JsonRpcConfigurer; @@ -53,6 +52,7 @@ import io.openvidu.server.config.OpenviduConfig.Error; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.TokenGenerator; +import io.openvidu.server.core.TokenRegister; import io.openvidu.server.coturn.CoturnCredentialsService; import io.openvidu.server.coturn.CoturnCredentialsServiceFactory; import io.openvidu.server.kurento.core.KurentoParticipantEndpointConfig; @@ -78,10 +78,8 @@ import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.GeoLocationByIpDummy; import io.openvidu.server.utils.LocalCustomFileManager; import io.openvidu.server.utils.LocalDockerManager; -import io.openvidu.server.utils.MediaNodeStatusManager; -import io.openvidu.server.utils.MediaNodeStatusManagerDummy; -import io.openvidu.server.utils.QuarantineKiller; -import io.openvidu.server.utils.QuarantineKillerDummy; +import io.openvidu.server.utils.MediaNodeManager; +import io.openvidu.server.utils.MediaNodeManagerDummy; import io.openvidu.server.utils.SDPMunging; import io.openvidu.server.webhook.CDRLoggerWebhook; @@ -139,14 +137,15 @@ public class OpenViduServer implements JsonRpcConfigurer { @Bean @ConditionalOnMissingBean - @DependsOn({ "openviduConfig", "sessionManager", "mediaNodeStatusManager" }) - public KmsManager kmsManager(OpenviduConfig openviduConfig, SessionManager sessionManager) { + @DependsOn({ "openviduConfig", "sessionManager", "loadManager" }) + public KmsManager kmsManager(OpenviduConfig openviduConfig, SessionManager sessionManager, + LoadManager loadManager) { if (openviduConfig.getKmsUris().isEmpty()) { throw new IllegalArgumentException("'KMS_URIS' should contain at least one KMS url"); } String firstKmsWsUri = openviduConfig.getKmsUris().get(0); log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri); - return new FixedOneKmsManager(sessionManager); + return new FixedOneKmsManager(sessionManager, loadManager); } @Bean @@ -236,14 +235,8 @@ public class OpenViduServer implements JsonRpcConfigurer { @Bean @ConditionalOnMissingBean - public QuarantineKiller quarantineKiller() { - return new QuarantineKillerDummy(); - } - - @Bean - @ConditionalOnMissingBean - public MediaNodeStatusManager mediaNodeStatusManager() { - return new MediaNodeStatusManagerDummy(); + public MediaNodeManager mediaNodeManager() { + return new MediaNodeManagerDummy(); } @Bean diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java index f4e1e51e..e5daacaa 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionManager.java @@ -62,7 +62,7 @@ import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.utils.FormatChecker; import io.openvidu.server.utils.GeoLocation; import io.openvidu.server.utils.GeoLocationByIp; -import io.openvidu.server.utils.QuarantineKiller; +import io.openvidu.server.utils.MediaNodeManager; import io.openvidu.server.utils.UpdatableTimerTask; public abstract class SessionManager { @@ -88,7 +88,7 @@ public abstract class SessionManager { protected TokenRegister tokenRegister; @Autowired - protected QuarantineKiller quarantineKiller; + protected MediaNodeManager mediaNodeManager; @Autowired protected GeoLocationByIp geoLocationByIp; @@ -646,7 +646,7 @@ public abstract class SessionManager { log.info("Session '{}' removed and closed", session.getSessionId()); if (mediaNodeId != null) { - this.quarantineKiller.dropMediaNode(mediaNodeId); + this.mediaNodeManager.dropIdleMediaNode(mediaNodeId); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index 0c8cc91e..12fd8095 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -36,8 +36,8 @@ import io.openvidu.server.core.SessionManager; public class FixedOneKmsManager extends KmsManager { - public FixedOneKmsManager(SessionManager sessionManager) { - super(sessionManager); + public FixedOneKmsManager(SessionManager sessionManager, LoadManager loadManager) { + super(sessionManager, loadManager); } @Override @@ -45,21 +45,21 @@ public class FixedOneKmsManager extends KmsManager { throws Exception { KmsProperties firstProps = kmsProperties.get(0); KurentoClient kClient = null; - Kms kms = new Kms(firstProps, loadManager, quarantineKiller); + Kms kms = new Kms(firstProps, loadManager, mediaNodeManager); try { JsonRpcWSConnectionListener listener = this.generateKurentoConnectionListener(kms.getId()); JsonRpcClientNettyWebSocket client = new JsonRpcClientNettyWebSocket(firstProps.getUri(), listener); client.setTryReconnectingMaxTime(0); client.setTryReconnectingForever(false); kClient = KurentoClient.createFromJsonRpcClient(client); - this.addKms(kms); kms.setKurentoClient(kClient); // TODO: This should be done in KurentoClient connected event kms.setKurentoClientConnected(true); - kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); MediaServer mediaServer = kms.fetchMediaServerType(); + this.addKms(kms); + // Set Media Server in OpenVidu configuration this.openviduConfig.setMediaServer(mediaServer); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index d67e5860..585e1886 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -39,7 +39,7 @@ import com.google.gson.JsonObject; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.core.MediaServer; import io.openvidu.server.kurento.core.KurentoSession; -import io.openvidu.server.utils.QuarantineKiller; +import io.openvidu.server.utils.MediaNodeManager; import io.openvidu.server.utils.RecordingUtils; import io.openvidu.server.utils.UpdatableTimerTask; @@ -65,7 +65,7 @@ public class Kms { private MediaServer mediaServer; private UpdatableTimerTask clientReconnectTimer; private LoadManager loadManager; - private QuarantineKiller quarantineKiller; + private MediaNodeManager mediaNodeManager; private boolean isFirstReconnectionAttempt = true; private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); @@ -76,7 +76,7 @@ public class Kms { private Map activeRecordings = new ConcurrentHashMap<>(); private AtomicLong activeComposedRecordings = new AtomicLong(); - public Kms(KmsProperties props, LoadManager loadManager, QuarantineKiller quarantineKiller) { + public Kms(KmsProperties props, LoadManager loadManager, MediaNodeManager mediaNodeManager) { this.id = props.getId(); this.uri = props.getUri(); @@ -90,7 +90,7 @@ public class Kms { this.ip = url.getHost(); this.loadManager = loadManager; - this.quarantineKiller = quarantineKiller; + this.mediaNodeManager = mediaNodeManager; } public KurentoClient getKurentoClient() { @@ -142,7 +142,15 @@ public class Kms { } public void setKurentoClientConnected(boolean isConnected) { + final long timestamp = System.currentTimeMillis(); this.isKurentoClientConnected.set(isConnected); + if (isConnected) { + this.setTimeOfKurentoClientConnection(timestamp); + this.mediaNodeManager.mediaNodeUsageRegistration(this, timestamp); + } else { + this.setTimeOfKurentoClientDisconnection(timestamp); + this.mediaNodeManager.mediaNodeUsageDeregistration(this, timestamp); + } } public long getTimeOfKurentoClientConnection() { @@ -190,7 +198,7 @@ public class Kms { if (RecordingUtils.IS_COMPOSED(properties.outputMode())) { this.activeComposedRecordings.decrementAndGet(); } - this.quarantineKiller.dropMediaNode(this.id); + this.mediaNodeManager.dropIdleMediaNode(this.id); } public JsonObject toJson() { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index e89c98d5..195e74d0 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -48,8 +48,7 @@ import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; import io.openvidu.server.kurento.core.KurentoSession; -import io.openvidu.server.utils.MediaNodeStatusManager; -import io.openvidu.server.utils.QuarantineKiller; +import io.openvidu.server.utils.MediaNodeManager; import io.openvidu.server.utils.RemoteOperationUtils; import io.openvidu.server.utils.UpdatableTimerTask; @@ -100,13 +99,7 @@ public abstract class KmsManager { protected OpenviduConfig openviduConfig; @Autowired - protected LoadManager loadManager; - - @Autowired - protected QuarantineKiller quarantineKiller; - - @Autowired - protected MediaNodeStatusManager mediaNodeStatusManager; + protected MediaNodeManager mediaNodeManager; @Autowired protected SessionEventsHandler sessionEventsHandler; @@ -114,9 +107,11 @@ public abstract class KmsManager { final protected Map kmss = new ConcurrentHashMap<>(); protected SessionManager sessionManager; + protected LoadManager loadManager; - public KmsManager(SessionManager sessionManager) { + public KmsManager(SessionManager sessionManager, LoadManager loadManager) { this.sessionManager = sessionManager; + this.loadManager = loadManager; } public synchronized void addKms(Kms kms) { @@ -128,8 +123,9 @@ public abstract class KmsManager { } public synchronized Kms getLessLoadedConnectedAndRunningKms() throws NoSuchElementException { - List kmsLoads = getKmsLoads().stream().filter(kmsLoad -> kmsLoad.kms.isKurentoClientConnected() - && mediaNodeStatusManager.isRunning(kmsLoad.kms.getId())).collect(Collectors.toList()); + List kmsLoads = getKmsLoads().stream().filter( + kmsLoad -> kmsLoad.kms.isKurentoClientConnected() && mediaNodeManager.isRunning(kmsLoad.kms.getId())) + .collect(Collectors.toList()); if (kmsLoads.isEmpty()) { throw new NoSuchElementException(); } else { @@ -139,8 +135,7 @@ public abstract class KmsManager { public synchronized boolean atLeastOneConnectedAndRunningKms() { Optional optional = this.kmss.values().stream() - .filter(kms -> kms.isKurentoClientConnected() && mediaNodeStatusManager.isRunning(kms.getId())) - .findFirst(); + .filter(kms -> kms.isKurentoClientConnected() && mediaNodeManager.isRunning(kms.getId())).findFirst(); return optional.isPresent(); } @@ -183,7 +178,7 @@ public abstract class KmsManager { // TODO: This should be done here, not after KurentoClient#create method // returns, but it seems that this event is never triggered // kms.setKurentoClientConnected(true); - // kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); + // kms.fetchMediaServerType(); } @Override @@ -191,7 +186,6 @@ public abstract class KmsManager { final Kms kms = kmss.get(kmsId); log.error("Kurento Client \"connectionFailed\" event for KMS {} [{}]", kms.getUri(), kms.getKurentoClient().toString()); - kms.setKurentoClientConnected(false); } @Override @@ -224,7 +218,6 @@ public abstract class KmsManager { } kms.setKurentoClientConnected(false); - kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); disconnectionHandler(kms); } @@ -315,7 +308,6 @@ public abstract class KmsManager { kms.getKurentoClientReconnectTimer().cancelTimer(); kms.setKurentoClientConnected(true); - kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManager.java b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManager.java similarity index 51% rename from openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManager.java rename to openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManager.java index cfad5e5c..08035553 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManager.java @@ -1,6 +1,14 @@ package io.openvidu.server.utils; -public interface MediaNodeStatusManager { +import io.openvidu.server.kurento.kms.Kms; + +public interface MediaNodeManager { + + public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection); + + public void mediaNodeUsageDeregistration(Kms kms, long timeOfDisconnection); + + public void dropIdleMediaNode(String mediaNodeId); public boolean isLaunching(String mediaNodeId); diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManagerDummy.java b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManagerDummy.java similarity index 55% rename from openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManagerDummy.java rename to openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManagerDummy.java index d0eacd25..230aa14e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeStatusManagerDummy.java +++ b/openvidu-server/src/main/java/io/openvidu/server/utils/MediaNodeManagerDummy.java @@ -1,6 +1,20 @@ package io.openvidu.server.utils; -public class MediaNodeStatusManagerDummy implements MediaNodeStatusManager { +import io.openvidu.server.kurento.kms.Kms; + +public class MediaNodeManagerDummy implements MediaNodeManager { + + @Override + public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection) { + } + + @Override + public void mediaNodeUsageDeregistration(Kms kms, long timeOfDisconnection) { + } + + @Override + public void dropIdleMediaNode(String mediaNodeId) { + } @Override public boolean isLaunching(String mediaNodeId) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/QuarantineKiller.java b/openvidu-server/src/main/java/io/openvidu/server/utils/QuarantineKiller.java deleted file mode 100644 index a757514b..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/QuarantineKiller.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.openvidu.server.utils; - -public interface QuarantineKiller { - - public void dropMediaNode(String mediaNodeId); - -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/utils/QuarantineKillerDummy.java b/openvidu-server/src/main/java/io/openvidu/server/utils/QuarantineKillerDummy.java deleted file mode 100644 index 8bfad9a5..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/utils/QuarantineKillerDummy.java +++ /dev/null @@ -1,9 +0,0 @@ -package io.openvidu.server.utils; - -public class QuarantineKillerDummy implements QuarantineKiller { - - @Override - public void dropMediaNode(String mediaNodeId) { - } - -} diff --git a/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java b/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java index 3d0d5256..20d998df 100644 --- a/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java +++ b/openvidu-server/src/test/java/io/openvidu/server/test/integration/config/IntegrationTestConfiguration.java @@ -21,10 +21,12 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import io.openvidu.server.kurento.core.KurentoSessionManager; +import io.openvidu.server.kurento.kms.DummyLoadManager; import io.openvidu.server.kurento.kms.FixedOneKmsManager; import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.kurento.kms.KmsProperties; +import io.openvidu.server.utils.MediaNodeManager; /** * KmsManager bean mock @@ -36,13 +38,13 @@ public class IntegrationTestConfiguration { @Bean public KmsManager kmsManager() throws Exception { - final KmsManager spy = Mockito.spy(new FixedOneKmsManager(new KurentoSessionManager())); + final KmsManager spy = Mockito.spy(new FixedOneKmsManager(new KurentoSessionManager(), new DummyLoadManager())); doAnswer(invocation -> { List successfullyConnectedKmss = new ArrayList<>(); List kmsProperties = invocation.getArgument(0); for (KmsProperties kmsProp : kmsProperties) { Kms kms = new Kms(kmsProp, Whitebox.getInternalState(spy, "loadManager"), - Whitebox.getInternalState(spy, "quarantineKiller")); + Whitebox.getInternalState(spy, "mediaNodeManager")); KurentoClient kClient = mock(KurentoClient.class); doAnswer(i -> { @@ -60,7 +62,6 @@ public class IntegrationTestConfiguration { kms.setKurentoClient(kClient); kms.setKurentoClientConnected(true); - kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); kms.fetchMediaServerType(); spy.addKms(kms);