Bug fix: collections are now cleaned up on EOFExceptions of web sockets

pull/30/head
pabloFuente 2018-01-10 17:26:54 +01:00
parent f348b5380a
commit 5f7bd94ad3
5 changed files with 55 additions and 34 deletions

View File

@ -27,7 +27,6 @@ public abstract class SessionManager {
protected ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>(); protected ConcurrentMap<String, Session> sessions = new ConcurrentHashMap<>();
protected ConcurrentMap<String, ConcurrentHashMap<String, Token>> sessionidTokenTokenobj = new ConcurrentHashMap<>(); protected ConcurrentMap<String, ConcurrentHashMap<String, Token>> sessionidTokenTokenobj = new ConcurrentHashMap<>();
protected ConcurrentMap<String, ConcurrentHashMap<String, Participant>> sessionidParticipantpublicidParticipant = new ConcurrentHashMap<>(); protected ConcurrentMap<String, ConcurrentHashMap<String, Participant>> sessionidParticipantpublicidParticipant = new ConcurrentHashMap<>();
protected ConcurrentMap<String, Boolean> insecureUsers = new ConcurrentHashMap<>(); protected ConcurrentMap<String, Boolean> insecureUsers = new ConcurrentHashMap<>();
private volatile boolean closed = false; private volatile boolean closed = false;
@ -152,7 +151,7 @@ public abstract class SessionManager {
this.sessionidTokenTokenobj.put(sessionId, new ConcurrentHashMap<>()); this.sessionidTokenTokenobj.put(sessionId, new ConcurrentHashMap<>());
this.sessionidParticipantpublicidParticipant.put(sessionId, new ConcurrentHashMap<>()); this.sessionidParticipantpublicidParticipant.put(sessionId, new ConcurrentHashMap<>());
showMap(); showTokens();
return sessionId; return sessionId;
} }
@ -162,7 +161,7 @@ public abstract class SessionManager {
if (isMetadataFormatCorrect(serverMetadata)) { if (isMetadataFormatCorrect(serverMetadata)) {
String token = new BigInteger(130, new SecureRandom()).toString(32); String token = new BigInteger(130, new SecureRandom()).toString(32);
this.sessionidTokenTokenobj.get(sessionId).put(token, new Token(token, role, serverMetadata)); this.sessionidTokenTokenobj.get(sessionId).put(token, new Token(token, role, serverMetadata));
showMap(); showTokens();
return token; return token;
} else { } else {
throw new OpenViduException(Code.GENERIC_ERROR_CODE, throw new OpenViduException(Code.GENERIC_ERROR_CODE,
@ -184,14 +183,7 @@ public abstract class SessionManager {
} else { } else {
this.sessionidParticipantpublicidParticipant.putIfAbsent(sessionId, new ConcurrentHashMap<>()); this.sessionidParticipantpublicidParticipant.putIfAbsent(sessionId, new ConcurrentHashMap<>());
this.sessionidTokenTokenobj.putIfAbsent(sessionId, new ConcurrentHashMap<>()); this.sessionidTokenTokenobj.putIfAbsent(sessionId, new ConcurrentHashMap<>());
this.sessionidTokenTokenobj.get(sessionId).putIfAbsent(token, new Token(token)); this.sessionidTokenTokenobj.get(sessionId).putIfAbsent(token, new Token(token));
/*
* this.sessionidParticipantpublicidParticipant.get(sessionId).putIfAbsent(
* token, new Participant());
* this.sessionidTokenTokenobj.get(sessionId).putIfAbsent(token, new
* Token(token));
*/
return true; return true;
} }
} }
@ -268,10 +260,16 @@ public abstract class SessionManager {
} }
} }
protected void showMap() { public void showTokens() {
System.out.println("------------------------------"); log.info("<SESSIONID, TOKENS>: {}", this.sessionidTokenTokenobj.toString());
System.out.println(this.sessionidTokenTokenobj.toString()); }
System.out.println("------------------------------");
public void showInsecureParticipants() {
log.info("<INSECURE_PARTICIPANTS>: {}", this.insecureUsers.toString());
}
public void showAllParticipants() {
log.info("<SESSIONID, PARTICIPANTS>: {}", this.sessionidParticipantpublicidParticipant.toString());
} }

