From dfac7e1af5980e4fb795c20b483e251edacee5de Mon Sep 17 00:00:00 2001 From: Mike Cao Date: Thu, 22 Sep 2022 10:36:23 -0700 Subject: [PATCH] Updated Kafka loading process. --- lib/kafka.js | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/lib/kafka.js b/lib/kafka.js index 4ad119ca..03833585 100644 --- a/lib/kafka.js +++ b/lib/kafka.js @@ -8,16 +8,15 @@ const log = debug('umami:kafka'); function getClient() { const { username, password } = new URL(process.env.KAFKA_URL); const brokers = process.env.KAFKA_BROKER.split(','); - const fs = require('fs'); const ssl = username && password ? { ssl: { checkServerIdentity: () => undefined, - ca: [fs.readFileSync('./lib/cert/ca_cert.pem', 'utf-8')], - key: fs.readFileSync('./lib/cert/client_key.pem', 'utf-8'), - cert: fs.readFileSync('./lib/cert/client_cert.pem', 'utf-8'), + ca: [process.env.CA_CERT], + key: process.env.CLIENT_KEY, + cert: process.env.CLIENT_CERT, }, sasl: { mechanism: 'plain', @@ -62,6 +61,8 @@ function getDateFormat(date) { } async function sendMessage(params, topic) { + await getKafka(); + await producer.send({ topic, messages: [ @@ -73,21 +74,25 @@ async function sendMessage(params, topic) { }); } +async function getKafka() { + if (!kafka) { + kafka = process.env.KAFKA_URL && process.env.KAFKA_BROKER && (global[KAFKA] || getClient()); + + if (kafka) { + producer = global[KAFKA_PRODUCER] || (await getProducer()); + } + } + + return kafka; +} + // Initialization let kafka; let producer; -(async () => { - kafka = process.env.KAFKA_URL && process.env.KAFKA_BROKER && (global[KAFKA] || getClient()); - - if (kafka) { - producer = global[KAFKA_PRODUCER] || (await getProducer()); - } -})(); - export default { client: kafka, - producer: producer, + producer, log, getDateFormat, sendMessage,