diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index d9b381c9..d957f4e9 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -62,7 +62,6 @@ import io.openvidu.server.kurento.core.KurentoSessionManager; import io.openvidu.server.kurento.kms.DummyLoadManager; import io.openvidu.server.kurento.kms.FixedOneKmsManager; import io.openvidu.server.kurento.kms.KmsManager; -import io.openvidu.server.kurento.kms.LoadManager; import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.rpc.RpcNotificationService; @@ -105,13 +104,7 @@ public class OpenViduServer implements JsonRpcConfigurer { String firstKmsWsUri = kmsWsUris.get(0); log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri); - return new FixedOneKmsManager(firstKmsWsUri); - } - - @Bean - @ConditionalOnMissingBean - public LoadManager loadManager() { - return new DummyLoadManager(); + return new FixedOneKmsManager(firstKmsWsUri, new DummyLoadManager()); } @Bean diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index 2e053434..56b4df27 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -84,12 +84,16 @@ public class KurentoParticipant extends Participant { this.session = kurentoSession; if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) { + // Initialize a PublisherEndpoint this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(), this.session.getPipeline(), this.openviduConfig); } for (Participant other : session.getParticipants()) { - if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())) { + if (!other.getParticipantPublicId().equals(this.getParticipantPublicId()) + && !OpenViduRole.SUBSCRIBER.equals(other.getToken().getRole())) { + // Initialize a SubscriberEndpoint for each other user connected with PUBLISHER + // or MODERATOR role getNewOrExistingSubscriber(other.getParticipantPublicId()); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 760adbdb..6eb72675 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -159,6 +159,9 @@ public class KurentoSession extends Session { kms.getKurentoClient().destroy(); } + // Also disassociate the KurentoSession from the Kms + kms.removeKurentoSession(this.sessionId); + this.closed = true; return true; } else { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 3700c2f5..fe3580a1 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -503,6 +503,10 @@ public class KurentoSessionManager extends SessionManager { log.warn("Session '{}' has just been created by another thread", session.getSessionId()); return oldSession; } + + // Also associate the KurentoSession with the Kms + kms.addKurentoSession(session); + log.warn("No session '{}' exists yet. Created one on KMS '{}'", session.getSessionId(), kms.getUri()); sessionEventsHandler.onSessionCreated(session); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index 855e5898..ced03c23 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -479,8 +479,6 @@ public abstract class MediaEndpoint { }); } - public abstract PublisherEndpoint getPublisher(); - public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("createdAt", this.createdAt); diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index b1c8f7c2..e300d11d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -537,11 +537,6 @@ public class PublisherEndpoint extends MediaEndpoint { } } - @Override - public PublisherEndpoint getPublisher() { - return this; - } - public MediaOptions getMediaOptions() { return mediaOptions; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java index e2199d3f..6f19ba0b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/SubscriberEndpoint.java @@ -18,6 +18,7 @@ package io.openvidu.server.kurento.endpoint; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; import org.kurento.client.MediaPipeline; import org.slf4j.Logger; @@ -37,7 +38,7 @@ import io.openvidu.server.kurento.core.KurentoParticipant; public class SubscriberEndpoint extends MediaEndpoint { private final static Logger log = LoggerFactory.getLogger(SubscriberEndpoint.class); - private boolean connectedToPublisher = false; + private AtomicBoolean connectedToPublisher = new AtomicBoolean(false); private PublisherEndpoint publisher = null; @@ -58,16 +59,11 @@ public class SubscriberEndpoint extends MediaEndpoint { } public boolean isConnectedToPublisher() { - return connectedToPublisher; + return connectedToPublisher.get(); } public void setConnectedToPublisher(boolean connectedToPublisher) { - this.connectedToPublisher = connectedToPublisher; - } - - @Override - public PublisherEndpoint getPublisher() { - return this.publisher; + this.connectedToPublisher.set(connectedToPublisher); } public void setPublisher(PublisherEndpoint publisher) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java index b8b2411b..fcb8e3ea 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java @@ -17,7 +17,7 @@ package io.openvidu.server.kurento.kms; -public class DummyLoadManager extends LoadManager { +public class DummyLoadManager implements LoadManager { @Override public double calculateLoad(Kms kms) { diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index 6365051f..e48f405b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -23,12 +23,10 @@ public class FixedOneKmsManager extends KmsManager { String kmsWsUri; - public FixedOneKmsManager(String kmsWsUri) { + public FixedOneKmsManager(String kmsWsUri, LoadManager loadManager) { + super(loadManager); this.kmsWsUri = kmsWsUri; - } - @Override - protected void initializeKurentoClients() { KurentoClient kClient = KurentoClient.create(kmsWsUri, this.generateKurentoConnectionListener(kmsWsUri)); this.addKms(new Kms(kmsWsUri, kClient, loadManager)); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index 2491e636..f093cdff 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -17,13 +17,21 @@ package io.openvidu.server.kurento.kms; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.kurento.client.KurentoClient; +import org.kurento.client.ModuleInfo; +import org.kurento.client.ServerInfo; +import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import io.openvidu.server.kurento.core.KurentoSession; + /** * Abstraction of a KMS instance: an object of this class corresponds to a KMS * process running somewhere. @@ -44,6 +52,8 @@ public class Kms { private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); + private Map kurentoSessions = new ConcurrentHashMap<>(); + public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) { this.kmsUri = kmsUri; this.client = client; @@ -82,11 +92,58 @@ public class Kms { this.timeOfKurentoClientDisconnection.set(time); } + public Collection getKurentoSessions() { + return this.kurentoSessions.values(); + } + + public void addKurentoSession(KurentoSession session) { + this.kurentoSessions.put(session.getSessionId(), session); + } + + public void removeKurentoSession(String sessionId) { + this.kurentoSessions.remove(sessionId); + } + public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("id", this.kmsUri); json.addProperty("uri", this.kmsUri); - // json.addProperty("createdAt", this.client.getServerManager().getInfo().getVersion()); + return json; + } + + public JsonObject toJsonExtended(boolean withSessions, boolean withExtraInfo) { + + JsonObject json = this.toJson(); + + if (withSessions) { + JsonArray sessions = new JsonArray(); + for (KurentoSession session : this.kurentoSessions.values()) { + sessions.add(session.toJson()); + } + json.add("sessions", sessions); + } + + if (withExtraInfo) { + json.addProperty("memory", this.client.getServerManager().getUsedMemory() / 1024); + + ServerInfo info = this.client.getServerManager().getInfo(); + json.addProperty("version", info.getVersion()); + json.addProperty("capabilities", info.getCapabilities().toString()); + + JsonArray modules = new JsonArray(); + for (ModuleInfo moduleInfo : info.getModules()) { + JsonObject moduleJson = new JsonObject(); + moduleJson.addProperty("name", moduleInfo.getName()); + moduleJson.addProperty("version", moduleInfo.getVersion()); + moduleJson.addProperty("generationTime", moduleInfo.getGenerationTime()); + JsonArray factories = new JsonArray(); + moduleInfo.getFactories().forEach(fact -> factories.add(fact)); + moduleJson.add("factories", factories); + modules.add(moduleJson); + } + json.add("modules", modules); + } + return json; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index ab323532..b821689c 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.PostConstruct; - import org.kurento.client.KurentoConnectionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,12 +67,17 @@ public abstract class KmsManager { return json; } + public JsonObject toJsonExtended(boolean withSessions, boolean withExtraInfo) { + JsonObject json = this.kms.toJsonExtended(withSessions, withExtraInfo); + json.addProperty("load", this.load); + return json; + } + } @Autowired protected SessionManager sessionManager; - @Autowired protected LoadManager loadManager; private static final Logger log = LoggerFactory.getLogger(KmsManager.class); @@ -84,6 +87,10 @@ public abstract class KmsManager { private Iterator usageIterator = null; + public KmsManager(LoadManager loadManager) { + this.loadManager = loadManager; + } + public synchronized void addKms(Kms kms) { this.kmss.put(kms.getUri(), kms); } @@ -200,11 +207,4 @@ public abstract class KmsManager { }; } - protected abstract void initializeKurentoClients(); - - @PostConstruct - private void postConstruct() { - initializeKurentoClients(); - } - } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java index 9b723405..db9bddc8 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java @@ -17,17 +17,10 @@ package io.openvidu.server.kurento.kms; -import org.springframework.beans.factory.annotation.Autowired; +public interface LoadManager { -import io.openvidu.server.core.SessionManager; + public double calculateLoad(Kms kms); -public abstract class LoadManager { - - @Autowired - protected SessionManager sessionManager; - - protected abstract double calculateLoad(Kms kms); - - protected abstract boolean allowMoreElements(Kms kms); + public boolean allowMoreElements(Kms kms); } \ No newline at end of file