View File

@ -167,7 +167,7 @@ public class KurentoSession implements Session {
closePipeline(); closePipeline();
log.debug("Room {} closed", this.sessionId); log.debug("Session {} closed", this.sessionId);
if (destroyKurentoClient) { if (destroyKurentoClient) {
kurentoClient.destroy(); kurentoClient.destroy();
@ -204,7 +204,7 @@ public class KurentoSession implements Session {
participants.remove(participant.getParticipantPrivateId()); participants.remove(participant.getParticipantPrivateId());
log.debug("SESSION {}: Cancel receiving media from user '{}' for other users", this.sessionId, participant.getParticipantPublicId()); log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId, participant.getParticipantPublicId());
for (KurentoParticipant other : participants.values()) { for (KurentoParticipant other : participants.values()) {
other.cancelReceivingMedia(participant.getParticipantPublicId()); other.cancelReceivingMedia(participant.getParticipantPublicId());
} }

View File

@ -108,7 +108,7 @@ public class KurentoSessionManager extends SessionManager {
} }
} }
showMap(); showTokens();
Set<Participant> remainingParticipants = null; Set<Participant> remainingParticipants = null;
try { try {
@ -125,7 +125,7 @@ public class KurentoSessionManager extends SessionManager {
sessionidParticipantpublicidParticipant.remove(sessionId); sessionidParticipantpublicidParticipant.remove(sessionId);
sessionidTokenTokenobj.remove(sessionId); sessionidTokenTokenobj.remove(sessionId);
showMap(); showTokens();
log.warn("Session '{}' removed and closed", sessionId); log.warn("Session '{}' removed and closed", sessionId);
} }

View File

@ -2,6 +2,8 @@ package io.openvidu.server.rpc;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.kurento.jsonrpc.DefaultJsonRpcHandler; import org.kurento.jsonrpc.DefaultJsonRpcHandler;
import org.kurento.jsonrpc.Session; import org.kurento.jsonrpc.Session;
@ -35,6 +37,8 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
@Autowired @Autowired
RpcNotificationService notificationService; RpcNotificationService notificationService;
private ConcurrentMap<String, Boolean> webSocketTransportError = new ConcurrentHashMap<>();
@Override @Override
public void handleRequest(Transaction transaction, Request<JsonObject> request) throws Exception { public void handleRequest(Transaction transaction, Request<JsonObject> request) throws Exception {
@ -50,9 +54,6 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
RpcConnection rpcConnection = notificationService.addTransaction(transaction, request); RpcConnection rpcConnection = notificationService.addTransaction(transaction, request);
// ParticipantRequest participantRequest = new ParticipantRequest(rpcSessionId,
// Integer.toString(request.getId()));
transaction.startAsync(); transaction.startAsync();
switch (request.getMethod()) { switch (request.getMethod()) {
@ -111,12 +112,12 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
sessionManager.joinRoom(participant, sessionId, request.getId()); sessionManager.joinRoom(participant, sessionId, request.getId());
} else { } else {
System.out.println("Error: metadata format is incorrect"); log.error("ERROR: Metadata format is incorrect");
throw new OpenViduException(Code.USER_METADATA_FORMAT_INVALID_ERROR_CODE, throw new OpenViduException(Code.USER_METADATA_FORMAT_INVALID_ERROR_CODE,
"Unable to join room. The metadata received has an invalid format"); "Unable to join room. The metadata received has an invalid format");
} }
} else { } else {
System.out.println("Error: sessionId or token not valid"); log.error("ERROR: sessionId or token not valid");
throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE, throw new OpenViduException(Code.USER_UNAUTHORIZED_ERROR_CODE,
"Unable to join room. The user is not authorized"); "Unable to join room. The user is not authorized");
} }
@ -238,18 +239,31 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
@Override @Override
public void afterConnectionClosed(Session rpcSession, String status) throws Exception { public void afterConnectionClosed(Session rpcSession, String status) throws Exception {
log.info("Connection closed for WebSocket session: {} - Status: {}", rpcSession.getSessionId(), status); log.info("Connection closed for WebSocket session: {} - Status: {}", rpcSession.getSessionId(), status);
this.notificationService.showRpcConnections();
String rpcSessionId = rpcSession.getSessionId();
if (this.webSocketTransportError.get(rpcSessionId) != null) {
log.warn(
"Evicting participant with private id {} because a transport error took place and its web socket connection is now closed",
rpcSession.getSessionId());
this.leaveRoomAfterConnClosed(rpcSessionId);
this.webSocketTransportError.remove(rpcSessionId);
}
} }
@Override @Override
public void handleTransportError(Session rpcSession, Throwable exception) throws Exception { public void handleTransportError(Session rpcSession, Throwable exception) throws Exception {
log.error("Transport exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), log.error("Transport exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(),
exception.getMessage()); exception);
if ("EOFException".equals(exception.getClass().getSimpleName())) {
// Store WebSocket connection interrupted exception for this web socket to
// automatically evict the participant on "afterConnectionClosed" event
this.webSocketTransportError.put(rpcSession.getSessionId(), true);
}
} }
@Override @Override
public void handleUncaughtException(Session rpcSession, Exception exception) { public void handleUncaughtException(Session rpcSession, Exception exception) {
log.error("Uncaught exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), log.error("Uncaught exception for WebSocket session: {} - Exception: {}", rpcSession.getSessionId(), exception);
exception.getMessage());
} }
@Override @Override

