mirror of https://github.com/OpenVidu/openvidu.git
openvidu-server: UpdatableTimerTask. Use it in session garbage collector
parent
5c9e18c7ba
commit
a0ac5fa768
|
@ -23,8 +23,6 @@ import java.util.Iterator;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -60,6 +58,7 @@ import io.openvidu.server.utils.FormatChecker;
|
|||
import io.openvidu.server.utils.GeoLocation;
|
||||
import io.openvidu.server.utils.GeoLocationByIp;
|
||||
import io.openvidu.server.utils.QuarantineKiller;
|
||||
import io.openvidu.server.utils.UpdatableTimerTask;
|
||||
|
||||
public abstract class SessionManager {
|
||||
|
||||
|
@ -432,57 +431,54 @@ public abstract class SessionManager {
|
|||
"Garbage collector for non active sessions is disabled (property 'OPENVIDU_SESSIONS_GARBAGE_INTERVAL' is 0)");
|
||||
return;
|
||||
}
|
||||
TimerTask task = new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Remove all non active sessions created more than the specified time
|
||||
log.info("Running non active sessions garbage collector...");
|
||||
final long currentMillis = System.currentTimeMillis();
|
||||
new UpdatableTimerTask(() -> {
|
||||
|
||||
// Loop through all non active sessions. Safely remove them and clean all of
|
||||
// their data if their threshold has elapsed
|
||||
for (Iterator<Entry<String, Session>> iter = sessionsNotActive.entrySet().iterator(); iter.hasNext();) {
|
||||
final Session sessionNotActive = iter.next().getValue();
|
||||
final String sessionId = sessionNotActive.getSessionId();
|
||||
long sessionExistsSince = currentMillis - sessionNotActive.getStartTime();
|
||||
if (sessionExistsSince > (openviduConfig.getSessionGarbageThreshold() * 1000)) {
|
||||
try {
|
||||
if (sessionNotActive.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
if (sessions.containsKey(sessionId)) {
|
||||
// The session passed to active during lock wait
|
||||
continue;
|
||||
}
|
||||
iter.remove();
|
||||
cleanCollections(sessionId);
|
||||
log.info("Non active session {} cleaned up by garbage collector", sessionId);
|
||||
} finally {
|
||||
sessionNotActive.closingLock.writeLock().unlock();
|
||||
// Remove all non active sessions created more than the specified time
|
||||
log.info("Running non active sessions garbage collector...");
|
||||
final long currentMillis = System.currentTimeMillis();
|
||||
|
||||
// Loop through all non active sessions. Safely remove them and clean all of
|
||||
// their data if their threshold has elapsed
|
||||
for (Iterator<Entry<String, Session>> iter = sessionsNotActive.entrySet().iterator(); iter.hasNext();) {
|
||||
final Session sessionNotActive = iter.next().getValue();
|
||||
final String sessionId = sessionNotActive.getSessionId();
|
||||
long sessionExistsSince = currentMillis - sessionNotActive.getStartTime();
|
||||
if (sessionExistsSince > (openviduConfig.getSessionGarbageThreshold() * 1000)) {
|
||||
try {
|
||||
if (sessionNotActive.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
if (sessions.containsKey(sessionId)) {
|
||||
// The session passed to active during lock wait
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
log.error(
|
||||
"Timeout waiting for Session closing lock to be available for garbage collector to clean session {}",
|
||||
sessionId);
|
||||
iter.remove();
|
||||
cleanCollections(sessionId);
|
||||
log.info("Non active session {} cleaned up by garbage collector", sessionId);
|
||||
} finally {
|
||||
sessionNotActive.closingLock.writeLock().unlock();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
} else {
|
||||
log.error(
|
||||
"InterruptedException while waiting for Session closing lock to be available for garbage collector to clean session {}",
|
||||
"Timeout waiting for Session closing lock to be available for garbage collector to clean session {}",
|
||||
sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Warn about possible ghost sessions
|
||||
for (Iterator<Entry<String, Session>> iter = sessions.entrySet().iterator(); iter.hasNext();) {
|
||||
final Session sessionActive = iter.next().getValue();
|
||||
if (sessionActive.getParticipants().size() == 0) {
|
||||
log.warn("Possible ghost session {}", sessionActive.getSessionId());
|
||||
} catch (InterruptedException e) {
|
||||
log.error(
|
||||
"InterruptedException while waiting for Session closing lock to be available for garbage collector to clean session {}",
|
||||
sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
new Timer().scheduleAtFixedRate(task, openviduConfig.getSessionGarbageInterval() * 1000,
|
||||
openviduConfig.getSessionGarbageInterval() * 1000);
|
||||
|
||||
// Warn about possible ghost sessions
|
||||
for (Iterator<Entry<String, Session>> iter = sessions.entrySet().iterator(); iter.hasNext();) {
|
||||
final Session sessionActive = iter.next().getValue();
|
||||
if (sessionActive.getParticipants().size() == 0) {
|
||||
log.warn("Possible ghost session {}", sessionActive.getSessionId());
|
||||
}
|
||||
}
|
||||
}, () -> new Long(openviduConfig.getSessionGarbageInterval() * 1000)).updateTimer();
|
||||
|
||||
log.info(
|
||||
"Garbage collector for non active sessions initialized. Running every {} seconds and cleaning up non active Sessions more than {} seconds old",
|
||||
openviduConfig.getSessionGarbageInterval(), openviduConfig.getSessionGarbageThreshold());
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package io.openvidu.server.utils;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class UpdatableTimerTask extends TimerTask {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(UpdatableTimerTask.class);
|
||||
|
||||
private Runnable task;
|
||||
private Supplier<Long> period;
|
||||
private Long oldP;
|
||||
private Timer timer;
|
||||
|
||||
/**
|
||||
* @param task The task to run periodically.
|
||||
* @param period Delay before first execution and period to wait between
|
||||
* executions.Besides, this function will be called after each
|
||||
* execution of the task to update the period if necessary. This
|
||||
* way, the current wait period will always be respected before
|
||||
* updating the value (if the function returns a different long
|
||||
* than previous one just after the task ends).
|
||||
*/
|
||||
public UpdatableTimerTask(Runnable task, Supplier<Long> period) {
|
||||
super();
|
||||
Objects.requireNonNull(task);
|
||||
Objects.requireNonNull(period);
|
||||
this.task = task;
|
||||
this.period = period;
|
||||
}
|
||||
|
||||
private UpdatableTimerTask(Runnable task, Supplier<Long> period, Long oldP) {
|
||||
this(task, period);
|
||||
this.oldP = oldP;
|
||||
}
|
||||
|
||||
public final void updateTimer() {
|
||||
Long p = period.get();
|
||||
Objects.requireNonNull(p);
|
||||
if (oldP == null || !oldP.equals(p)) {
|
||||
cancel();
|
||||
if (timer == null) {
|
||||
timer = new Timer();
|
||||
} else {
|
||||
timer.cancel();
|
||||
timer.purge();
|
||||
}
|
||||
timer.schedule(new UpdatableTimerTask(task, period, p), p, p);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// Protect the inner run method so if any exception is thrown, the following
|
||||
// scheduled TimerTask doesn't get cancelled
|
||||
try {
|
||||
task.run();
|
||||
} catch (Exception e) {
|
||||
log.error("Exception running UpdatableTimerTask: {}", e.getMessage());
|
||||
}
|
||||
updateTimer();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue