nodeRecovered event

pull/707/head
pabloFuente 2022-03-24 12:34:02 +01:00
parent 05b8da962e
commit 5cc6579bf0
17 changed files with 144 additions and 128 deletions

View File

@ -150,7 +150,7 @@ OPENVIDU_WEBHOOK=false
# List of events that will be sent by OpenVidu Webhook service
# Default value is all available events
OPENVIDU_WEBHOOK_EVENTS=[sessionCreated,sessionDestroyed,participantJoined,participantLeft,webrtcConnectionCreated,webrtcConnectionDestroyed,recordingStatusChanged,filterEventDispatched,mediaNodeStatusChanged,nodeCrashed]
OPENVIDU_WEBHOOK_EVENTS=[sessionCreated,sessionDestroyed,participantJoined,participantLeft,webrtcConnectionCreated,webrtcConnectionDestroyed,recordingStatusChanged,filterEventDispatched,mediaNodeStatusChanged,nodeCrashed,nodeRecovered]
# How often the garbage collector of non active sessions runs.
# This helps cleaning up sessions that have been initialized through

View File

@ -288,7 +288,7 @@ OPENVIDU_WEBHOOK=false
# List of events that will be sent by OpenVidu Webhook service
# Default value is all available events
OPENVIDU_WEBHOOK_EVENTS=[sessionCreated,sessionDestroyed,participantJoined,participantLeft,webrtcConnectionCreated,webrtcConnectionDestroyed,recordingStatusChanged,filterEventDispatched,mediaNodeStatusChanged,nodeCrashed]
OPENVIDU_WEBHOOK_EVENTS=[sessionCreated,sessionDestroyed,participantJoined,participantLeft,webrtcConnectionCreated,webrtcConnectionDestroyed,recordingStatusChanged,filterEventDispatched,mediaNodeStatusChanged,nodeCrashed,nodeRecovered]
# How often the garbage collector of non active sessions runs.
# This helps cleaning up sessions that have been initialized through

View File

@ -285,7 +285,7 @@ OPENVIDU_WEBHOOK=false
# List of events that will be sent by OpenVidu Webhook service
# Default value is all available events
OPENVIDU_WEBHOOK_EVENTS=[sessionCreated,sessionDestroyed,participantJoined,participantLeft,webrtcConnectionCreated,webrtcConnectionDestroyed,recordingStatusChanged,filterEventDispatched,mediaNodeStatusChanged,nodeCrashed]
OPENVIDU_WEBHOOK_EVENTS=[sessionCreated,sessionDestroyed,participantJoined,participantLeft,webrtcConnectionCreated,webrtcConnectionDestroyed,recordingStatusChanged,filterEventDispatched,mediaNodeStatusChanged,nodeCrashed,nodeRecovered]
# How often the garbage collector of non active sessions runs.
# This helps cleaning up sessions that have been initialized through

View File