View File

@ -38,7 +38,8 @@ public class RpcNotificationService {
public void sendResponse(String participantPrivateId, Integer transactionId, Object result) { public void sendResponse(String participantPrivateId, Integer transactionId, Object result) {
Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId); Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId);
if (t == null) { if (t == null) {
log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", transactionId, participantPrivateId, result); log.error("No transaction {} found for paticipant with private id {}, unable to send result {}",
transactionId, participantPrivateId, result);
return; return;
} }
try { try {
@ -48,10 +49,12 @@ public class RpcNotificationService {
} }
} }
public void sendErrorResponse(String participantPrivateId, Integer transactionId, Object data, OpenViduException error) { public void sendErrorResponse(String participantPrivateId, Integer transactionId, Object data,
OpenViduException error) {
Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId); Transaction t = getAndRemoveTransaction(participantPrivateId, transactionId);
if (t == null) { if (t == null) {
log.error("No transaction {} found for paticipant with private id {}, unable to send result {}", transactionId, participantPrivateId, data); log.error("No transaction {} found for paticipant with private id {}, unable to send result {}",
transactionId, participantPrivateId, data);
return; return;
} }
try { try {
@ -65,7 +68,8 @@ public class RpcNotificationService {
public void sendNotification(final String participantPrivateId, final String method, final Object params) { public void sendNotification(final String participantPrivateId, final String method, final Object params) {
RpcConnection rpcSession = rpcConnections.get(participantPrivateId); RpcConnection rpcSession = rpcConnections.get(participantPrivateId);
if (rpcSession == null || rpcSession.getSession() == null) { if (rpcSession == null || rpcSession.getSession() == null) {
log.error("No rpc session found for private id {}, unable to send notification {}: {}", participantPrivateId, method, params); log.error("No rpc session found for private id {}, unable to send notification {}: {}",
participantPrivateId, method, params);
return; return;
} }
Session s = rpcSession.getSession(); Session s = rpcSession.getSession();
@ -73,7 +77,8 @@ public class RpcNotificationService {
try { try {
s.sendNotification(method, params); s.sendNotification(method, params);
} catch (Exception e) { } catch (Exception e) {
log.error("Exception sending notification '{}': {} to participant with private id {}", method, params, participantPrivateId, e); log.error("Exception sending notification '{}': {} to participant with private id {}", method, params,
participantPrivateId, e);
} }
} }
@ -105,4 +110,8 @@ public class RpcNotificationService {
return t; return t;
} }
public void showRpcConnections() {
log.info("<PRIVATE_ID, RPC_CONNECTION>: {}", RpcNotificationService.rpcConnections.toString());
}
} }