Participants properly cleaned up after ws close in any case

pull/73/head
pabloFuente 2018-04-02 15:48:39 +02:00
parent 7f6916ebba
commit 42ea1e7ed4
3 changed files with 68 additions and 24 deletions

View File

@ -356,7 +356,7 @@ export class OpenViduInternal {
message: message
}, (error, response) => {
if (!!completionHandler) {
completionHandler(!!error ? new Error(error) : null);
completionHandler(!!error ? new Error(error.message) : null);
}
});
};

View File

@ -52,7 +52,30 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
log.debug("WebSocket session #{} - Request: {}", participantPrivateId, request);
RpcConnection rpcConnection = notificationService.addTransaction(transaction, request);
RpcConnection rpcConnection;
if (ProtocolElements.JOINROOM_METHOD.equals(request.getMethod())) {
// Store new RpcConnection information if method 'joinRoom'
rpcConnection = notificationService.newRpcConnection(transaction, request);
} else if (notificationService.getRpcConnection(participantPrivateId) == null) {
// Throw exception if any method is called before 'joinRoom'
log.warn(
"No connection found for participant with privateId {} when trying to execute method '{}'. Method 'Session.connect()' must be the first operation called in any session",
participantPrivateId, request.getMethod());
throw new OpenViduException(Code.TRANSPORT_ERROR_CODE,
"No connection found for participant with privateId " + participantPrivateId
+ ". Method 'Session.connect()' must be the first operation called in any session");
}
rpcConnection = notificationService.addTransaction(transaction, request);
String sessionId = rpcConnection.getSessionId();
if (sessionId == null && !ProtocolElements.JOINROOM_METHOD.equals(request.getMethod())) {
log.warn(
"No session information found for participant with privateId {} when trying to execute method '{}'. Method 'Session.connect()' must be the first operation called in any session",
participantPrivateId, request.getMethod());
throw new OpenViduException(Code.TRANSPORT_ERROR_CODE,
"No session information found for participant with privateId " + participantPrivateId
+ ". Method 'Session.connect()' must be the first operation called in any session");
}
transaction.startAsync();
@ -93,15 +116,15 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
String token = getStringParam(request, ProtocolElements.JOINROOM_TOKEN_PARAM);
String secret = getStringParam(request, ProtocolElements.JOINROOM_SECRET_PARAM);
String participantPrivatetId = rpcConnection.getParticipantPrivateId();
boolean recorder = false;
try {
recorder = getBooleanParam(request, ProtocolElements.JOINROOM_RECORDER_PARAM);
} catch (RuntimeException e) {
// Nothing happens. 'recorder' param to false
}
boolean generateRecorderParticipant = false;
if (openviduConfig.isOpenViduSecret(secret)) {
@ -119,7 +142,7 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
Token tokenObj = sessionManager.consumeToken(sessionId, participantPrivatetId, token);
Participant participant;
if (generateRecorderParticipant) {
participant = sessionManager.newRecorderParticipant(sessionId, participantPrivatetId, tokenObj,
clientMetadata);
@ -127,7 +150,7 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
participant = sessionManager.newParticipant(sessionId, participantPrivatetId, tokenObj,
clientMetadata);
}
rpcConnection.setSessionId(sessionId);
sessionManager.joinRoom(participant, sessionId, request.getId());
@ -259,6 +282,16 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
@Override
public void afterConnectionClosed(Session rpcSession, String status) throws Exception {
log.info("Connection closed for WebSocket session: {} - Status: {}", rpcSession.getSessionId(), status);
RpcConnection rpc = this.notificationService.closeRpcSession(rpcSession.getSessionId());
if (rpc != null && rpc.getSessionId() != null) {
io.openvidu.server.core.Session session = this.sessionManager.getSession(rpc.getSessionId());
if (session != null && session.getParticipantByPrivateId(rpc.getParticipantPrivateId()) != null) {
leaveRoomAfterConnClosed(rpc.getParticipantPrivateId());
}
}
this.notificationService.showRpcConnections();
String rpcSessionId = rpcSession.getSessionId();
if (this.webSocketTransportError.get(rpcSessionId) != null) {
@ -293,21 +326,24 @@ public class RpcHandler extends DefaultJsonRpcHandler<JsonObject> {
public static String getStringParam(Request<JsonObject> request, String key) {
if (request.getParams() == null || request.getParams().get(key) == null) {
throw new RuntimeException("Request element '" + key + "' is missing in method '" + request.getMethod() + "'. CHECK THAT 'openvidu-server' AND 'openvidu-browser' SHARE THE SAME VERSION NUMBER");
throw new RuntimeException("Request element '" + key + "' is missing in method '" + request.getMethod()
+ "'. CHECK THAT 'openvidu-server' AND 'openvidu-browser' SHARE THE SAME VERSION NUMBER");
}
return request.getParams().get(key).getAsString();
}
public static int getIntParam(Request<JsonObject> request, String key) {
if (request.getParams() == null || request.getParams().get(key) == null) {
throw new RuntimeException("Request element '" + key + "' is missing in method '" + request.getMethod() + "'. CHECK THAT 'openvidu-server' AND 'openvidu-browser' SHARE THE SAME VERSION NUMBER");
throw new RuntimeException("Request element '" + key + "' is missing in method '" + request.getMethod()
+ "'. CHECK THAT 'openvidu-server' AND 'openvidu-browser' SHARE THE SAME VERSION NUMBER");
}
return request.getParams().get(key).getAsInt();
}
public static boolean getBooleanParam(Request<JsonObject> request, String key) {
if (request.getParams() == null || request.getParams().get(key) == null) {
throw new RuntimeException("Request element '" + key + "' is missing in method '" + request.getMethod() + "'. CHECK THAT 'openvidu-server' AND 'openvidu-browser' SHARE THE SAME VERSION NUMBER");
throw new RuntimeException("Request element '" + key + "' is missing in method '" + request.getMethod()
+ "'. CHECK THAT 'openvidu-server' AND 'openvidu-browser' SHARE THE SAME VERSION NUMBER");
}
return request.getParams().get(key).getAsBoolean();
}

View File

@ -18,19 +18,22 @@ public class RpcNotificationService {
private static final Logger log = LoggerFactory.getLogger(RpcNotificationService.class);
private static ConcurrentMap<String, RpcConnection> rpcConnections = new ConcurrentHashMap<>();
private ConcurrentMap<String, RpcConnection> rpcConnections = new ConcurrentHashMap<>();
public RpcConnection newRpcConnection(Transaction t, Request<JsonObject> request) {
String participantPrivateId = t.getSession().getSessionId();
RpcConnection connection = new RpcConnection(t.getSession());
RpcConnection oldConnection = rpcConnections.putIfAbsent(participantPrivateId, connection);
if (oldConnection != null) {
log.warn("Concurrent initialization of rpcSession #{}", participantPrivateId);
connection = oldConnection;
}
return connection;
}
public RpcConnection addTransaction(Transaction t, Request<JsonObject> request) {
String participantPrivateId = t.getSession().getSessionId();
RpcConnection connection = rpcConnections.get(participantPrivateId);
if (connection == null) {
connection = new RpcConnection(t.getSession());
RpcConnection oldConnection = rpcConnections.putIfAbsent(participantPrivateId, connection);
if (oldConnection != null) {
log.warn("Concurrent initialization of rpcSession #{}", participantPrivateId);
connection = oldConnection;
}
}
connection.addTransaction(request.getId(), t);
return connection;
}
@ -82,20 +85,21 @@ public class RpcNotificationService {
}
}
public void closeRpcSession(String participantPrivateId) {
RpcConnection rpcSession = rpcConnections.get(participantPrivateId);
public RpcConnection closeRpcSession(String participantPrivateId) {
RpcConnection rpcSession = rpcConnections.remove(participantPrivateId);
if (rpcSession == null || rpcSession.getSession() == null) {
log.error("No session found for private id {}, unable to cleanup", participantPrivateId);
return;
return null;
}
Session s = rpcSession.getSession();
try {
s.close();
log.info("Closed session for participant with private id {}", participantPrivateId);
return rpcSession;
} catch (IOException e) {
log.error("Error closing session for participant with private id {}", participantPrivateId, e);
}
rpcConnections.remove(participantPrivateId);
return null;
}
private Transaction getAndRemoveTransaction(String participantPrivateId, Integer transactionId) {
@ -111,7 +115,11 @@ public class RpcNotificationService {
}
public void showRpcConnections() {
log.info("<PRIVATE_ID, RPC_CONNECTION>: {}", RpcNotificationService.rpcConnections.toString());
log.info("<PRIVATE_ID, RPC_CONNECTION>: {}", this.rpcConnections.toString());
}
public RpcConnection getRpcConnection(String participantPrivateId) {
return this.rpcConnections.get(participantPrivateId);
}
}