mirror of https://github.com/OpenVidu/openvidu.git
openvidu-server: store KurentoSessions in Kmss
parent
64949012e4
commit
4985847511
|
@ -62,7 +62,6 @@ import io.openvidu.server.kurento.core.KurentoSessionManager;
|
||||||
import io.openvidu.server.kurento.kms.DummyLoadManager;
|
import io.openvidu.server.kurento.kms.DummyLoadManager;
|
||||||
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
|
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
|
||||||
import io.openvidu.server.kurento.kms.KmsManager;
|
import io.openvidu.server.kurento.kms.KmsManager;
|
||||||
import io.openvidu.server.kurento.kms.LoadManager;
|
|
||||||
import io.openvidu.server.recording.service.RecordingManager;
|
import io.openvidu.server.recording.service.RecordingManager;
|
||||||
import io.openvidu.server.rpc.RpcHandler;
|
import io.openvidu.server.rpc.RpcHandler;
|
||||||
import io.openvidu.server.rpc.RpcNotificationService;
|
import io.openvidu.server.rpc.RpcNotificationService;
|
||||||
|
@ -105,13 +104,7 @@ public class OpenViduServer implements JsonRpcConfigurer {
|
||||||
|
|
||||||
String firstKmsWsUri = kmsWsUris.get(0);
|
String firstKmsWsUri = kmsWsUris.get(0);
|
||||||
log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri);
|
log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri);
|
||||||
return new FixedOneKmsManager(firstKmsWsUri);
|
return new FixedOneKmsManager(firstKmsWsUri, new DummyLoadManager());
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
@ConditionalOnMissingBean
|
|
||||||
public LoadManager loadManager() {
|
|
||||||
return new DummyLoadManager();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|
|
@ -84,12 +84,16 @@ public class KurentoParticipant extends Participant {
|
||||||
this.session = kurentoSession;
|
this.session = kurentoSession;
|
||||||
|
|
||||||
if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) {
|
if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) {
|
||||||
|
// Initialize a PublisherEndpoint
|
||||||
this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(),
|
this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(),
|
||||||
this.session.getPipeline(), this.openviduConfig);
|
this.session.getPipeline(), this.openviduConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Participant other : session.getParticipants()) {
|
for (Participant other : session.getParticipants()) {
|
||||||
if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())) {
|
if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())
|
||||||
|
&& !OpenViduRole.SUBSCRIBER.equals(other.getToken().getRole())) {
|
||||||
|
// Initialize a SubscriberEndpoint for each other user connected with PUBLISHER
|
||||||
|
// or MODERATOR role
|
||||||
getNewOrExistingSubscriber(other.getParticipantPublicId());
|
getNewOrExistingSubscriber(other.getParticipantPublicId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,6 +159,9 @@ public class KurentoSession extends Session {
|
||||||
kms.getKurentoClient().destroy();
|
kms.getKurentoClient().destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Also disassociate the KurentoSession from the Kms
|
||||||
|
kms.removeKurentoSession(this.sessionId);
|
||||||
|
|
||||||
this.closed = true;
|
this.closed = true;
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -503,6 +503,10 @@ public class KurentoSessionManager extends SessionManager {
|
||||||
log.warn("Session '{}' has just been created by another thread", session.getSessionId());
|
log.warn("Session '{}' has just been created by another thread", session.getSessionId());
|
||||||
return oldSession;
|
return oldSession;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Also associate the KurentoSession with the Kms
|
||||||
|
kms.addKurentoSession(session);
|
||||||
|
|
||||||
log.warn("No session '{}' exists yet. Created one on KMS '{}'", session.getSessionId(), kms.getUri());
|
log.warn("No session '{}' exists yet. Created one on KMS '{}'", session.getSessionId(), kms.getUri());
|
||||||
|
|
||||||
sessionEventsHandler.onSessionCreated(session);
|
sessionEventsHandler.onSessionCreated(session);
|
||||||
|
|
|
@ -479,8 +479,6 @@ public abstract class MediaEndpoint {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract PublisherEndpoint getPublisher();
|
|
||||||
|
|
||||||
public JsonObject toJson() {
|
public JsonObject toJson() {
|
||||||
JsonObject json = new JsonObject();
|
JsonObject json = new JsonObject();
|
||||||
json.addProperty("createdAt", this.createdAt);
|
json.addProperty("createdAt", this.createdAt);
|
||||||
|
|
|
@ -537,11 +537,6 @@ public class PublisherEndpoint extends MediaEndpoint {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public PublisherEndpoint getPublisher() {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public MediaOptions getMediaOptions() {
|
public MediaOptions getMediaOptions() {
|
||||||
return mediaOptions;
|
return mediaOptions;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package io.openvidu.server.kurento.endpoint;
|
package io.openvidu.server.kurento.endpoint;
|
||||||
|
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.kurento.client.MediaPipeline;
|
import org.kurento.client.MediaPipeline;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -37,7 +38,7 @@ import io.openvidu.server.kurento.core.KurentoParticipant;
|
||||||
public class SubscriberEndpoint extends MediaEndpoint {
|
public class SubscriberEndpoint extends MediaEndpoint {
|
||||||
private final static Logger log = LoggerFactory.getLogger(SubscriberEndpoint.class);
|
private final static Logger log = LoggerFactory.getLogger(SubscriberEndpoint.class);
|
||||||
|
|
||||||
private boolean connectedToPublisher = false;
|
private AtomicBoolean connectedToPublisher = new AtomicBoolean(false);
|
||||||
|
|
||||||
private PublisherEndpoint publisher = null;
|
private PublisherEndpoint publisher = null;
|
||||||
|
|
||||||
|
@ -58,16 +59,11 @@ public class SubscriberEndpoint extends MediaEndpoint {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isConnectedToPublisher() {
|
public boolean isConnectedToPublisher() {
|
||||||
return connectedToPublisher;
|
return connectedToPublisher.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setConnectedToPublisher(boolean connectedToPublisher) {
|
public void setConnectedToPublisher(boolean connectedToPublisher) {
|
||||||
this.connectedToPublisher = connectedToPublisher;
|
this.connectedToPublisher.set(connectedToPublisher);
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public PublisherEndpoint getPublisher() {
|
|
||||||
return this.publisher;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPublisher(PublisherEndpoint publisher) {
|
public void setPublisher(PublisherEndpoint publisher) {
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package io.openvidu.server.kurento.kms;
|
package io.openvidu.server.kurento.kms;
|
||||||
|
|
||||||
public class DummyLoadManager extends LoadManager {
|
public class DummyLoadManager implements LoadManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double calculateLoad(Kms kms) {
|
public double calculateLoad(Kms kms) {
|
||||||
|
|
|
@ -23,12 +23,10 @@ public class FixedOneKmsManager extends KmsManager {
|
||||||
|
|
||||||
String kmsWsUri;
|
String kmsWsUri;
|
||||||
|
|
||||||
public FixedOneKmsManager(String kmsWsUri) {
|
public FixedOneKmsManager(String kmsWsUri, LoadManager loadManager) {
|
||||||
|
super(loadManager);
|
||||||
this.kmsWsUri = kmsWsUri;
|
this.kmsWsUri = kmsWsUri;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void initializeKurentoClients() {
|
|
||||||
KurentoClient kClient = KurentoClient.create(kmsWsUri, this.generateKurentoConnectionListener(kmsWsUri));
|
KurentoClient kClient = KurentoClient.create(kmsWsUri, this.generateKurentoConnectionListener(kmsWsUri));
|
||||||
this.addKms(new Kms(kmsWsUri, kClient, loadManager));
|
this.addKms(new Kms(kmsWsUri, kClient, loadManager));
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,13 +17,21 @@
|
||||||
|
|
||||||
package io.openvidu.server.kurento.kms;
|
package io.openvidu.server.kurento.kms;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.kurento.client.KurentoClient;
|
import org.kurento.client.KurentoClient;
|
||||||
|
import org.kurento.client.ModuleInfo;
|
||||||
|
import org.kurento.client.ServerInfo;
|
||||||
|
|
||||||
|
import com.google.gson.JsonArray;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
|
|
||||||
|
import io.openvidu.server.kurento.core.KurentoSession;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstraction of a KMS instance: an object of this class corresponds to a KMS
|
* Abstraction of a KMS instance: an object of this class corresponds to a KMS
|
||||||
* process running somewhere.
|
* process running somewhere.
|
||||||
|
@ -44,6 +52,8 @@ public class Kms {
|
||||||
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
|
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
|
||||||
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
|
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
|
||||||
|
|
||||||
|
private Map<String, KurentoSession> kurentoSessions = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) {
|
public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) {
|
||||||
this.kmsUri = kmsUri;
|
this.kmsUri = kmsUri;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
|
@ -82,11 +92,58 @@ public class Kms {
|
||||||
this.timeOfKurentoClientDisconnection.set(time);
|
this.timeOfKurentoClientDisconnection.set(time);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Collection<KurentoSession> getKurentoSessions() {
|
||||||
|
return this.kurentoSessions.values();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addKurentoSession(KurentoSession session) {
|
||||||
|
this.kurentoSessions.put(session.getSessionId(), session);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeKurentoSession(String sessionId) {
|
||||||
|
this.kurentoSessions.remove(sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
public JsonObject toJson() {
|
public JsonObject toJson() {
|
||||||
JsonObject json = new JsonObject();
|
JsonObject json = new JsonObject();
|
||||||
json.addProperty("id", this.kmsUri);
|
json.addProperty("id", this.kmsUri);
|
||||||
json.addProperty("uri", this.kmsUri);
|
json.addProperty("uri", this.kmsUri);
|
||||||
// json.addProperty("createdAt", this.client.getServerManager().getInfo().getVersion());
|
return json;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JsonObject toJsonExtended(boolean withSessions, boolean withExtraInfo) {
|
||||||
|
|
||||||
|
JsonObject json = this.toJson();
|
||||||
|
|
||||||
|
if (withSessions) {
|
||||||
|
JsonArray sessions = new JsonArray();
|
||||||
|
for (KurentoSession session : this.kurentoSessions.values()) {
|
||||||
|
sessions.add(session.toJson());
|
||||||
|
}
|
||||||
|
json.add("sessions", sessions);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (withExtraInfo) {
|
||||||
|
json.addProperty("memory", this.client.getServerManager().getUsedMemory() / 1024);
|
||||||
|
|
||||||
|
ServerInfo info = this.client.getServerManager().getInfo();
|
||||||
|
json.addProperty("version", info.getVersion());
|
||||||
|
json.addProperty("capabilities", info.getCapabilities().toString());
|
||||||
|
|
||||||
|
JsonArray modules = new JsonArray();
|
||||||
|
for (ModuleInfo moduleInfo : info.getModules()) {
|
||||||
|
JsonObject moduleJson = new JsonObject();
|
||||||
|
moduleJson.addProperty("name", moduleInfo.getName());
|
||||||
|
moduleJson.addProperty("version", moduleInfo.getVersion());
|
||||||
|
moduleJson.addProperty("generationTime", moduleInfo.getGenerationTime());
|
||||||
|
JsonArray factories = new JsonArray();
|
||||||
|
moduleInfo.getFactories().forEach(fact -> factories.add(fact));
|
||||||
|
moduleJson.add("factories", factories);
|
||||||
|
modules.add(moduleJson);
|
||||||
|
}
|
||||||
|
json.add("modules", modules);
|
||||||
|
}
|
||||||
|
|
||||||
return json;
|
return json;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
|
|
||||||
import org.kurento.client.KurentoConnectionListener;
|
import org.kurento.client.KurentoConnectionListener;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -69,12 +67,17 @@ public abstract class KmsManager {
|
||||||
return json;
|
return json;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public JsonObject toJsonExtended(boolean withSessions, boolean withExtraInfo) {
|
||||||
|
JsonObject json = this.kms.toJsonExtended(withSessions, withExtraInfo);
|
||||||
|
json.addProperty("load", this.load);
|
||||||
|
return json;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
protected SessionManager sessionManager;
|
protected SessionManager sessionManager;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
protected LoadManager loadManager;
|
protected LoadManager loadManager;
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(KmsManager.class);
|
private static final Logger log = LoggerFactory.getLogger(KmsManager.class);
|
||||||
|
@ -84,6 +87,10 @@ public abstract class KmsManager {
|
||||||
|
|
||||||
private Iterator<Kms> usageIterator = null;
|
private Iterator<Kms> usageIterator = null;
|
||||||
|
|
||||||
|
public KmsManager(LoadManager loadManager) {
|
||||||
|
this.loadManager = loadManager;
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized void addKms(Kms kms) {
|
public synchronized void addKms(Kms kms) {
|
||||||
this.kmss.put(kms.getUri(), kms);
|
this.kmss.put(kms.getUri(), kms);
|
||||||
}
|
}
|
||||||
|
@ -200,11 +207,4 @@ public abstract class KmsManager {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void initializeKurentoClients();
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
private void postConstruct() {
|
|
||||||
initializeKurentoClients();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,17 +17,10 @@
|
||||||
|
|
||||||
package io.openvidu.server.kurento.kms;
|
package io.openvidu.server.kurento.kms;
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
public interface LoadManager {
|
||||||
|
|
||||||
import io.openvidu.server.core.SessionManager;
|
public double calculateLoad(Kms kms);
|
||||||
|
|
||||||
public abstract class LoadManager {
|
public boolean allowMoreElements(Kms kms);
|
||||||
|
|
||||||
@Autowired
|
|
||||||
protected SessionManager sessionManager;
|
|
||||||
|
|
||||||
protected abstract double calculateLoad(Kms kms);
|
|
||||||
|
|
||||||
protected abstract boolean allowMoreElements(Kms kms);
|
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue