Francis/uc 37 secure kafka (#1532)

* add ssl encryption to kafka client

* fix missing columns in getPageview CH. fix Kafka SSL Pathing
pull/1536/head
Francis Cao 2022-09-21 11:31:52 -07:00 committed by GitHub
parent 3932583bc9
commit 55218d1ddd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 4 deletions

3
.gitignore vendored
View File

@ -38,3 +38,6 @@ yarn-error.log*
*.dev.yml *.dev.yml
# cert
/lib/cert

View File

@ -8,11 +8,17 @@ const log = debug('umami:kafka');
function getClient() { function getClient() {
const { username, password } = new URL(process.env.KAFKA_URL); const { username, password } = new URL(process.env.KAFKA_URL);
const brokers = process.env.KAFKA_BROKER.split(','); const brokers = process.env.KAFKA_BROKER.split(',');
const fs = require('fs');
const ssl = const ssl =
username && password username && password
? { ? {
ssl: true, ssl: {
checkServerIdentity: () => undefined,
ca: [fs.readFileSync('./lib/cert/ca_cert.pem', 'utf-8')],
key: fs.readFileSync('./lib/cert/client_key.pem', 'utf-8'),
cert: fs.readFileSync('./lib/cert/client_cert.pem', 'utf-8'),
},
sasl: { sasl: {
mechanism: 'plain', mechanism: 'plain',
username, username,
@ -63,7 +69,7 @@ async function sendMessage(params, topic) {
value: JSON.stringify(params), value: JSON.stringify(params),
}, },
], ],
acks: 0, acks: 1,
}); });
} }

View File

@ -27,9 +27,8 @@ async function relationalQuery(websites, start_at) {
async function clickhouseQuery(websites, start_at) { async function clickhouseQuery(websites, start_at) {
return clickhouse.rawQuery( return clickhouse.rawQuery(
`select `select
view_id,
website_id, website_id,
session_id, session_uuid,
created_at, created_at,
url url
from event from event