openvidu-server: new behavior upon KurentoClient disconnection

pull/581/head
pabloFuente 2020-12-22 17:36:40 +01:00
parent 474e0a70b1
commit 1b98351596
12 changed files with 170 additions and 158 deletions

View File

@ -101,18 +101,6 @@ public class OpenViduServer implements JsonRpcConfigurer {
@Autowired
OpenviduConfig config;
/**
 * Builds the default {@link KmsManager} bean.
 *
 * The configured list of KMS URIs must not be empty; its first entry is the
 * one reported in the startup log.
 *
 * @param openviduConfig configuration object providing the KMS URIs
 * @return a new {@link FixedOneKmsManager}
 * @throws IllegalArgumentException if no KMS URI is configured
 */
@Bean
@ConditionalOnMissingBean
@DependsOn({ "openviduConfig", "mediaNodeStatusManager" })
public KmsManager kmsManager(OpenviduConfig openviduConfig) {
    if (!openviduConfig.getKmsUris().isEmpty()) {
        // Only the first configured URI is logged
        log.info("OpenVidu Server using one KMS: {}", openviduConfig.getKmsUris().get(0));
        return new FixedOneKmsManager();
    }
    throw new IllegalArgumentException("'KMS_URIS' should contain at least one KMS url");
}
@Bean
@ConditionalOnMissingBean
@DependsOn("openviduConfig")
@ -148,6 +136,18 @@ public class OpenViduServer implements JsonRpcConfigurer {
return new KurentoSessionManager();
}
/**
 * Builds the default {@link KmsManager} bean.
 *
 * The configured list of KMS URIs must not be empty; its first entry is the
 * one reported in the startup log.
 *
 * @param openviduConfig configuration object providing the KMS URIs
 * @param sessionManager session manager handed over to the created KMS manager
 * @return a new {@link FixedOneKmsManager} wrapping {@code sessionManager}
 * @throws IllegalArgumentException if no KMS URI is configured
 */
@Bean
@ConditionalOnMissingBean
@DependsOn({ "openviduConfig", "sessionManager", "mediaNodeStatusManager" })
public KmsManager kmsManager(OpenviduConfig openviduConfig, SessionManager sessionManager) {
    final boolean noKmsConfigured = openviduConfig.getKmsUris().isEmpty();
    if (noKmsConfigured) {
        throw new IllegalArgumentException("'KMS_URIS' should contain at least one KMS url");
    }

    // Only the first configured URI is logged
    final String primaryKmsUri = openviduConfig.getKmsUris().get(0);
    log.info("OpenVidu Server using one KMS: {}", primaryKmsUri);

    return new FixedOneKmsManager(sessionManager);
}
@Bean
@ConditionalOnMissingBean
@DependsOn("openviduConfig")

View File

@ -0,0 +1,25 @@
package io.openvidu.server.cdr;
import com.google.gson.JsonObject;
import io.openvidu.server.kurento.kms.Kms;
/**
 * CDR event describing a media server (KMS) crash.
 *
 * On top of the JSON produced by {@link CDREvent}, the serialized form carries
 * the id, ip and uri of the affected {@link Kms}.
 */
public class CDREventMediaServerCrashed extends CDREvent {

    /** Media server this event refers to. Assigned once at construction time. */
    private final Kms kms;

    /**
     * @param eventName name of the CDR event
     * @param sessionId session identifier passed through to {@link CDREvent}
     * @param timeStamp timestamp passed through to {@link CDREvent}
     * @param kms       media server whose id, ip and uri are serialized by
     *                  {@link #toJson()}
     */
    public CDREventMediaServerCrashed(CDREventName eventName, String sessionId, Long timeStamp, Kms kms) {
        super(eventName, sessionId, timeStamp);
        this.kms = kms;
    }

    /**
     * Extends the parent JSON with the {@code id}, {@code ip} and {@code uri}
     * properties of the media server.
     *
     * @return the JSON object built by the parent class, with the three extra
     *         properties added
     */
    @Override
    public JsonObject toJson() {
        JsonObject json = super.toJson();
        json.addProperty("id", this.kms.getId());
        json.addProperty("ip", this.kms.getIp());
        json.addProperty("uri", this.kms.getUri());
        return json;
    }

}

View File

@ -21,6 +21,6 @@ public enum CDREventName {
sessionCreated, sessionDestroyed, participantJoined, participantLeft, webrtcConnectionCreated,
webrtcConnectionDestroyed, recordingStarted, recordingStopped, recordingStatusChanged, filterEventDispatched,
signalSent, mediaNodeStatusChanged, autoscaling
signalSent, mediaNodeStatusChanged, autoscaling, mediaServerCrashed
}

View File

@ -36,6 +36,7 @@ import io.openvidu.server.core.Participant;
import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.endpoint.KmsEvent;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.recording.service.RecordingManager;
import io.openvidu.server.summary.SessionSummary;
@ -216,4 +217,10 @@ public class CallDetailRecord {
});
}
/**
 * Logs a {@code mediaServerCrashed} CDR event for the given media server.
 *
 * @param kms                        media server attached to the event
 * @param timeOfKurentoDisconnection value used as the timestamp of the event
 */
