mirror of https://github.com/OpenVidu/openvidu.git
openvidu-server: close UpdatableTimerTasks on @PreDestroy
parent
e8121bf35c
commit
d0483f9033
|
@ -87,6 +87,8 @@ public abstract class SessionManager {
|
||||||
|
|
||||||
public FormatChecker formatChecker = new FormatChecker();
|
public FormatChecker formatChecker = new FormatChecker();
|
||||||
|
|
||||||
|
private UpdatableTimerTask sessionGarbageCollectorTimer;
|
||||||
|
|
||||||
final protected ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>();
|
final protected ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>();
|
||||||
final protected ConcurrentMap<String, Session> sessionsNotActive = new ConcurrentHashMap<>();
|
final protected ConcurrentMap<String, Session> sessionsNotActive = new ConcurrentHashMap<>();
|
||||||
protected ConcurrentMap<String, ConcurrentHashMap<String, Participant>> sessionidParticipantpublicidParticipant = new ConcurrentHashMap<>();
|
protected ConcurrentMap<String, ConcurrentHashMap<String, Participant>> sessionidParticipantpublicidParticipant = new ConcurrentHashMap<>();
|
||||||
|
@ -165,8 +167,9 @@ public abstract class SessionManager {
|
||||||
|
|
||||||
public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId)
|
public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId)
|
||||||
throws OpenViduException;
|
throws OpenViduException;
|
||||||
|
|
||||||
public abstract void onVideoData(Participant participant, Integer transactionId, Integer height, Integer width, Boolean videoActive, Boolean audioActive);
|
public abstract void onVideoData(Participant participant, Integer transactionId, Integer height, Integer width,
|
||||||
|
Boolean videoActive, Boolean audioActive);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a Session given its id
|
* Returns a Session given its id
|
||||||
|
@ -422,6 +425,9 @@ public abstract class SessionManager {
|
||||||
log.warn("Error closing session '{}': {}", sessionId, e.getMessage());
|
log.warn("Error closing session '{}': {}", sessionId, e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (this.sessionGarbageCollectorTimer != null) {
|
||||||
|
this.sessionGarbageCollectorTimer.cancelTimer();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
|
@ -431,7 +437,8 @@ public abstract class SessionManager {
|
||||||
"Garbage collector for non active sessions is disabled (property 'OPENVIDU_SESSIONS_GARBAGE_INTERVAL' is 0)");
|
"Garbage collector for non active sessions is disabled (property 'OPENVIDU_SESSIONS_GARBAGE_INTERVAL' is 0)");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
new UpdatableTimerTask(() -> {
|
|
||||||
|
this.sessionGarbageCollectorTimer = new UpdatableTimerTask(() -> {
|
||||||
|
|
||||||
// Remove all non active sessions created more than the specified time
|
// Remove all non active sessions created more than the specified time
|
||||||
log.info("Running non active sessions garbage collector...");
|
log.info("Running non active sessions garbage collector...");
|
||||||
|
@ -477,7 +484,9 @@ public abstract class SessionManager {
|
||||||
log.warn("Possible ghost session {}", sessionActive.getSessionId());
|
log.warn("Possible ghost session {}", sessionActive.getSessionId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, () -> new Long(openviduConfig.getSessionGarbageInterval() * 1000)).updateTimer();
|
}, () -> new Long(openviduConfig.getSessionGarbageInterval() * 1000));
|
||||||
|
|
||||||
|
this.sessionGarbageCollectorTimer.updateTimer();
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
"Garbage collector for non active sessions initialized. Running every {} seconds and cleaning up non active Sessions more than {} seconds old",
|
"Garbage collector for non active sessions initialized. Running every {} seconds and cleaning up non active Sessions more than {} seconds old",
|
||||||
|
|
|
@ -57,6 +57,8 @@ public abstract class KmsManager {
|
||||||
|
|
||||||
private Map<String, Lock> kmsReconnectionLocks = new ConcurrentHashMap<>();
|
private Map<String, Lock> kmsReconnectionLocks = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private UpdatableTimerTask kurentoReconnectTimer;
|
||||||
|
|
||||||
public class KmsLoad implements Comparable<KmsLoad> {
|
public class KmsLoad implements Comparable<KmsLoad> {
|
||||||
|
|
||||||
private Kms kms;
|
private Kms kms;
|
||||||
|
@ -231,10 +233,9 @@ public abstract class KmsManager {
|
||||||
|
|
||||||
// TODO: this is a fix for the lack of reconnected event
|
// TODO: this is a fix for the lack of reconnected event
|
||||||
kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock());
|
kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock());
|
||||||
final UpdatableTimerTask[] TIMER = new UpdatableTimerTask[1];
|
|
||||||
final AtomicInteger ITERATION = new AtomicInteger(0);
|
final AtomicInteger ITERATION = new AtomicInteger(0);
|
||||||
|
|
||||||
TIMER[0] = new UpdatableTimerTask(() -> {
|
kurentoReconnectTimer = new UpdatableTimerTask(() -> {
|
||||||
boolean lockAcquired = false;
|
boolean lockAcquired = false;
|
||||||
try {
|
try {
|
||||||
if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) {
|
if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) {
|
||||||
|
@ -245,7 +246,7 @@ public abstract class KmsManager {
|
||||||
log.info(
|
log.info(
|
||||||
"Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)",
|
"Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)",
|
||||||
kms.getUri(), kms.getKurentoClient().toString());
|
kms.getUri(), kms.getKurentoClient().toString());
|
||||||
TIMER[0].cancelTimer();
|
kurentoReconnectTimer.cancelTimer();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,14 +254,14 @@ public abstract class KmsManager {
|
||||||
log.info(
|
log.info(
|
||||||
"Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer",
|
"Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer",
|
||||||
kms.getUri(), kms.getKurentoClient().toString());
|
kms.getUri(), kms.getKurentoClient().toString());
|
||||||
TIMER[0].cancelTimer();
|
kurentoReconnectTimer.cancelTimer();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
kms.getKurentoClient().getServerManager().getInfo();
|
kms.getKurentoClient().getServerManager().getInfo();
|
||||||
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
|
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
|
||||||
kms.getUri(), kms.getKurentoClient().toString());
|
kms.getUri(), kms.getKurentoClient().toString());
|
||||||
TIMER[0].cancelTimer();
|
kurentoReconnectTimer.cancelTimer();
|
||||||
kms.setKurentoClientConnected(true);
|
kms.setKurentoClientConnected(true);
|
||||||
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
|
kms.setTimeOfKurentoClientConnection(System.currentTimeMillis());
|
||||||
|
|
||||||
|
@ -296,7 +297,7 @@ public abstract class KmsManager {
|
||||||
}
|
}
|
||||||
}, () -> new Long(dynamicReconnectLoopSeconds(ITERATION.getAndIncrement()) * 1000));
|
}, () -> new Long(dynamicReconnectLoopSeconds(ITERATION.getAndIncrement()) * 1000));
|
||||||
|
|
||||||
TIMER[0].updateTimer();
|
kurentoReconnectTimer.updateTimer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -365,6 +366,9 @@ public abstract class KmsManager {
|
||||||
this.kmss.values().forEach(kms -> {
|
this.kmss.values().forEach(kms -> {
|
||||||
kms.getKurentoClient().destroy();
|
kms.getKurentoClient().destroy();
|
||||||
});
|
});
|
||||||
|
if (kurentoReconnectTimer != null) {
|
||||||
|
kurentoReconnectTimer.cancelTimer();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String generateKmsId() {
|
public static String generateKmsId() {
|
||||||
|
|
|
@ -67,7 +67,7 @@ public class UpdatableTimerTask extends TimerTask {
|
||||||
try {
|
try {
|
||||||
task.run();
|
task.run();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Exception running UpdatableTimerTask: {}", e.getMessage());
|
log.error("Exception running UpdatableTimerTask: {} - {}", e.getMessage(), e.getStackTrace());
|
||||||
}
|
}
|
||||||
updateTimer();
|
updateTimer();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue