openvidu-server: initializeKurentoClients refactoring (support for POST /pro/media-servers)

pull/375/head
pabloFuente 2019-07-04 10:25:37 +02:00
parent a4509528f1
commit a79434fb4a
5 changed files with 43 additions and 29 deletions

View File

@ -76,8 +76,8 @@ import io.openvidu.server.webhook.CDRLoggerWebhook;
* - size: number * - size: number
* - status: string * - status: string
* - webrtcConnectionDestroyed.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "mediaServerDisconnect", "openviduServerStopped" * - webrtcConnectionDestroyed.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "mediaServerDisconnect", "openviduServerStopped"
* - participantLeft.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "openviduServerStopped" * - participantLeft.reason: "unsubscribe", "unpublish", "disconnect", "networkDisconnect", "mediaServerDisconnect", "openviduServerStopped"
* - sessionDestroyed.reason: "lastParticipantLeft", "openviduServerStopped" * - sessionDestroyed.reason: "lastParticipantLeft", "mediaServerDisconnect", "openviduServerStopped"
* - recordingStopped.reason: "recordingStoppedByServer", "lastParticipantLeft", "sessionClosedByServer", "automaticStop", "mediaServerDisconnect", "openviduServerStopped" * - recordingStopped.reason: "recordingStoppedByServer", "lastParticipantLeft", "sessionClosedByServer", "automaticStop", "mediaServerDisconnect", "openviduServerStopped"
* *
* [OPTIONAL_PROPERTIES]: * [OPTIONAL_PROPERTIES]:
@ -210,9 +210,12 @@ public class CallDetailRecord {
RecordingManager.finalReason(reason), timestamp); RecordingManager.finalReason(reason), timestamp);
this.log(recordingStoppedEvent); this.log(recordingStoppedEvent);
// Summary: update ended recording // FIXME: Summary: update ended recording if recordSessionDestroyed has not been
// already called
if (sessionManager.getAccumulatedRecordings(recording.getSessionId()) != null) {
sessionManager.getAccumulatedRecordings(recording.getSessionId()).add(recordingStoppedEvent); sessionManager.getAccumulatedRecordings(recording.getSessionId()).add(recordingStoppedEvent);
} }
}
public void recordRecordingStatusChanged(Recording recording, EndReason finalReason, long timestamp, public void recordRecordingStatusChanged(Recording recording, EndReason finalReason, long timestamp,
Status status) { Status status) {

View File

@ -20,6 +20,7 @@ package io.openvidu.server.kurento.core;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;
import org.kurento.client.GenericMediaElement; import org.kurento.client.GenericMediaElement;
@ -90,7 +91,13 @@ public class KurentoSessionManager extends SessionManager {
openviduConfig, recordingManager); openviduConfig, recordingManager);
} }
Kms lessLoadedKms = this.kmsManager.getLessLoadedKms(); Kms lessLoadedKms = null;
try {
lessLoadedKms = this.kmsManager.getLessLoadedKms();
} catch (NoSuchElementException e) {
throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE,
"There is no available media server where to initialize session '" + sessionId + "'");
}
log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), lessLoadedKms.getLoad()); log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), lessLoadedKms.getLoad());
kSession = createSession(sessionNotActive, lessLoadedKms); kSession = createSession(sessionNotActive, lessLoadedKms);
} }
@ -301,9 +308,10 @@ public class KurentoSessionManager extends SessionManager {
// Abort automatic recording stop (user published before timeout) // Abort automatic recording stop (user published before timeout)
log.info("Participant {} published before timeout finished. Aborting automatic recording stop", log.info("Participant {} published before timeout finished. Aborting automatic recording stop",
participant.getParticipantPublicId()); participant.getParticipantPublicId());
boolean stopAborted = recordingManager.abortAutomaticRecordingStopThread(kSession); boolean stopAborted = recordingManager.abortAutomaticRecordingStopThread(kSession,
EndReason.automaticStop);
if (stopAborted) { if (stopAborted) {
log.info("Automatic recording stopped succesfully aborted"); log.info("Automatic recording stopped successfully aborted");
} else { } else {
log.info("Automatic recording stopped couldn't be aborted. Recording of session {} has stopped", log.info("Automatic recording stopped couldn't be aborted. Recording of session {} has stopped",
kSession.getSessionId()); kSession.getSessionId());

View File

@ -17,21 +17,22 @@
package io.openvidu.server.kurento.kms; package io.openvidu.server.kurento.kms;
import java.util.List;
import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClient;
import org.kurento.commons.exception.KurentoException; import org.kurento.commons.exception.KurentoException;
public class FixedOneKmsManager extends KmsManager { public class FixedOneKmsManager extends KmsManager {
@Override @Override
protected void initializeKurentoClients() { public void initializeKurentoClients(List<String> kmsUris) throws Exception {
final String kmsUri = this.openviduConfig.getKmsUris().get(0); final String kmsUri = kmsUris.get(0);
KurentoClient kClient = null; KurentoClient kClient = null;
try { try {
kClient = KurentoClient.create(kmsUri, this.generateKurentoConnectionListener(kmsUri)); kClient = KurentoClient.create(kmsUri, this.generateKurentoConnectionListener(kmsUri));
} catch (KurentoException e) { } catch (KurentoException e) {
log.error("KMS in {} is not reachable by OpenVidu Server", kmsUri); log.error("KMS in {} is not reachable by OpenVidu Server", kmsUri);
log.error("Shutting down OpenVidu Server"); throw new Exception();
System.exit(1);
} }
Kms kms = new Kms(kmsUri, kClient, loadManager); Kms kms = new Kms(kmsUri, kClient, loadManager);
kms.setKurentoClientConnected(true); kms.setKurentoClientConnected(true);

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -89,24 +90,19 @@ public abstract class KmsManager {
this.kmss.put(kms.getUri(), kms); this.kmss.put(kms.getUri(), kms);
} }
public synchronized void removeKms(Kms kms) { public synchronized void removeKms(String kmsUri) {
this.kmss.remove(kms.getUri()); this.kmss.remove(kmsUri);
} }
public synchronized Kms getLessLoadedKms() { public synchronized Kms getLessLoadedKms() throws NoSuchElementException {
return Collections.min(getKmsLoads()).kms; return Collections.min(getKmsLoads()).kms;
} }
public synchronized Kms getNextLessLoadedKms() { public Kms getKms(String kmsUri) {
List<KmsLoad> sortedLoads = getKmssSortedByLoad(); return this.kmss.get(kmsUri);
if (sortedLoads.size() > 1) {
return sortedLoads.get(1).kms;
} else {
return sortedLoads.get(0).kms;
}
} }
public synchronized Collection<Kms> getKmss() { public Collection<Kms> getKmss() {
return this.kmss.values(); return this.kmss.values();
} }
@ -179,11 +175,17 @@ public abstract class KmsManager {
}; };
} }
protected abstract void initializeKurentoClients(); public abstract void initializeKurentoClients(List<String> kmsUris) throws Exception;
@PostConstruct @PostConstruct
protected void postConstruct() { private void postConstruct() {
this.initializeKurentoClients(); try {
this.initializeKurentoClients(this.openviduConfig.getKmsUris());
} catch (Exception e) {
// Some KMS wasn't reachable
log.error("Shutting down OpenVidu Server");
System.exit(1);
}
} }
} }

View File

@ -226,7 +226,7 @@ public class RecordingManager {
recording = this.singleStreamRecordingService.stopRecording(session, recording, reason); recording = this.singleStreamRecordingService.stopRecording(session, recording, reason);
break; break;
} }
this.abortAutomaticRecordingStopThread(session); this.abortAutomaticRecordingStopThread(session, reason);
return recording; return recording;
} }
@ -248,7 +248,7 @@ public class RecordingManager {
kmsDisconnectionTime); kmsDisconnectionTime);
break; break;
} }
this.abortAutomaticRecordingStopThread(session); this.abortAutomaticRecordingStopThread(session, reason);
return recording; return recording;
} }
@ -433,7 +433,7 @@ public class RecordingManager {
this.automaticRecordingStopThreads.putIfAbsent(session.getSessionId(), future); this.automaticRecordingStopThreads.putIfAbsent(session.getSessionId(), future);
} }
public boolean abortAutomaticRecordingStopThread(Session session) { public boolean abortAutomaticRecordingStopThread(Session session, EndReason reason) {
ScheduledFuture<?> future = this.automaticRecordingStopThreads.remove(session.getSessionId()); ScheduledFuture<?> future = this.automaticRecordingStopThreads.remove(session.getSessionId());
if (future != null) { if (future != null) {
boolean cancelled = future.cancel(false); boolean cancelled = future.cancel(false);
@ -445,7 +445,7 @@ public class RecordingManager {
log.info( log.info(
"Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session", "Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session",
session.getSessionId()); session.getSessionId());
sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop); sessionManager.closeSessionAndEmptyCollections(session, reason);
sessionManager.showTokens(); sessionManager.showTokens();
} }
return cancelled; return cancelled;