public void recordMediaServerCrashed(Kms kms, long timeOfKurentoDisconnection) {
    // No session id is supplied for this event
    final CDREvent crashEvent = new CDREventMediaServerCrashed(CDREventName.mediaServerCrashed, null,
            Long.valueOf(timeOfKurentoDisconnection), kms);
    this.log(crashEvent);
}
}

View File

@ -20,7 +20,7 @@ package io.openvidu.server.core;
public enum EndReason {
unsubscribe, unpublish, disconnect, forceUnpublishByUser, forceUnpublishByServer, forceDisconnectByUser,
forceDisconnectByServer, lastParticipantLeft, networkDisconnect, mediaServerDisconnect, openviduServerStopped,
recordingStoppedByServer, automaticStop, sessionClosedByServer
forceDisconnectByServer, lastParticipantLeft, networkDisconnect, mediaServerDisconnect, mediaServerCrashed,
openviduServerStopped, recordingStoppedByServer, automaticStop, sessionClosedByServer
}

View File

@ -42,6 +42,7 @@ import io.openvidu.server.config.OpenviduBuildInfo;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.kurento.core.KurentoParticipant;
import io.openvidu.server.kurento.endpoint.KurentoFilter;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.recording.Recording;
import io.openvidu.server.rpc.RpcNotificationService;
@ -606,6 +607,10 @@ public class SessionEventsHandler {
public void onConnectionPropertyChanged(Participant participant, String property, Object newValue) {
}
/**
 * Handles a media server crash by recording it in the CDR.
 *
 * @param kms                        media server reported as crashed
 * @param timeOfKurentoDisconnection forwarded unchanged to the CDR record
 *                                   (presumably epoch milliseconds - confirm
 *                                   against the caller)
 */
public void onMediaServerCrashed(Kms kms, long timeOfKurentoDisconnection) {
CDR.recordMediaServerCrashed(kms, timeOfKurentoDisconnection);
}
protected Set<Participant> filterParticipantsByRole(OpenViduRole[] roles, Set<Participant> participants) {
return participants.stream().filter(part -> {
if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(part.getParticipantPublicId())) {

View File

@ -459,7 +459,7 @@ public abstract class SessionManager {
log.warn("Possible ghost session {}", sessionActive.getSessionId());
}
}
}, () -> new Long(openviduConfig.getSessionGarbageInterval() * 1000));
}, () -> Long.valueOf(openviduConfig.getSessionGarbageInterval() * 1000));
this.sessionGarbageCollectorTimer.updateTimer();

View File