@ -23,14 +23,14 @@ public class CDREvent {
protected String sessionId;
protected String uniqueSessionId;
protected Long timeStamp;
protected Long timestamp;
protected CDREventName eventName;
public CDREvent(CDREventName eventName, String sessionId, String uniqueSessionId, Long timeStamp) {
public CDREvent(CDREventName eventName, String sessionId, String uniqueSessionId, Long timestamp) {
this.eventName = eventName;
this.sessionId = sessionId;
this.uniqueSessionId = uniqueSessionId;
this.timeStamp = timeStamp;
this.timestamp = timestamp;
}
public String getSessionId() {
@ -42,7 +42,7 @@ public class CDREvent {
}
public Long getTimestamp() {
return this.timeStamp;
return this.timestamp;
}
public CDREventName getEventName() {
@ -57,7 +57,7 @@ public class CDREvent {
if (uniqueSessionId != null) {
json.addProperty("uniqueSessionId", this.uniqueSessionId);
}
json.addProperty("timestamp", this.timeStamp);
json.addProperty("timestamp", this.timestamp);
return json;
}

View File

@ -35,7 +35,7 @@ public class CDREventEnd extends CDREvent {
EndReason reason, Long timestamp) {
super(eventName, sessionId, uniqueSessionId, timestamp);
this.startTime = startTime;
this.duration = (int) ((this.timeStamp - this.startTime) / 1000);
this.duration = (int) ((this.timestamp - this.startTime) / 1000);
this.reason = reason;
}

View File

@ -20,7 +20,7 @@ package io.openvidu.server.cdr;
public enum CDREventName {
sessionCreated, sessionDestroyed, participantJoined, participantLeft, webrtcConnectionCreated,
webrtcConnectionDestroyed, recordingStatusChanged, filterEventDispatched,
signalSent, mediaNodeStatusChanged, autoscaling, nodeCrashed
webrtcConnectionDestroyed, recordingStatusChanged, filterEventDispatched, signalSent, mediaNodeStatusChanged,
autoscaling, nodeCrashed, nodeRecovered
}

View File

@ -1,60 +0,0 @@
package io.openvidu.server.cdr;
import java.util.List;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
public class CDREventNodeCrashed extends CDREvent {
public enum NodeRole {
masternode, medianode
}
private String id;
private String environmentId;
private String ip;
private String uri;
private NodeRole nodeRole;
private List<String> sessionIds;
private List<String> recordingIds;
private String clusterId;
public CDREventNodeCrashed(Long timeStamp, String id, String environmentId, String ip, String uri,
NodeRole nodeRole, List<String> sessionIds, List<String> recordingIds, String clusterId) {
super(CDREventName.nodeCrashed, null, null, timeStamp);
this.id = id;
this.environmentId = environmentId;
this.ip = ip;
this.uri = uri;
this.nodeRole = nodeRole;
this.sessionIds = sessionIds;
this.recordingIds = recordingIds;
this.clusterId = clusterId;
}
@Override
public JsonObject toJson() {
JsonObject json = super.toJson();
json.addProperty("id", this.id);
if (this.environmentId != null) {
json.addProperty("environmentId", this.environmentId);
}
json.addProperty("ip", this.ip);
json.addProperty("uri", this.uri);
json.addProperty("nodeRole", this.nodeRole.name());
JsonArray sIds = new JsonArray();
this.sessionIds.forEach(sId -> sIds.add(sId));
json.add("sessionIds", sIds);
JsonArray rIds = new JsonArray();
this.recordingIds.forEach(rId -> rIds.add(rId));
json.add("recordingIds", rIds);
json.addProperty("clusterId", this.clusterId);
return json;
}
}

View File

@ -25,7 +25,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import io.openvidu.java.client.IceServerProperties;
import org.kurento.client.GenericMediaEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,8 +37,8 @@ import com.google.gson.JsonObject;
import io.openvidu.client.OpenViduException;
import io.openvidu.client.OpenViduException.Code;
import io.openvidu.client.internal.ProtocolElements;
import io.openvidu.java.client.IceServerProperties;
import io.openvidu.java.client.OpenViduRole;
import io.openvidu.server.cdr.CDREventNodeCrashed;
import io.openvidu.server.cdr.CallDetailRecord;
import io.openvidu.server.config.OpenviduBuildInfo;
import io.openvidu.server.config.OpenviduConfig;
@ -167,16 +166,16 @@ public class SessionEventsHandler {
this.openviduConfig.getMediaServer().name());
switch (this.openviduConfig.getMediaServer()) {
case mediasoup:
// mediasoup supports simulcast
result.addProperty(ProtocolElements.PARTICIPANTJOINED_SIMULCAST_PARAM,
this.openviduConfig.isWebrtcSimulcast());
break;
case kurento:
default:
// Kurento does not support simulcast
result.addProperty(ProtocolElements.PARTICIPANTJOINED_SIMULCAST_PARAM, false);
break;
case mediasoup:
// mediasoup supports simulcast
result.addProperty(ProtocolElements.PARTICIPANTJOINED_SIMULCAST_PARAM,
this.openviduConfig.isWebrtcSimulcast());
break;
case kurento:
default:
// Kurento does not support simulcast
result.addProperty(ProtocolElements.PARTICIPANTJOINED_SIMULCAST_PARAM, false);
break;
}
if (participant.getToken() != null) {
@ -188,7 +187,7 @@ public class SessionEventsHandler {
result.addProperty(ProtocolElements.PARTICIPANTJOINED_COTURNIP_PARAM, openviduConfig.getCoturnIp());
result.addProperty(ProtocolElements.PARTICIPANTJOINED_COTURNPORT_PARAM, openviduConfig.getCoturnPort());
List<IceServerProperties> customIceServers = participant.getToken().getCustomIceServers();
if (customIceServers!= null && !customIceServers.isEmpty()) {
if (customIceServers != null && !customIceServers.isEmpty()) {
result.add(ProtocolElements.PARTICIPANTJOINED_CUSTOM_ICE_SERVERS,
participant.getToken().getCustomIceServersAsJson());
}
@ -669,11 +668,11 @@ public class SessionEventsHandler {
* This handler must be called before cleaning any sessions or recordings hosted
* by the crashed Media Node
*/
public void onMediaNodeCrashed(Kms kms, String environmentId, long timeOfKurentoDisconnection,
List<String> sessionIds, List<String> recordingIds) {
public void onMediaNodeCrashed(Kms kms, String environmentId, long timeOfDisconnection, List<String> sessionIds,
List<String> recordingIds) {
}
public void onMasterNodeCrashed(CDREventNodeCrashed event) {
public void onMediaNodeRecovered(Kms kms, String environmentId, long timeOfConnection) {
}
public void storeRecordingToSendClientEvent(Recording recording) {

View File

@ -50,11 +50,13 @@ public class FixedOneKmsManager extends KmsManager {
JsonRpcClientNettyWebSocket client = new JsonRpcClientNettyWebSocket(firstProps.getUri(), listener);
client.setTryReconnectingMaxTime(0);
client.setTryReconnectingForever(false);
kClient = KurentoClient.createFromJsonRpcClient(client);
client.setConnectionTimeout(MAX_CONNECT_TIME_MILLIS);
client.setRequestTimeout(MAX_REQUEST_TIMEOUT);
kClient = KurentoClient.createFromJsonRpcClientHonoringClientTimeouts(client);
kms.setKurentoClient(kClient);
// TODO: This should be done in KurentoClient connected event
kms.setKurentoClientConnected(true);
kms.setKurentoClientConnected(true, false);
this.addKms(kms);
@ -90,7 +92,11 @@ public class FixedOneKmsManager extends KmsManager {
}
@Override
protected String removeMediaNodeUponCrash(String mediaNodeId, boolean followedByReconnection, String message) {
protected void removeMediaNodeUponCrash(String mediaNodeId) {
}
@Override
protected String getEnvironmentId(String mediaNodeId) {
return null;
}

View File

@ -66,7 +66,7 @@ public class Kms {
private LoadManager loadManager;
private KmsManager kmsManager;
private boolean isFirstReconnectionAttempt = true;
private boolean hasTriggeredNodeCrashedEvent = false;
private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false);
private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0);
private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0);
@ -128,30 +128,33 @@ public class Kms {
return true; // loadManager.allowMoreElements(this);
}
public boolean isFirstReconnectionAttempt() {
return this.isFirstReconnectionAttempt;
public boolean hasTriggeredNodeCrashedEvent() {
return this.hasTriggeredNodeCrashedEvent;
}
public void setFirstReconnectionAttempt(boolean isFirst) {
this.isFirstReconnectionAttempt = isFirst;
public void setHasTriggeredNodeCrashedEvent(boolean hasTriggeredNodeCrashedEvent) {
this.hasTriggeredNodeCrashedEvent = hasTriggeredNodeCrashedEvent;
}
public boolean isKurentoClientConnected() {
return this.isKurentoClientConnected.get();
}
public void setKurentoClientConnected(boolean isConnected) {
public void setKurentoClientConnected(boolean isConnected, boolean reconnection) {
final long timestamp = System.currentTimeMillis();
this.isKurentoClientConnected.set(isConnected);
if (isConnected) {
this.setTimeOfKurentoClientConnection(timestamp);
kmsManager.getMediaNodeManager().mediaNodeUsageRegistration(this, timestamp, kmsManager.getKmss());
this.setTimeOfKurentoClientDisconnection(0);
this.setHasTriggeredNodeCrashedEvent(false);
if (!reconnection) {
kmsManager.getMediaNodeManager().mediaNodeUsageRegistration(this, timestamp, kmsManager.getKmss());
}
if (this.mediaServer == null) {
this.fetchMediaServerType();
}
} else {
this.setTimeOfKurentoClientDisconnection(timestamp);
kmsManager.getMediaNodeManager().mediaNodeUsageDeregistration(this, timestamp);
}
}

View File

@ -109,10 +109,12 @@ public abstract class KmsManager {
protected SessionManager sessionManager;
protected LoadManager loadManager;
protected final int MAX_REQUEST_TIMEOUT = 10000;
protected final int MAX_CONNECT_TIME_MILLIS = 3000;
// Media Node reconnection cycle: 6 attempts, 2 times per second (3s total)
final int MAX_RECONNECT_TIME_MILLIS = 3000;
final int INTERVAL_WAIT_MS = 500;
final int RECONNECTION_LOOPS = MAX_RECONNECT_TIME_MILLIS / INTERVAL_WAIT_MS;
final int RECONNECTION_LOOPS = MAX_CONNECT_TIME_MILLIS / INTERVAL_WAIT_MS;
public KmsManager(SessionManager sessionManager, LoadManager loadManager) {
this.sessionManager = sessionManager;
@ -186,7 +188,7 @@ public abstract class KmsManager {
kms.getKurentoClient().toString());
// TODO: This should be done here, not after KurentoClient#create method
// returns, but it seems that this event is never triggered
// kms.setKurentoClientConnected(true);
// kms.setKurentoClientConnected(true, false);
}
@Override
@ -225,7 +227,7 @@ public abstract class KmsManager {
kms.getKurentoClient().toString());
}
kms.setKurentoClientConnected(false);
kms.setKurentoClientConnected(false, false);
disconnectionHandler(kms, 0);
}
@ -235,22 +237,23 @@ public abstract class KmsManager {
final AtomicInteger iteration = new AtomicInteger(RECONNECTION_LOOPS);
final long initTime = System.currentTimeMillis();
final int accumulatedTimeout = reconnectionSecondsConsumed + (MAX_RECONNECT_TIME_MILLIS / 1000);
final int accumulatedTimeout = reconnectionSecondsConsumed + (MAX_CONNECT_TIME_MILLIS / 1000);
final UpdatableTimerTask kurentoClientReconnectTimer = new UpdatableTimerTask(() -> {
if (iteration.decrementAndGet() < 0) {
kms.getKurentoClientReconnectTimer().cancelTimer();
boolean mustRetryReconnection = accumulatedTimeout < openviduConfig.getReconnectionTimeout();
boolean mustRemoveMediaNode = true;
if (kms.isFirstReconnectionAttempt()) {
if (!kms.hasTriggeredNodeCrashedEvent()) {
log.error(
"OpenVidu Server [{}] could not reconnect to Media Node {} with IP {} in {} seconds. Media Node crashed",
kms.getKurentoClient().toString(), kms.getId(), kms.getIp(),
(INTERVAL_WAIT_MS * RECONNECTION_LOOPS / 1000));
kms.setFirstReconnectionAttempt(false);
kms.setHasTriggeredNodeCrashedEvent(true);
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
final List<String> affectedSessionIds = kms.getKurentoSessions().stream()
@ -258,14 +261,17 @@ public abstract class KmsManager {
final List<String> affectedRecordingIds = kms.getActiveRecordings().stream()
.map(entry -> entry.getKey()).collect(Collectors.toUnmodifiableList());
// 1. Remove Media Node from cluster
String environmentId = removeMediaNodeUponCrash(kms.getId(), mustRetryReconnection,
"Removing Media Node " + kms.getId() + " after node crash");
// 2. Send nodeCrashed webhook event
// 1. Send nodeCrashed webhook event
String environmentId = getEnvironmentId(kms.getId());
sessionEventsHandler.onMediaNodeCrashed(kms, environmentId, timeOfKurentoDisconnection,
affectedSessionIds, affectedRecordingIds);
// 2. Remove Media Node from cluster
if (!mustRetryReconnection) {
mustRemoveMediaNode = false;
removeMediaNodeUponCrash(kms.getId());
}
// 3. Close all sessions and recordings with reason "nodeCrashed"
log.warn("Closing {} sessions hosted by Media Node {} with IP {}: {}",
kms.getKurentoSessions().size(), kms.getId(), kms.getIp(),
@ -296,15 +302,17 @@ public abstract class KmsManager {
"Reconnection process to Media Node {} with IP {} aborted. {} seconds have been consumed and the upper limit is {} seconds",
kms.getId(), kms.getIp(), accumulatedTimeout,
openviduConfig.getReconnectionTimeout());
removeMediaNodeUponCrash(kms.getId(), mustRetryReconnection, "Removing Media Node "
+ kms.getId() + " with IP " + kms.getIp() + " after reconnection abort");
if (mustRemoveMediaNode) {
removeMediaNodeUponCrash(kms.getId());
}
}
} else {
if ((System.currentTimeMillis() - initTime) > MAX_RECONNECT_TIME_MILLIS) {
if ((System.currentTimeMillis() - initTime) > MAX_CONNECT_TIME_MILLIS) {
// KurentoClient connection timeout exceeds the limit. This prevents a
// single reconnection attempt to exceed the total timeout limit
// single reconnection attempt to exceed the total timeout limit if the
// connection gets stuck
iteration.set(0);
return;
}
@ -321,13 +329,13 @@ public abstract class KmsManager {
log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected",
kms.getUri(), kms.getKurentoClient().toString());
kms.setFirstReconnectionAttempt(true);
kms.getKurentoClientReconnectTimer().cancelTimer();
kms.setKurentoClientConnected(true);
final boolean mustTriggerNodeRecoveredEvent = kms.hasTriggeredNodeCrashedEvent();
final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection();
kms.setKurentoClientConnected(true, true);
if (kms.getKurentoSessions().isEmpty()) {
log.info("There were no sessions in the KMS with uri {}. Nothing must be done",
kms.getUri());
@ -345,7 +353,13 @@ public abstract class KmsManager {
}
}
kms.setTimeOfKurentoClientDisconnection(0);
if (mustTriggerNodeRecoveredEvent) {
// Send nodeRecovered webhook event
String environmentId = getEnvironmentId(kms.getId());
long timeOfConnection = kms.getTimeOfKurentoClientConnection();
sessionEventsHandler.onMediaNodeRecovered(kms, environmentId, timeOfConnection);
}
}
}, () -> Long.valueOf(INTERVAL_WAIT_MS)); // Try 2 times per second
@ -377,8 +391,9 @@ public abstract class KmsManager {
public abstract void decrementActiveRecordings(RecordingProperties recordingProperties, String recordingId,
Session session);
protected abstract String removeMediaNodeUponCrash(String mediaNodeId, boolean followedByReconnection,
String message);
protected abstract void removeMediaNodeUponCrash(String mediaNodeId);
protected abstract String getEnvironmentId(String mediaNodeId);
@PostConstruct
protected abstract void postConstructInitKurentoClients();

View File

@ -8,7 +8,7 @@ public interface MediaNodeManager {
public void mediaNodeUsageRegistration(Kms kms, long timeOfConnection, Collection<Kms> existingKmss);
public void mediaNodeUsageDeregistration(Kms kms, long timeOfDisconnection);
public void mediaNodeUsageDeregistration(String mediaNodeId, long timeOfDisconnection);
public void dropIdleMediaNode(String mediaNodeId);

View File

@ -11,7 +11,7 @@ public class MediaNodeManagerDummy implements MediaNodeManager {
}
@Override
public void mediaNodeUsageDeregistration(Kms kms, long timeOfDisconnection) {
public void mediaNodeUsageDeregistration(String mediaNodeId, long timeOfDisconnection) {
}
@Override

View File

@ -23,7 +23,7 @@ OPENVIDU_CDR_PATH=/opt/openvidu/cdr
OPENVIDU_WEBHOOK=false
OPENVIDU_WEBHOOK_ENDPOINT=
OPENVIDU_WEBHOOK_HEADERS=[]
OPENVIDU_WEBHOOK_EVENTS=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","signalSent","mediaNodeStatusChanged","autoscaling","nodeCrashed"]
OPENVIDU_WEBHOOK_EVENTS=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","signalSent","mediaNodeStatusChanged","autoscaling","nodeCrashed","nodeRecovered"]
OPENVIDU_RECORDING=false
OPENVIDU_RECORDING_DEBUG=false

View File

@ -59,7 +59,7 @@ public class IntegrationTestConfiguration {
when(kClient.getServerManager()).thenReturn(serverManagerMock);
kms.setKurentoClient(kClient);
kms.setKurentoClientConnected(true);
kms.setKurentoClientConnected(true, false);
spy.addKms(kms);
successfullyConnectedKmss.add(kms);

View File

@ -21,7 +21,7 @@ OPENVIDU_CDR_PATH=/opt/openvidu/cdr
OPENVIDU_WEBHOOK=false
OPENVIDU_WEBHOOK_ENDPOINT=
OPENVIDU_WEBHOOK_HEADERS=[]
OPENVIDU_WEBHOOK_EVENTS=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","mediaNodeStatusChanged","nodeCrashed"]
OPENVIDU_WEBHOOK_EVENTS=["sessionCreated","sessionDestroyed","participantJoined","participantLeft","webrtcConnectionCreated","webrtcConnectionDestroyed","recordingStatusChanged","filterEventDispatched","mediaNodeStatusChanged","nodeCrashed","nodeRecovered"]
OPENVIDU_RECORDING=false
OPENVIDU_RECORDING_VERSION=2.19.0

View File

@ -17,6 +17,7 @@
package io.openvidu.test.browsers.utils.webhook;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -48,10 +49,12 @@ public class CustomWebhook {
public static CountDownLatch initLatch;
public static int accumulatedNumberOfEvents = 0;
static ConcurrentMap<String, BlockingQueue<JsonObject>> events = new ConcurrentHashMap<>();
static BlockingQueue<JsonObject> eventsInOrder = new LinkedBlockingDeque<>();
public static void main(String[] args, CountDownLatch initLatch) {
CustomWebhook.initLatch = initLatch;
accumulatedNumberOfEvents = 0;
CustomWebhook.eventsInOrder.clear();
CustomWebhook.events.clear();
CustomWebhook.context = new SpringApplicationBuilder(CustomWebhook.class)
.properties("spring.config.location:classpath:aplication-pro-webhook.properties").build().run(args);
@ -59,22 +62,23 @@ public class CustomWebhook {
public static void shutDown() {
accumulatedNumberOfEvents = 0;
CustomWebhook.eventsInOrder.clear();
CustomWebhook.events.clear();
CustomWebhook.context.close();
}
public static void clean() {
accumulatedNumberOfEvents = 0;
CustomWebhook.eventsInOrder.clear();
CustomWebhook.events.clear();
}
public synchronized static JsonObject waitForEvent(String eventName, int maxSecondsWait)
throws TimeoutException, InterruptedException {
public synchronized static JsonObject waitForEvent(String eventName, int maxSecondsWait) throws Exception {
return CustomWebhook.waitForEvent(eventName, maxSecondsWait, TimeUnit.SECONDS);
}
public synchronized static JsonObject waitForEvent(String eventName, int maxWait, TimeUnit timeUnit)
throws TimeoutException, InterruptedException {
throws Exception {
if (events.get(eventName) == null) {
events.put(eventName, new LinkedBlockingDeque<>());
}
@ -86,6 +90,54 @@ public class CustomWebhook {
}
}
public synchronized static JsonObject waitForNextEventToBeOfType(String eventName, int maxSecondsWait)
throws Exception {
JsonObject event = eventsInOrder.poll(maxSecondsWait, TimeUnit.SECONDS);
if (event == null) {
throw new TimeoutException("Timeout waiting for Webhook " + eventName);
} else {
String ev = event.get("event").getAsString();
if (!eventName.equals(ev)) {
throw new Exception("Wrong event type receieved. Excpeceted " + eventName + " but got " + ev);
} else {
// Remove the very same event from the map of events
if (!CustomWebhook.events.get(eventName).contains(event)) {
throw new Exception("Lack of event " + eventName);
} else {
JsonObject sameEvent = null;
Iterator<JsonObject> it = CustomWebhook.events.get(eventName).iterator();
boolean found = false;
while (!found && it.hasNext()) {
JsonObject json = it.next();
if (json.equals(event)) {
found = true;
sameEvent = json;
it.remove();
}
}
if (!event.equals(sameEvent)) {
throw new Exception(
"Events were different! " + event.toString() + " | " + sameEvent.toString());
}
return event;
}
}
}
}
public synchronized static JsonObject waitToNotReceiveEvent(String eventName, int secondsForNotReceiving)
throws Exception {
if (events.get(eventName) == null) {
events.put(eventName, new LinkedBlockingDeque<>());
}
JsonObject event = CustomWebhook.events.get(eventName).poll(secondsForNotReceiving, TimeUnit.SECONDS);
if (event != null) {
throw new Exception("WebHook received event " + eventName + " and it wasn't expected: " + event.toString());
} else {
return event;
}
}
@RestController
public class WebhookController {
@RequestMapping("/webhook")
@ -93,6 +145,7 @@ public class CustomWebhook {
JsonObject event = JsonParser.parseString(eventString).getAsJsonObject();
System.out.println("Webhook event: " + event.toString());
accumulatedNumberOfEvents++;
eventsInOrder.add(event);
final String eventName = event.get("event").getAsString();
final BlockingQueue<JsonObject> queue = new LinkedBlockingDeque<>();
if (!CustomWebhook.events.computeIfAbsent(eventName, e -> {