openvidu-server: MediaNodeManager

pull/707/head
pabloFuente 2022-03-21 13:49:30 +01:00
parent 592cec9d10
commit 9d975d3a17
10 changed files with 68 additions and 68 deletions

View File

@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import io.openvidu.server.core.TokenRegister;
import org.bouncycastle.util.Arrays; import org.bouncycastle.util.Arrays;
import org.kurento.jsonrpc.internal.server.config.JsonRpcConfiguration; import org.kurento.jsonrpc.internal.server.config.JsonRpcConfiguration;
import org.kurento.jsonrpc.server.JsonRpcConfigurer; import org.kurento.jsonrpc.server.JsonRpcConfigurer;
@ -53,6 +52,7 @@ import io.openvidu.server.config.OpenviduConfig.Error;
import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionEventsHandler;
import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.SessionManager;
import io.openvidu.server.core.TokenGenerator; import io.openvidu.server.core.TokenGenerator;
import io.openvidu.server.core.TokenRegister;
import io.openvidu.server.coturn.CoturnCredentialsService; import io.openvidu.server.coturn.CoturnCredentialsService;
import io.openvidu.server.coturn.CoturnCredentialsServiceFactory; import io.openvidu.server.coturn.CoturnCredentialsServiceFactory;
import io.openvidu.server.kurento.core.KurentoParticipantEndpointConfig; import io.openvidu.server.kurento.core.KurentoParticipantEndpointConfig;
@ -78,10 +78,8 @@ import io.openvidu.server.utils.GeoLocationByIp;
import io.openvidu.server.utils.GeoLocationByIpDummy; import io.openvidu.server.utils.GeoLocationByIpDummy;
import io.openvidu.server.utils.LocalCustomFileManager; import io.openvidu.server.utils.LocalCustomFileManager;
import io.openvidu.server.utils.LocalDockerManager; import io.openvidu.server.utils.LocalDockerManager;
import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.MediaNodeManager;
import io.openvidu.server.utils.MediaNodeStatusManagerDummy; import io.openvidu.server.utils.MediaNodeManagerDummy;
import io.openvidu.server.utils.QuarantineKiller;
import io.openvidu.server.utils.QuarantineKillerDummy;
import io.openvidu.server.utils.SDPMunging; import io.openvidu.server.utils.SDPMunging;
import io.openvidu.server.webhook.CDRLoggerWebhook; import io.openvidu.server.webhook.CDRLoggerWebhook;
@ -139,14 +137,15 @@ public class OpenViduServer implements JsonRpcConfigurer {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@DependsOn({ "openviduConfig", "sessionManager", "mediaNodeStatusManager" }) @DependsOn({ "openviduConfig", "sessionManager", "loadManager" })
public KmsManager kmsManager(OpenviduConfig openviduConfig, SessionManager sessionManager) { public KmsManager kmsManager(OpenviduConfig openviduConfig, SessionManager sessionManager,
LoadManager loadManager) {
if (openviduConfig.getKmsUris().isEmpty()) { if (openviduConfig.getKmsUris().isEmpty()) {
throw new IllegalArgumentException("'KMS_URIS' should contain at least one KMS url"); throw new IllegalArgumentException("'KMS_URIS' should contain at least one KMS url");
} }
String firstKmsWsUri = openviduConfig.getKmsUris().get(0); String firstKmsWsUri = openviduConfig.getKmsUris().get(0);
log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri); log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri);
return new FixedOneKmsManager(sessionManager); return new FixedOneKmsManager(sessionManager, loadManager);
} }
@Bean @Bean
@ -236,14 +235,8 @@ public class OpenViduServer implements JsonRpcConfigurer {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public QuarantineKiller quarantineKiller() { public MediaNodeManager mediaNodeManager() {
return new QuarantineKillerDummy(); return new MediaNodeManagerDummy();
}
@Bean
@ConditionalOnMissingBean
public MediaNodeStatusManager mediaNodeStatusManager() {
return new MediaNodeStatusManagerDummy();
} }
@Bean @Bean

View File

@ -62,7 +62,7 @@ import io.openvidu.server.recording.service.RecordingManager;
import io.openvidu.server.utils.FormatChecker; import io.openvidu.server.utils.FormatChecker;
import io.openvidu.server.utils.GeoLocation; import io.openvidu.server.utils.GeoLocation;
import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.GeoLocationByIp;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.MediaNodeManager;
import io.openvidu.server.utils.UpdatableTimerTask; import io.openvidu.server.utils.UpdatableTimerTask;
public abstract class SessionManager { public abstract class SessionManager {
@ -88,7 +88,7 @@ public abstract class SessionManager {
protected TokenRegister tokenRegister; protected TokenRegister tokenRegister;
@Autowired @Autowired
protected QuarantineKiller quarantineKiller; protected MediaNodeManager mediaNodeManager;
@Autowired @Autowired
protected GeoLocationByIp geoLocationByIp; protected GeoLocationByIp geoLocationByIp;
@ -646,7 +646,7 @@ public abstract class SessionManager {
log.info("Session '{}' removed and closed", session.getSessionId()); log.info("Session '{}' removed and closed", session.getSessionId());
if (mediaNodeId != null) { if (mediaNodeId != null) {
this.quarantineKiller.dropMediaNode(mediaNodeId); this.mediaNodeManager.dropIdleMediaNode(mediaNodeId);
} }
} }

View File

@ -36,8 +36,8 @@ import io.openvidu.server.core.SessionManager;
public class FixedOneKmsManager extends KmsManager { public class FixedOneKmsManager extends KmsManager {
public FixedOneKmsManager(SessionManager sessionManager) { public FixedOneKmsManager(SessionManager sessionManager, LoadManager loadManager) {
super(sessionManager); super(sessionManager, loadManager);
} }
@Override @Override
@ -45,21 +45,21 @@ public class FixedOneKmsManager extends KmsManager {
throws Exception { throws Exception {
KmsProperties firstProps = kmsProperties.get(0); KmsProperties firstProps = kmsProperties.get(0);
KurentoClient kClient = null; KurentoClient kClient = null;
Kms kms = new Kms(firstProps, loadManager, quarantineKiller); Kms kms = new Kms(firstProps, loadManager, mediaNodeManager);
try { try {
JsonRpcWSConnectionListener listener = this.generateKurentoConnectionListener(kms.getId()); JsonRpcWSConnectionListener listener = this.generateKurentoConnectionListener(kms.getId());
JsonRpcClientNettyWebSocket client = new JsonRpcClientNettyWebSocket(firstProps.getUri(), listener); JsonRpcClientNettyWebSocket client = new JsonRpcClientNettyWebSocket(firstProps.getUri(), listener);
client.setTryReconnectingMaxTime(0); client.setTryReconnectingMaxTime(0);
client.setTryReconnectingForever(false); client.setTryReconnectingForever(false);
kClient = KurentoClient.createFromJsonRpcClient(client); kClient = KurentoClient.createFromJsonRpcClient(client);
this.addKms(kms);
kms.setKurentoClient(kClient); kms.setKurentoClient(kClient);
// TODO: This should be done in KurentoClient connected event // TODO: This should be done in KurentoClient connected event
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
MediaServer mediaServer = kms.fetchMediaServerType(); MediaServer mediaServer = kms.fetchMediaServerType();
this.addKms(kms);
// Set Media Server in OpenVidu configuration // Set Media Server in OpenVidu configuration
this.openviduConfig.setMediaServer(mediaServer); this.openviduConfig.setMediaServer(mediaServer);

View File

@ -39,7 +39,7 @@ import com.google.gson.JsonObject;
import io.openvidu.java.client.RecordingProperties; import io.openvidu.java.client.RecordingProperties;
import io.openvidu.server.core.MediaServer; import io.openvidu.server.core.MediaServer;
import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.MediaNodeManager;
import io.openvidu.server.utils.RecordingUtils; import io.openvidu.server.utils.RecordingUtils;
import io.openvidu.server.utils.UpdatableTimerTask; import io.openvidu.server.utils.UpdatableTimerTask;
@ -65,7 +65,7 @@ public class Kms {
private MediaServer mediaServer; private MediaServer mediaServer;
private UpdatableTimerTask clientReconnectTimer; private UpdatableTimerTask clientReconnectTimer;
private LoadManager loadManager; private LoadManager loadManager;
private QuarantineKiller quarantineKiller; private MediaNodeManager mediaNodeManager;
private boolean isFirstReconnectionAttempt = true; private boolean isFirstReconnectionAttempt = true;
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
@ -76,7 +76,7 @@ public class Kms {
private Map<String, String> activeRecordings = new ConcurrentHashMap<>(); private Map<String, String> activeRecordings = new ConcurrentHashMap<>();
private AtomicLong activeComposedRecordings = new AtomicLong(); private AtomicLong activeComposedRecordings = new AtomicLong();
public Kms(KmsProperties props, LoadManager loadManager, QuarantineKiller quarantineKiller) { public Kms(KmsProperties props, LoadManager loadManager, MediaNodeManager mediaNodeManager) {
this.id = props.getId(); this.id = props.getId();
this.uri = props.getUri(); this.uri = props.getUri();
@ -90,7 +90,7 @@ public class Kms {
this.ip = url.getHost(); this.ip = url.getHost();
this.loadManager = loadManager; this.loadManager = loadManager;
this.quarantineKiller = quarantineKiller; this.mediaNodeManager = mediaNodeManager;
} }
public KurentoClient getKurentoClient() { public KurentoClient getKurentoClient() {
@ -142,7 +142,15 @@ public class Kms {
} }
public void setKurentoClientConnected(boolean isConnected) { public void setKurentoClientConnected(boolean isConnected) {
final long timestamp = System.currentTimeMillis();
this.isKurentoClientConnected.set(isConnected); this.isKurentoClientConnected.set(isConnected);
if (isConnected) {
this.setTimeOfKurentoClientConnection(timestamp);
this.mediaNodeManager.mediaNodeUsageRegistration(this, timestamp);
} else {
this.setTimeOfKurentoClientDisconnection(timestamp);
this.mediaNodeManager.mediaNodeUsageDeregistration(this, timestamp);
}
} }
public long getTimeOfKurentoClientConnection() { public long getTimeOfKurentoClientConnection() {
@ -190,7 +198,7 @@ public class Kms {
if (RecordingUtils.IS_COMPOSED(properties.outputMode())) { if (RecordingUtils.IS_COMPOSED(properties.outputMode())) {
this.activeComposedRecordings.decrementAndGet(); this.activeComposedRecordings.decrementAndGet();
} }
this.quarantineKiller.dropMediaNode(this.id); this.mediaNodeManager.dropIdleMediaNode(this.id);
} }
public JsonObject toJson() { public JsonObject toJson() {

View File

@ -48,8 +48,7 @@ import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionEventsHandler;
import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.MediaNodeManager;
import io.openvidu.server.utils.QuarantineKiller;
import io.openvidu.server.utils.RemoteOperationUtils; import io.openvidu.server.utils.RemoteOperationUtils;
import io.openvidu.server.utils.UpdatableTimerTask; import io.openvidu.server.utils.UpdatableTimerTask;
@ -100,13 +99,7 @@ public abstract class KmsManager {
protected OpenviduConfig openviduConfig; protected OpenviduConfig openviduConfig;
@Autowired @Autowired
protected LoadManager loadManager; protected MediaNodeManager mediaNodeManager;
@Autowired
protected QuarantineKiller quarantineKiller;
@Autowired
protected MediaNodeStatusManager mediaNodeStatusManager;
@Autowired @Autowired
protected SessionEventsHandler sessionEventsHandler; protected SessionEventsHandler sessionEventsHandler;
@ -114,9 +107,11 @@ public abstract class KmsManager {
final protected Map<String, Kms> kmss = new ConcurrentHashMap<>(); final protected Map<String, Kms> kmss = new ConcurrentHashMap<>();
protected SessionManager sessionManager; protected SessionManager sessionManager;
protected LoadManager loadManager;
public KmsManager(SessionManager sessionManager) { public KmsManager(SessionManager sessionManager, LoadManager loadManager) {
this.sessionManager = sessionManager; this.sessionManager = sessionManager;
this.loadManager = loadManager;
} }
public synchronized void addKms(Kms kms) { public synchronized void addKms(Kms kms) {
@ -128,8 +123,9 @@ public abstract class KmsManager {
} }
public synchronized Kms getLessLoadedConnectedAndRunningKms() throws NoSuchElementException { public synchronized Kms getLessLoadedConnectedAndRunningKms() throws NoSuchElementException {
List<KmsLoad> kmsLoads = getKmsLoads().stream().filter(kmsLoad -> kmsLoad.kms.isKurentoClientConnected() List<KmsLoad> kmsLoads = getKmsLoads().stream().filter(
&& mediaNodeStatusManager.isRunning(kmsLoad.kms.getId())).collect(Collectors.toList()); kmsLoad -> kmsLoad.kms.isKurentoClientConnected() && mediaNodeManager.isRunning(kmsLoad.kms.getId()))
.collect(Collectors.toList());
if (kmsLoads.isEmpty()) { if (kmsLoads.isEmpty()) {
throw new NoSuchElementException(); throw new NoSuchElementException();
} else { } else {
@ -139,8 +135,7 @@ public abstract class KmsManager {
public synchronized boolean atLeastOneConnectedAndRunningKms() { public synchronized boolean atLeastOneConnectedAndRunningKms() {
Optional<Kms> optional = this.kmss.values().stream() Optional<Kms> optional = this.kmss.values().stream()
.filter(kms -> kms.isKurentoClientConnected() && mediaNodeStatusManager.isRunning(kms.getId())) .filter(kms -> kms.isKurentoClientConnected() && mediaNodeManager.isRunning(kms.getId())).findFirst();
.findFirst();
return optional.isPresent(); return optional.isPresent();
} }
@ -183,7 +178,7 @@ public abstract class KmsManager {
// TODO: This should be done here, not after KurentoClient#create method // TODO: This should be done here, not after KurentoClient#create method
// returns, but it seems that this event is never triggered // returns, but it seems that this event is never triggered
// kms.setKurentoClientConnected(true); // kms.setKurentoClientConnected(true);
// kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); // kms.fetchMediaServerType();
} }
@Override @Override
@ -191,7 +186,6 @@ public abstract class KmsManager {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
log.error("Kurento Client \"connectionFailed\" event for KMS {} [{}]", kms.getUri(), log.error("Kurento Client \"connectionFailed\" event for KMS {} [{}]", kms.getUri(),
kms.getKurentoClient().toString()); kms.getKurentoClient().toString());
kms.setKurentoClientConnected(false);
} }
@Override @Override
@ -224,7 +218,6 @@ public abstract class KmsManager {
} }
kms.setKurentoClientConnected(false); kms.setKurentoClientConnected(false);
kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis());
disconnectionHandler(kms); disconnectionHandler(kms);
} }
@ -315,7 +308,6 @@ public abstract class KmsManager {
kms.getKurentoClientReconnectTimer().cancelTimer(); kms.getKurentoClientReconnectTimer().cancelTimer();
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();

View File

@ -1,6 +1,14 @@
package io.openvidu.server.utils; package io.openvidu.server.utils;
public interface MediaNodeStatusManager { import io.openvidu.server.kurento.kms.Kms;
public interface MediaNodeManager {
public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection);
public void mediaNodeUsageDeregistration(Kms kms, long timeOfDisconnection);
public void dropIdleMediaNode(String mediaNodeId);
public boolean isLaunching(String mediaNodeId); public boolean isLaunching(String mediaNodeId);

View File

@ -1,6 +1,20 @@
package io.openvidu.server.utils; package io.openvidu.server.utils;
public class MediaNodeStatusManagerDummy implements MediaNodeStatusManager { import io.openvidu.server.kurento.kms.Kms;
public class MediaNodeManagerDummy implements MediaNodeManager {
@Override
public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection) {
}
@Override
public void mediaNodeUsageDeregistration(Kms kms, long timeOfDisconnection) {
}
@Override
public void dropIdleMediaNode(String mediaNodeId) {
}
@Override @Override
public boolean isLaunching(String mediaNodeId) { public boolean isLaunching(String mediaNodeId) {

View File

@ -1,7 +0,0 @@
package io.openvidu.server.utils;
public interface QuarantineKiller {
public void dropMediaNode(String mediaNodeId);
}

View File

@ -1,9 +0,0 @@
package io.openvidu.server.utils;
public class QuarantineKillerDummy implements QuarantineKiller {
@Override
public void dropMediaNode(String mediaNodeId) {
}
}

View File

@ -21,10 +21,12 @@ import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import io.openvidu.server.kurento.core.KurentoSessionManager; import io.openvidu.server.kurento.core.KurentoSessionManager;
import io.openvidu.server.kurento.kms.DummyLoadManager;
import io.openvidu.server.kurento.kms.FixedOneKmsManager; import io.openvidu.server.kurento.kms.FixedOneKmsManager;
import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.kurento.kms.KmsProperties; import io.openvidu.server.kurento.kms.KmsProperties;
import io.openvidu.server.utils.MediaNodeManager;
/** /**
* KmsManager bean mock * KmsManager bean mock
@ -36,13 +38,13 @@ public class IntegrationTestConfiguration {
@Bean @Bean
public KmsManager kmsManager() throws Exception { public KmsManager kmsManager() throws Exception {
final KmsManager spy = Mockito.spy(new FixedOneKmsManager(new KurentoSessionManager())); final KmsManager spy = Mockito.spy(new FixedOneKmsManager(new KurentoSessionManager(), new DummyLoadManager()));
doAnswer(invocation -> { doAnswer(invocation -> {
List<Kms> successfullyConnectedKmss = new ArrayList<>(); List<Kms> successfullyConnectedKmss = new ArrayList<>();
List<KmsProperties> kmsProperties = invocation.getArgument(0); List<KmsProperties> kmsProperties = invocation.getArgument(0);
for (KmsProperties kmsProp : kmsProperties) { for (KmsProperties kmsProp : kmsProperties) {
Kms kms = new Kms(kmsProp, Whitebox.getInternalState(spy, "loadManager"), Kms kms = new Kms(kmsProp, Whitebox.getInternalState(spy, "loadManager"),
Whitebox.getInternalState(spy, "quarantineKiller")); Whitebox.getInternalState(spy, "mediaNodeManager"));
KurentoClient kClient = mock(KurentoClient.class); KurentoClient kClient = mock(KurentoClient.class);
doAnswer(i -> { doAnswer(i -> {
@ -60,7 +62,6 @@ public class IntegrationTestConfiguration {
kms.setKurentoClient(kClient); kms.setKurentoClient(kClient);
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
kms.fetchMediaServerType(); kms.fetchMediaServerType();
spy.addKms(kms); spy.addKms(kms);