openvidu-server: LoadManager refactoring

pull/375/head
pabloFuente 2019-06-13 17:59:37 +02:00
parent 1a8b657eb7
commit 3d17ffcec0
13 changed files with 78 additions and 33 deletions

View File

@ -62,6 +62,7 @@ import io.openvidu.server.kurento.core.KurentoSessionManager;
import io.openvidu.server.kurento.kms.DummyLoadManager;
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
import io.openvidu.server.kurento.kms.KmsManager;
import io.openvidu.server.kurento.kms.LoadManager;
import io.openvidu.server.recording.service.RecordingManager;
import io.openvidu.server.rpc.RpcHandler;
import io.openvidu.server.rpc.RpcNotificationService;
@ -104,7 +105,13 @@ public class OpenViduServer implements JsonRpcConfigurer {
String firstKmsWsUri = kmsWsUris.get(0);
log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri);
return new FixedOneKmsManager(firstKmsWsUri, new DummyLoadManager());
return new FixedOneKmsManager(firstKmsWsUri);
}
@Bean
@ConditionalOnMissingBean
public LoadManager loadManager() {
return new DummyLoadManager();
}
@Bean

View File

@ -17,12 +17,9 @@
package io.openvidu.server.core;
import org.springframework.stereotype.Service;
import io.openvidu.java.client.OpenViduRole;
import io.openvidu.server.kurento.core.KurentoTokenOptions;
@Service
public interface TokenGenerator {
public Token generateToken(String sessionId, OpenViduRole role, String serverMetadata,

View File

@ -21,11 +21,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import io.openvidu.server.config.OpenviduConfig;
@Service
public abstract class CoturnCredentialsService {
protected static final Logger log = LoggerFactory.getLogger(CoturnCredentialsService.class);

View File

@ -17,6 +17,7 @@
package io.openvidu.server.kurento.core;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@ -66,7 +67,7 @@ public class KurentoParticipant extends Participant {
private KurentoParticipantEndpointConfig endpointConfig;
private PublisherEndpoint publisher;
private CountDownLatch endPointLatch = new CountDownLatch(1);
private CountDownLatch publisherLatch = new CountDownLatch(1);
private final ConcurrentMap<String, Filter> filters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, SubscriberEndpoint> subscribers = new ConcurrentHashMap<String, SubscriberEndpoint>();
@ -96,7 +97,7 @@ public class KurentoParticipant extends Participant {
public void createPublishingEndpoint(MediaOptions mediaOptions) {
publisher.createEndpoint(endPointLatch);
publisher.createEndpoint(publisherLatch);
if (getPublisher().getEndpoint() == null) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint");
}
@ -139,7 +140,7 @@ public class KurentoParticipant extends Participant {
public PublisherEndpoint getPublisher() {
try {
if (!endPointLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
if (!publisherLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) {
throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE,
"Timeout reached while waiting for publisher endpoint to be ready");
}
@ -150,6 +151,10 @@ public class KurentoParticipant extends Participant {
return this.publisher;
}
public Collection<SubscriberEndpoint> getSubscribers() {
return this.subscribers.values();
}
public MediaOptions getPublisherMediaOptions() {
return this.publisher.getMediaOptions();
}
@ -372,7 +377,8 @@ public class KurentoParticipant extends Participant {
if (this.openviduConfig.isRecordingModuleEnabled()
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(), kmsDisconnectionTime);
this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(),
kmsDisconnectionTime);
}
publisher.unregisterErrorListeners();

View File

@ -17,10 +17,7 @@
package io.openvidu.server.kurento.kms;
import org.springframework.stereotype.Service;
@Service
public class DummyLoadManager implements LoadManager {
public class DummyLoadManager extends LoadManager {
@Override
public double calculateLoad(Kms kms) {

View File

@ -21,8 +21,14 @@ import org.kurento.client.KurentoClient;
public class FixedOneKmsManager extends KmsManager {
public FixedOneKmsManager(String kmsWsUri, LoadManager loadManager) {
super(loadManager);
String kmsWsUri;
public FixedOneKmsManager(String kmsWsUri) {
this.kmsWsUri = kmsWsUri;
}
@Override
protected void initializeKurentoClients() {
KurentoClient kClient = KurentoClient.create(kmsWsUri, this.generateKurentoConnectionListener(kmsWsUri));
this.addKms(new Kms(kmsWsUri, kClient, loadManager));
}

View File

@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.kurento.client.KurentoClient;
import com.google.gson.JsonObject;
/**
* Abstraction of a KMS instance: an object of this class corresponds to a KMS
* process running somewhere.
@ -80,4 +82,12 @@ public class Kms {
this.timeOfKurentoClientDisconnection.set(time);
}
public JsonObject toJson() {
JsonObject json = new JsonObject();
json.addProperty("id", this.kmsUri);
json.addProperty("uri", this.kmsUri);
// json.addProperty("createdAt", this.client.getServerManager().getInfo().getVersion());
return json;
}
}

View File

@ -18,23 +18,26 @@
package io.openvidu.server.kurento.kms;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.kurento.client.KurentoConnectionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.gson.JsonObject;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.kurento.core.KurentoSessionManager;
@Service
public abstract class KmsManager {
public class KmsLoad implements Comparable<KmsLoad> {
@ -59,11 +62,19 @@ public abstract class KmsManager {
public int compareTo(KmsLoad o) {
return Double.compare(this.load, o.load);
}
public JsonObject toJson() {
JsonObject json = this.kms.toJson();
json.addProperty("load", this.load);
return json;
}
}
@Autowired
protected SessionManager sessionManager;
@Autowired
protected LoadManager loadManager;
private static final Logger log = LoggerFactory.getLogger(KmsManager.class);
@ -73,10 +84,6 @@ public abstract class KmsManager {
private Iterator<Kms> usageIterator = null;
public KmsManager(LoadManager loadManager) {
this.loadManager = loadManager;
}
public synchronized void addKms(Kms kms) {
this.kmss.put(kms.getUri(), kms);
}
@ -110,6 +117,10 @@ public abstract class KmsManager {
}
}
public synchronized Collection<Kms> getKmss() {
return this.kmss.values();
}
public synchronized List<KmsLoad> getKmssSortedByLoad() {
List<KmsLoad> kmsLoads = getKmsLoads();
Collections.sort(kmsLoads);
@ -189,4 +200,11 @@ public abstract class KmsManager {
};
}
protected abstract void initializeKurentoClients();
@PostConstruct
private void postConstruct() {
initializeKurentoClients();
}
}

View File

@ -17,10 +17,17 @@
package io.openvidu.server.kurento.kms;
public interface LoadManager {
import org.springframework.beans.factory.annotation.Autowired;
double calculateLoad(Kms kms);
import io.openvidu.server.core.SessionManager;
boolean allowMoreElements(Kms kms);
public abstract class LoadManager {
@Autowired
protected SessionManager sessionManager;
protected abstract double calculateLoad(Kms kms);
protected abstract boolean allowMoreElements(Kms kms);
}

View File

@ -47,7 +47,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
@ -69,7 +68,6 @@ import io.openvidu.server.recording.Recording;
import io.openvidu.server.utils.CustomFileManager;
import io.openvidu.server.utils.DockerManager;
@Service
public class RecordingManager {
private static final Logger log = LoggerFactory.getLogger(RecordingManager.class);

View File

@ -35,7 +35,7 @@ import io.openvidu.server.config.OpenviduConfig;
/**
*
* @author Pablo Fuente Pérez
* @author Pablo Fuente (pablofuenteperez@gmail.com)
*/
@RestController
@CrossOrigin
@ -110,9 +110,13 @@ public class ConfigRestController {
json.addProperty("openviduRecordingAutostopTimeout", openviduConfig.getOpenviduRecordingAutostopTimeout());
}
return new ResponseEntity<>(json.toString(), getResponseHeaders(), HttpStatus.OK);
}
protected HttpHeaders getResponseHeaders() {
HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.setContentType(MediaType.APPLICATION_JSON);
return new ResponseEntity<>(json.toString(), responseHeaders, HttpStatus.OK);
return responseHeaders;
}
}

View File

@ -61,7 +61,7 @@ import io.openvidu.server.utils.RandomStringGenerator;
/**
*
* @author Pablo Fuente Pérez
* @author Pablo Fuente (pablofuenteperez@gmail.com)
*/
@RestController
@CrossOrigin

View File

@ -2,9 +2,6 @@ package io.openvidu.server.utils;
import java.net.InetAddress;
import org.springframework.stereotype.Service;
@Service
public class GeoLocationByIpDummy implements GeoLocationByIp {
@Override