openvidu-server: fix close session and init session race condition

pull/707/head
pabloFuente 2022-03-17 17:16:17 +01:00
parent 73d275a42e
commit 85c0cc0584
4 changed files with 146 additions and 71 deletions

View File

@ -538,42 +538,41 @@ public abstract class SessionManager {
if (session == null) { if (session == null) {
throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId + "' not found"); throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId + "' not found");
} }
if (session.isClosed()) {
this.cleanCollections(sessionId);
throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "Session '" + sessionId + "' already closed");
}
Set<Participant> participants = getParticipants(sessionId);
boolean sessionClosedByLastParticipant = false; try {
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
for (Participant p : participants) { try {
try { if (session.isClosed()) {
sessionClosedByLastParticipant = this.evictParticipant(p, null, null, reason); this.cleanCollections(sessionId);
} catch (OpenViduException e) { throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE,
log.warn("Error evicting participant '{}' from session '{}'", p.getParticipantPublicId(), sessionId, e); "Session '" + sessionId + "' already closed");
}
}
if (!sessionClosedByLastParticipant) {
// This code should only be executed when there were no participants connected
// to the session. That is: if the session was in the automatic recording stop
// timeout with INDIVIDUAL recording (no docker participant connected)
try {
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
try {
if (session.isClosed()) {
return;
}
this.closeSessionAndEmptyCollections(session, reason, true);
} finally {
session.closingLock.writeLock().unlock();
} }
} else {
log.error("Timeout waiting for Session {} closing lock to be available", sessionId); boolean sessionClosedByLastParticipant = false;
Set<Participant> participants = getParticipants(sessionId);
for (Participant p : participants) {
try {
sessionClosedByLastParticipant = this.evictParticipant(p, null, null, reason);
} catch (OpenViduException e) {
log.warn("Error evicting participant '{}' from session '{}'", p.getParticipantPublicId(),
sessionId, e);
}
}
if (!sessionClosedByLastParticipant) {
// This code should only be executed when there were no participants connected
// to the session. That is: if the session was in the automatic recording stop
// timeout with INDIVIDUAL recording (no docker participant connected)
this.closeSessionAndEmptyCollections(session, reason, true);
}
} finally {
session.closingLock.writeLock().unlock();
} }
} catch (InterruptedException e) { } else {
log.error("InterruptedException while waiting for Session {} closing lock to be available", sessionId); log.error("Timeout waiting for Session {} closing lock to be available", sessionId);
} }
} catch (InterruptedException e) {
log.error("InterruptedException while waiting for Session {} closing lock to be available", sessionId);
} }
} }
@ -669,15 +668,15 @@ public abstract class SessionManager {
} }
public void closeAllSessionsAndRecordingsOfKms(Kms kms, EndReason reason) { public void closeAllSessionsAndRecordingsOfKms(Kms kms, EndReason reason) {
// Close all active sessions
kms.getKurentoSessions().forEach(kSession -> {
this.closeSession(kSession.getSessionId(), reason);
});
// Close all non active sessions configured with this Media Node // Close all non active sessions configured with this Media Node
this.closeNonActiveSessions(sessionNotActive -> { this.closeNonActiveSessions(sessionNotActive -> {
return (sessionNotActive.getSessionProperties().mediaNode() != null return (sessionNotActive.getSessionProperties().mediaNode() != null
&& kms.getId().equals(sessionNotActive.getSessionProperties().mediaNode())); && kms.getId().equals(sessionNotActive.getSessionProperties().mediaNode()));
}); });
// Close all active sessions
kms.getKurentoSessions().forEach(kSession -> {
this.closeSession(kSession.getSessionId(), reason);
});
// Stop all external recordings // Stop all external recordings
kms.getActiveRecordings().forEach(recordingIdSessionId -> { kms.getActiveRecordings().forEach(recordingIdSessionId -> {

View File

@ -186,6 +186,13 @@ public abstract class KmsManager {
public void disconnected() { public void disconnected() {
final Kms kms = kmss.get(kmsId); final Kms kms = kmss.get(kmsId);
// TODO: take a look at this
// if (kms.getTimeOfKurentoClientDisconnection() > 0) {
// log.warn("Event disconnected of KurentoClient {} is already being processed by other thread",
// kms.getKurentoClient().toString());
// return;
// }
kms.setKurentoClientConnected(false); kms.setKurentoClientConnected(false);
kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis());
@ -225,7 +232,11 @@ public abstract class KmsManager {
log.warn("Removing Media Node {} after crash", kms.getId()); log.warn("Removing Media Node {} after crash", kms.getId());
String environmentId = removeMediaNodeUponCrash(kms.getId()); String environmentId = removeMediaNodeUponCrash(kms.getId());
// 2. Close all sessions and recordings with reason "nodeCrashed" // 2. Send nodeCrashed webhook event
sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection,
affectedSessionIds, affectedRecordingIds);
// 3. Close all sessions and recordings with reason "nodeCrashed"
log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(), log.warn("Closing {} sessions hosted by KMS with uri {}: {}", kms.getKurentoSessions().size(),
kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) kms.getUri(), kms.getKurentoSessions().stream().map(s -> s.getSessionId())
.collect(Collectors.joining(",", "[", "]"))); .collect(Collectors.joining(",", "[", "]")));
@ -237,10 +248,6 @@ public abstract class KmsManager {
RemoteOperationUtils.revertToRunRemoteOperations(); RemoteOperationUtils.revertToRunRemoteOperations();
} }
// 3. Send nodeCrashed webhook event
sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection,
affectedSessionIds, affectedRecordingIds);
if (infiniteRetry()) { if (infiniteRetry()) {
disconnected(); disconnected();
} }

View File

@ -25,9 +25,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import io.openvidu.java.client.*;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -53,7 +53,18 @@ import com.google.gson.JsonParser;
import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.OpenViduException.Code;
import io.openvidu.client.internal.ProtocolElements; import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.java.client.ConnectionProperties;
import io.openvidu.java.client.ConnectionType;
import io.openvidu.java.client.IceServerProperties;
import io.openvidu.java.client.KurentoOptions;
import io.openvidu.java.client.MediaMode;
import io.openvidu.java.client.OpenViduRole;
import io.openvidu.java.client.Recording.OutputMode; import io.openvidu.java.client.Recording.OutputMode;
import io.openvidu.java.client.RecordingLayout;
import io.openvidu.java.client.RecordingMode;
import io.openvidu.java.client.RecordingProperties;
import io.openvidu.java.client.SessionProperties;
import io.openvidu.java.client.VideoCodec;
import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig;
import io.openvidu.server.core.EndReason; import io.openvidu.server.core.EndReason;
import io.openvidu.server.core.IdentifierPrefixes; import io.openvidu.server.core.IdentifierPrefixes;
@ -101,26 +112,66 @@ public class SessionRestController {
} }
String sessionId; String sessionId;
if (sessionProperties.customSessionId() != null && !sessionProperties.customSessionId().isEmpty()) { Lock sessionLock = null;
if (sessionManager.getSessionWithNotActive(sessionProperties.customSessionId()) != null) {
log.warn("Session {} is already created", sessionProperties.customSessionId()); try {
return new ResponseEntity<>(HttpStatus.CONFLICT); if (sessionProperties.customSessionId() != null && !sessionProperties.customSessionId().isEmpty()) {
// Session has custom session id
sessionId = sessionProperties.customSessionId();
Session session = sessionManager.getSessionWithNotActive(sessionProperties.customSessionId());
if (session != null) {
// The session appears to already exist
if (session.closingLock.readLock().tryLock()) {
// The session indeed exists and is not being closed
try {
log.warn("Session {} is already created", sessionProperties.customSessionId());
return new ResponseEntity<>(HttpStatus.CONFLICT);
} finally {
session.closingLock.readLock().unlock();
}
} else {
// The session exists but is being closed
log.warn("Session {} is in the process of closing while calling POST {}/sessions", sessionId,
RequestMappings.API);
try {
if (session.closingLock.writeLock().tryLock(15, TimeUnit.SECONDS)) {
if (sessionManager
.getSessionWithNotActive(sessionProperties.customSessionId()) != null) {
// Other thread took the lock before and rebuilt the closing session
session.closingLock.writeLock().unlock();
return new ResponseEntity<>(HttpStatus.CONFLICT);
} else {
// This thread will rebuild the closing session
sessionLock = session.closingLock.writeLock();
}
} else {
log.error("Timeout waiting for Session {} closing lock to be available", sessionId);
}
} catch (InterruptedException e) {
log.error("InterruptedException while waiting for Session {} closing lock to be available",
sessionId);
}
}
}
} else {
sessionId = IdentifierPrefixes.SESSION_ID + RandomStringUtils.randomAlphabetic(1).toUpperCase()
+ RandomStringUtils.randomAlphanumeric(9);
} }
sessionId = sessionProperties.customSessionId();
} else {
sessionId = IdentifierPrefixes.SESSION_ID + RandomStringUtils.randomAlphabetic(1).toUpperCase()
+ RandomStringUtils.randomAlphanumeric(9);
}
Session sessionNotActive = sessionManager.storeSessionNotActive(sessionId, sessionProperties); Session sessionNotActive = sessionManager.storeSessionNotActive(sessionId, sessionProperties);
if (sessionNotActive == null) {
if (sessionNotActive == null) { return new ResponseEntity<>(HttpStatus.CONFLICT);
return new ResponseEntity<>(HttpStatus.CONFLICT); } else {
} else { log.info("New session {} created {}", sessionId, this.sessionManager.getSessionsWithNotActive().stream()
log.info("New session {} created {}", sessionId, this.sessionManager.getSessionsWithNotActive().stream() .map(Session::getSessionId).collect(Collectors.toList()).toString());
.map(Session::getSessionId).collect(Collectors.toList()).toString()); return new ResponseEntity<>(sessionNotActive.toJson(false, false).toString(),
return new ResponseEntity<>(sessionNotActive.toJson(false, false).toString(), RestUtils.getResponseHeaders(), HttpStatus.OK);
RestUtils.getResponseHeaders(), HttpStatus.OK); }
} finally {
if (sessionLock != null) {
sessionLock.unlock();
}
} }
} }
@ -133,8 +184,17 @@ public class SessionRestController {
Session session = this.sessionManager.getSession(sessionId); Session session = this.sessionManager.getSession(sessionId);
if (session != null) { if (session != null) {
JsonObject response = session.toJson(pendingConnections, webRtcStats); try {
return new ResponseEntity<>(response.toString(), RestUtils.getResponseHeaders(), HttpStatus.OK); JsonObject response = session.toJson(pendingConnections, webRtcStats);
return new ResponseEntity<>(response.toString(), RestUtils.getResponseHeaders(), HttpStatus.OK);
} catch (OpenViduException e) {
if (e.getCodeValue() == Code.ROOM_CLOSED_ERROR_CODE.getValue()) {
log.warn("Session closed while calling GET {}/sessions/{}", RequestMappings.API, sessionId);
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
} else {
throw e;
}
}
} else { } else {
Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId); Session sessionNotActive = this.sessionManager.getSessionNotActive(sessionId);
if (sessionNotActive != null) { if (sessionNotActive != null) {
@ -157,10 +217,16 @@ public class SessionRestController {
JsonObject json = new JsonObject(); JsonObject json = new JsonObject();
JsonArray jsonArray = new JsonArray(); JsonArray jsonArray = new JsonArray();
sessions.forEach(session -> { sessions.forEach(session -> {
JsonObject sessionJson = session.toJson(pendingConnections, webRtcStats); try {
jsonArray.add(sessionJson); JsonObject sessionJson = session.toJson(pendingConnections, webRtcStats);
jsonArray.add(sessionJson);
} catch (OpenViduException e) {
if (e.getCodeValue() != Code.ROOM_CLOSED_ERROR_CODE.getValue()) {
throw e;
}
}
}); });
json.addProperty("numberOfElements", sessions.size()); json.addProperty("numberOfElements", jsonArray.size());
json.add("content", jsonArray); json.add("content", jsonArray);
return new ResponseEntity<>(json.toString(), RestUtils.getResponseHeaders(), HttpStatus.OK); return new ResponseEntity<>(json.toString(), RestUtils.getResponseHeaders(), HttpStatus.OK);
} }
@ -655,6 +721,9 @@ public class SessionRestController {
Token token = sessionManager.newToken(session, connectionProperties.getRole(), Token token = sessionManager.newToken(session, connectionProperties.getRole(),
connectionProperties.getData(), connectionProperties.record(), connectionProperties.getData(), connectionProperties.record(),
connectionProperties.getKurentoOptions(), connectionProperties.getCustomIceServers()); connectionProperties.getKurentoOptions(), connectionProperties.getCustomIceServers());
log.info("Generated token {}", token.getToken());
return new ResponseEntity<>(token.toJsonAsParticipant().toString(), RestUtils.getResponseHeaders(), return new ResponseEntity<>(token.toJsonAsParticipant().toString(), RestUtils.getResponseHeaders(),
HttpStatus.OK); HttpStatus.OK);
} catch (Exception e) { } catch (Exception e) {
@ -923,7 +992,8 @@ public class SessionRestController {
IceServerProperties.Builder iceServerPropertiesBuilder = new IceServerProperties.Builder(); IceServerProperties.Builder iceServerPropertiesBuilder = new IceServerProperties.Builder();
iceServerPropertiesBuilder.url(customIceServerJson.get("url").getAsString()); iceServerPropertiesBuilder.url(customIceServerJson.get("url").getAsString());
if (customIceServerJson.has("staticAuthSecret")) { if (customIceServerJson.has("staticAuthSecret")) {
iceServerPropertiesBuilder.staticAuthSecret(customIceServerJson.get("staticAuthSecret").getAsString()); iceServerPropertiesBuilder
.staticAuthSecret(customIceServerJson.get("staticAuthSecret").getAsString());
} }
if (customIceServerJson.has("username")) { if (customIceServerJson.has("username")) {
iceServerPropertiesBuilder.username(customIceServerJson.get("username").getAsString()); iceServerPropertiesBuilder.username(customIceServerJson.get("username").getAsString());
@ -937,9 +1007,10 @@ public class SessionRestController {
} catch (Exception e) { } catch (Exception e) {
throw new Exception("Type error in some parameter of 'customIceServers': " + e.getMessage()); throw new Exception("Type error in some parameter of 'customIceServers': " + e.getMessage());
} }
} else if(!openviduConfig.getWebrtcIceServersBuilders().isEmpty()){ } else if (!openviduConfig.getWebrtcIceServersBuilders().isEmpty()) {
// If not defined in connection, check if defined in openvidu config // If not defined in connection, check if defined in openvidu config
for (IceServerProperties.Builder iceServerPropertiesBuilder: openviduConfig.getWebrtcIceServersBuilders()) { for (IceServerProperties.Builder iceServerPropertiesBuilder : openviduConfig
.getWebrtcIceServersBuilders()) {
IceServerProperties.Builder configIceBuilder = iceServerPropertiesBuilder.clone(); IceServerProperties.Builder configIceBuilder = iceServerPropertiesBuilder.clone();
builder.addCustomIceServer(configIceBuilder.build()); builder.addCustomIceServer(configIceBuilder.build());
} }
@ -970,8 +1041,6 @@ public class SessionRestController {
.onlyPlayWithSubscribers(onlyPlayWithSubscribers).networkCache(networkCache).build(); .onlyPlayWithSubscribers(onlyPlayWithSubscribers).networkCache(networkCache).build();
} }
return builder; return builder;
} }

View File

@ -322,7 +322,7 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
} else { } else {
log.error("ERROR: token not valid"); log.error("ERROR: token not valid");
throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE,
"Unable to join session " + sessionId + ". Token " + token + "is not valid"); "Unable to join session " + sessionId + ". Token " + token + " is not valid");
} }
} }