@ -37,6 +37,7 @@ import org.kurento.client.MediaElement;
import org.kurento.client.MediaPipeline;
import org.kurento.client.PassThrough;
import org.kurento.client.internal.server.KurentoServerException;
import org.kurento.jsonrpc.JsonRpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -370,11 +371,16 @@ public class KurentoParticipant extends Participant {
it.remove();
if (subscriber != null && subscriber.getEndpoint() != null) {
releaseSubscriberEndpoint(remoteParticipantName,
(KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName), subscriber,
reason, false);
log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(),
remoteParticipantName);
try {
releaseSubscriberEndpoint(remoteParticipantName,
(KurentoParticipant) this.session.getParticipantByPublicId(remoteParticipantName),
subscriber, reason, false);
log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(),
remoteParticipantName);
} catch (JsonRpcException e) {
log.error("Error releasing subscriber endpoint of participant {}: {}", this.participantPublicId,
e.getMessage());
}
} else {
log.warn(
"PARTICIPANT {}: Trying to close subscriber endpoint to {}. "
@ -452,25 +458,33 @@ public class KurentoParticipant extends Participant {
}
private void releasePublisherEndpointAux(EndReason reason, Long kmsDisconnectionTime) {
// Remove streamId from publisher's map
this.session.publishedStreamIds.remove(this.getPublisherStreamId());
try {
// Remove streamId from publisher's map
this.session.publishedStreamIds.remove(this.getPublisherStreamId());
if (this.openviduConfig.isRecordingModuleEnabled() && this.token.record()
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(),
kmsDisconnectionTime);
this.streaming = false;
this.session.deregisterPublisher(this);
if (this.openviduConfig.isRecordingModuleEnabled() && this.token.record()
&& this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) {
this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId(),
kmsDisconnectionTime);
}
publisher.cancelStatsLoop.set(true);
// These operations are all remote
publisher.unregisterErrorListeners();
for (MediaElement el : publisher.getMediaElements()) {
releaseElement(getParticipantPublicId(), el);
}
releaseElement(getParticipantPublicId(), publisher.getEndpoint());
} catch (JsonRpcException e) {
log.error("Error releasing publisher endpoint of participant {}: {}", this.participantPublicId,
e.getMessage());
}
publisher.unregisterErrorListeners();
publisher.cancelStatsLoop.set(true);
for (MediaElement el : publisher.getMediaElements()) {
releaseElement(getParticipantPublicId(), el);
}
releaseElement(getParticipantPublicId(), publisher.getEndpoint());
this.streaming = false;
this.session.deregisterPublisher(this);
endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason);
publisher = null;
}

View File

@ -26,12 +26,19 @@ import javax.annotation.PostConstruct;
import org.kurento.client.KurentoClient;
import org.kurento.commons.exception.KurentoException;
import org.kurento.jsonrpc.client.JsonRpcClientNettyWebSocket;
import org.kurento.jsonrpc.client.JsonRpcWSConnectionListener;
import io.openvidu.java.client.RecordingProperties;
import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionManager;
public class FixedOneKmsManager extends KmsManager {
/**
 * Creates the manager, delegating entirely to the {@link KmsManager}
 * constructor.
 *
 * @param sessionManager session manager passed to the parent class
 */
public FixedOneKmsManager(SessionManager sessionManager) {
super(sessionManager);
}
@Override
public List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure)
throws Exception {
@ -39,7 +46,11 @@ public class FixedOneKmsManager extends KmsManager {
KurentoClient kClient = null;
Kms kms = new Kms(firstProps, loadManager, quarantineKiller);
try {
kClient = KurentoClient.create(firstProps.getUri(), this.generateKurentoConnectionListener(kms.getId()));
JsonRpcWSConnectionListener listener = this.generateKurentoConnectionListener(kms.getId());
JsonRpcClientNettyWebSocket client = new JsonRpcClientNettyWebSocket(firstProps.getUri(), listener);
client.setTryReconnectingMaxTime(0);
client.setTryReconnectingForever(false);
kClient = KurentoClient.createFromJsonRpcClient(client);
this.addKms(kms);
kms.setKurentoClient(kClient);

View File

@ -25,7 +25,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -35,7 +34,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.RandomStringUtils;
import org.kurento.client.KurentoConnectionListener;
import org.kurento.jsonrpc.client.JsonRpcWSConnectionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -44,8 +43,11 @@ import com.google.gson.JsonObject;
import io.openvidu.java.client.RecordingProperties;
import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.IdentifierPrefixes;
import io.openvidu.server.core.Session;
import io.openvidu.server.core.SessionEventsHandler;
import io.openvidu.server.core.SessionManager;
import io.openvidu.server.kurento.core.KurentoSession;
import io.openvidu.server.utils.MediaNodeStatusManager;
import io.openvidu.server.utils.QuarantineKiller;
@ -58,8 +60,6 @@ public abstract class KmsManager {
public static final Lock selectAndRemoveKmsLock = new ReentrantLock(true);
public static final int MAX_SECONDS_LOCK_WAIT = 15;
private Map<String, Lock> kmsReconnectionLocks = new ConcurrentHashMap<>();
private UpdatableTimerTask kurentoReconnectTimer;
public class KmsLoad implements Comparable<KmsLoad> {
@ -110,8 +110,17 @@ public abstract class KmsManager {
@Autowired
protected MediaNodeStatusManager mediaNodeStatusManager;
@Autowired
protected SessionEventsHandler sessionEventsHandler;
final protected Map<String, Kms> kmss = new ConcurrentHashMap<>();
private SessionManager sessionManager;
/**
 * Stores the session manager for later use by this KMS manager.
 *
 * @param sessionManager session manager kept in the {@code sessionManager}
 *                       field
 */
public KmsManager(SessionManager sessionManager) {
this.sessionManager = sessionManager;
}
/**
 * Registers a KMS in the internal map, keyed by its id. An entry already
 * stored under the same id is replaced.
 *
 * @param kms the KMS to register
 */
public synchronized void addKms(Kms kms) {
    final String kmsId = kms.getId();
    this.kmss.put(kmsId, kms);
}
@ -165,55 +174,14 @@ public abstract class KmsManager {
return kmsLoads;
}
protected KurentoConnectionListener generateKurentoConnectionListener(final String kmsId) {
return new KurentoConnectionListener() {
protected JsonRpcWSConnectionListener generateKurentoConnectionListener(final String kmsId) {
return new JsonRpcWSConnectionListener() {
@Override
public void reconnected(boolean sameServer) {
final Kms kms = kmss.get(kmsId);
log.info("Kurento Client \"reconnected\" event for KMS {} (sameServer: {}) [{}]", kms.getUri(),
sameServer, kms.getKurentoClient().toString());
kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock());
boolean lockAcquired = false;
try {
if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) {
lockAcquired = true;
if (kms.isKurentoClientConnected()) {
// Timer task of disconnected event successfully executed
log.warn(
"Timer task already executed for reconnected Kurento Client [{}] to KMS with uri {}. Skipping event listener execution",
kms.getKurentoClient().toString(), kms.getUri());
return;
}
kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
if (!sameServer) {
// Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints)
log.warn("Kurento Client reconnected to a different KMS instance, with uri {}",
kms.getUri());
log.warn("Updating all webrtc endpoints for active sessions");
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
kms.getKurentoSessions().forEach(kSession -> {
kSession.restartStatusInKurento(timeOfKurentoDisconnection);
});
} else {
// Same KMS. We may infer that openvidu-server/KMS connection has been lost, but
// not the clients/KMS connections
log.warn("Kurento Client reconnected to same KMS {} with uri {}", kmsId, kms.getUri());
}
kms.setTimeOfKurentoClientDisconnection(0);
}
} catch (InterruptedException e) {
log.error("InterruptedException when waiting for lock on reconnected event of KMS with uri {}",
kms.getUri());
} finally {
if (lockAcquired) {
kmsReconnectionLocks.get(kms.getId()).unlock();
}
}
}
@Override
@ -232,71 +200,65 @@ public abstract class KmsManager {
kms.getUri(), kms.getKurentoClient().toString());
}
// TODO: this is a fix for the lack of reconnected event
kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock());
final AtomicInteger ITERATION = new AtomicInteger(0);
final int loops = 6;
final AtomicInteger iteration = new AtomicInteger(loops);
final long intervalWaitMs = 500L;
kurentoReconnectTimer = new UpdatableTimerTask(() -> {
boolean lockAcquired = false;
try {
if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) {
lockAcquired = true;
if (iteration.decrementAndGet() < 0) {
if (kms.isKurentoClientConnected()) {
// reconnected listener already executed
log.info(
"Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)",
kms.getUri(), kms.getKurentoClient().toString());
kurentoReconnectTimer.cancelTimer();
return;
}
log.error("KurentoClient [{}] could not reconnect to KMS with uri {} in {} seconds",
kms.getKurentoClient().toString(), kms.getUri(), (intervalWaitMs * 6 / 1000));
kurentoReconnectTimer.cancelTimer();
log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(),
kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId())
.collect(Collectors.joining(",", "[", "]")));
if (kms.getKurentoClient().isClosed()) {
log.info(
"Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer",
kms.getUri(), kms.getKurentoClient().toString());
kurentoReconnectTimer.cancelTimer();
return;
}
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
sessionEventsHandler.onMediaServerCrashed(kms, timeOfKurentoDisconnection);
kms.getKurentoSessions().forEach(kSession -> {
sessionManager.closeSession(kSession.getSessionId(), EndReason.mediaServerCrashed);
});
} else {
try {
kms.getKurentoClient().getServerManager().getInfo();
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
kms.getUri(), kms.getKurentoClient().toString());
kurentoReconnectTimer.cancelTimer();
kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
} catch (Exception e) {
log.error(
"According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}",
kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName());
return;
}
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
kms.getUri(), kms.getKurentoClient().toString());
kurentoReconnectTimer.cancelTimer();
kms.setKurentoClientConnected(true);
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
if (kms.getKurentoSessions().isEmpty()) {
log.info("There were no sessions in the KMS with uri {}. Nothing must be done",
kms.getUri());
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
if (kms.getKurentoSessions().isEmpty()) {
log.info("There were no sessions in the KMS with uri {}. Nothing must be done",
kms.getUri());
} else {
if (isNewKms(kms)) {
log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}",
kms.getUri(), kms.getKurentoSessions().size(), kms.getKurentoSessions().stream()
.map(s -> s.getSessionId()).collect(Collectors.joining(",", "[", "]")));
kms.getKurentoSessions().forEach(kSession -> {
kSession.restartStatusInKurento(timeOfKurentoDisconnection);
});
} else {
if (isNewKms(kms)) {
log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}",
kms.getUri(), kms.getKurentoSessions().size(),
kms.getKurentoSessions().stream().map(s -> s.getSessionId())
.collect(Collectors.joining(",", "[", "]")));
kms.getKurentoSessions().forEach(kSession -> {
kSession.restartStatusInKurento(timeOfKurentoDisconnection);
});
} else {
log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri());
}
log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri());
}
}
kms.setTimeOfKurentoClientDisconnection(0);
}
} catch (Exception e) {
log.error(
"According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}",
kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName());
} finally {
if (lockAcquired) {
kmsReconnectionLocks.get(kms.getId()).unlock();
}
kms.setTimeOfKurentoClientDisconnection(0);
}
}, () -> Long.valueOf(dynamicReconnectLoopSeconds(ITERATION.getAndIncrement()) * 1000));
}, () -> intervalWaitMs); // Try 2 times per seconds
kurentoReconnectTimer.updateTimer();
}
@ -318,6 +280,11 @@ public abstract class KmsManager {
// kms.setKurentoClientConnected(true);
// kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
}
@Override
public void reconnecting() {
}
};
}
@ -333,24 +300,6 @@ public abstract class KmsManager {
}
}
/**
 * Returns the wait, in seconds, to apply before the given reconnection loop.
 *
 * The first 10 loops wait 1 second, the next 20 loops wait 3 seconds and every
 * later loop waits 10 seconds.
 *
 * @param iteration zero-based index of the reconnection loop
 * @return 1, 3 or 10 depending on the range {@code iteration} falls into
 */
private int dynamicReconnectLoopSeconds(int iteration) {
    final int fastLoops = 10;
    final int mediumLoops = 20;

    if (iteration < fastLoops) {
        return 1;
    }
    if (iteration < fastLoops + mediumLoops) {
        return 3;
    }
    // Any loop past the first two ranges uses the longest interval
    return 10;
}
public abstract List<Kms> initializeKurentoClients(List<KmsProperties> kmsProperties, boolean disconnectUponFailure)
throws Exception;

View File

@ -24,7 +24,7 @@ OPENVIDU_CDR_PATH=/opt/openvidu/cdr
OPENVIDU_WEBHOOK=false
OPENVIDU_WEBHOOK_ENDPOINT=
OPENVIDU_WEBHOOK_HEADERS=[]
OPENVIDU_WEBHOOK_EVENTS=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","signalSent","mediaNodeStatusChanged","autoscaling"]
OPENVIDU_WEBHOOK_EVENTS=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","signalSent","mediaNodeStatusChanged","autoscaling","mediaServerCrashed"]
OPENVIDU_RECORDING=false
OPENVIDU_RECORDING_DEBUG=false

View File

@ -18,6 +18,7 @@ import org.powermock.reflect.Whitebox;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import io.openvidu.server.kurento.core.KurentoSessionManager;
import io.openvidu.server.kurento.kms.FixedOneKmsManager;
import io.openvidu.server.kurento.kms.Kms;
import io.openvidu.server.kurento.kms.KmsManager;
@ -33,7 +34,7 @@ public class IntegrationTestConfiguration {
@Bean
public KmsManager kmsManager() throws Exception {
final KmsManager spy = Mockito.spy(new FixedOneKmsManager());
final KmsManager spy = Mockito.spy(new FixedOneKmsManager(new KurentoSessionManager()));
doAnswer(invocation -> {
List<Kms> successfullyConnectedKmss = new ArrayList<>();
List<KmsProperties> kmsProperties = invocation.getArgument(0);