diff --git a/db/clickhouse/migrations/01_init/migration.sql b/db/clickhouse/migrations/01_init/migration.sql new file mode 100644 index 00000000..88a9b683 --- /dev/null +++ b/db/clickhouse/migrations/01_init/migration.sql @@ -0,0 +1,126 @@ +-- Create Pageview +CREATE TABLE pageview +( + website_id UInt32, + session_uuid UUID, + created_at DateTime, + url String, + referrer String +) + engine = MergeTree PRIMARY KEY (session_uuid, created_at) + ORDER BY (session_uuid, created_at) + SETTINGS index_granularity = 8192; + +CREATE TABLE pageview_queue ( + website_id UInt32, + session_uuid UUID, + created_at DateTime, + url String, + referrer String +) + +ENGINE = Kafka +SETTINGS kafka_broker_list = '', -- input broker list + kafka_topic_list = 'pageview', + kafka_group_name = 'pageview_consumer_group', + kafka_format = 'JSONEachRow', + kafka_max_block_size = 1048576, + kafka_skip_broken_messages = 1; + +CREATE MATERIALIZED VIEW pageview_queue_mv TO pageview AS +SELECT website_id, + session_uuid, + created_at, + url, + referrer +FROM pageview_queue; + +-- Create Session +CREATE TABLE session +( + session_uuid UUID, + website_id UInt32, + created_at DateTime, + hostname LowCardinality(String), + browser LowCardinality(String), + os LowCardinality(String), + device LowCardinality(String), + screen LowCardinality(String), + language LowCardinality(String), + country LowCardinality(String) +) + engine = MergeTree PRIMARY KEY (session_uuid, created_at) + ORDER BY (session_uuid, created_at) + SETTINGS index_granularity = 8192; + +CREATE TABLE session_queue ( + session_uuid UUID, + website_id UInt32, + created_at DateTime, + hostname LowCardinality(String), + browser LowCardinality(String), + os LowCardinality(String), + device LowCardinality(String), + screen LowCardinality(String), + language LowCardinality(String), + country LowCardinality(String) +) +ENGINE = Kafka +SETTINGS kafka_broker_list = '', -- input broker list + kafka_topic_list = 'session', + kafka_group_name 
= 'session_consumer_group', + kafka_format = 'JSONEachRow', + kafka_max_block_size = 1048576, + kafka_skip_broken_messages = 1; + +CREATE MATERIALIZED VIEW session_queue_mv TO session AS +SELECT session_uuid, + website_id, + created_at, + hostname, + browser, + os, + device, + screen, + language, + country +FROM session_queue; + +-- Create event +CREATE TABLE event +( + event_uuid UUID, + website_id UInt32, + session_uuid UUID, + created_at DateTime, + url String, + event_name String +) + engine = MergeTree PRIMARY KEY (event_uuid, created_at) + ORDER BY (event_uuid, created_at) + SETTINGS index_granularity = 8192; + +CREATE TABLE event_queue ( + event_uuid UUID, + website_id UInt32, + session_uuid UUID, + created_at DateTime, + url String, + event_name String +) +ENGINE = Kafka +SETTINGS kafka_broker_list = '', -- input broker list + kafka_topic_list = 'event', + kafka_group_name = 'event_consumer_group', + kafka_format = 'JSONEachRow', + kafka_max_block_size = 1048576, + kafka_skip_broken_messages = 1; + +CREATE MATERIALIZED VIEW event_queue_mv TO event AS +SELECT event_uuid, + website_id, + session_uuid, + created_at, + url, + event_name +FROM event_queue; \ No newline at end of file diff --git a/pages/api/collect.js b/pages/api/collect.js index 92451016..cb6219a8 100644 --- a/pages/api/collect.js +++ b/pages/api/collect.js @@ -7,6 +7,7 @@ import { getJsonBody, getIpAddress } from 'lib/request'; import { ok, send, badRequest, forbidden } from 'lib/response'; import { createToken } from 'lib/crypto'; import { removeTrailingSlash } from 'lib/url'; +import { uuid } from 'lib/crypto'; export default async (req, res) => { await useCors(req, res); @@ -71,10 +72,19 @@ export default async (req, res) => { url = removeTrailingSlash(url); } + const event_uuid = uuid(website_id, url, session_uuid, event_name); + if (type === 'pageview') { await savePageView(website_id, { session_id, session_uuid, url, referrer }); } else if (type === 'event') { - await 
saveEvent(website_id, { session_id, session_uuid, url, event_name, event_data }); + await saveEvent(website_id, { + event_uuid, + session_id, + session_uuid, + url, + event_name, + event_data, + }); } else { return badRequest(res); } diff --git a/queries/analytics/event/saveEvent.js b/queries/analytics/event/saveEvent.js index 3633eb18..d6ced9ca 100644 --- a/queries/analytics/event/saveEvent.js +++ b/queries/analytics/event/saveEvent.js @@ -39,8 +39,14 @@ async function relationalQuery(website_id, { session_id, url, event_name, event_ ); } -async function clickhouseQuery(website_id, { session_uuid, url, event_name }) { - const params = [website_id, session_uuid, url?.substr(0, URL_LENGTH), event_name?.substr(0, 50)]; +async function clickhouseQuery(website_id, { event_uuid, session_uuid, url, event_name }) { + const params = [ + website_id, + event_uuid, + session_uuid, + url?.substr(0, URL_LENGTH), + event_name?.substr(0, 50), + ]; return rawQueryClickhouse( ` @@ -50,13 +56,13 @@ async function clickhouseQuery(website_id, { session_uuid, url, event_name }) { ); } -async function kafkaQuery(website_id, { session_uuid, url, event_type, event_value }) { +async function kafkaQuery(website_id, { event_uuid, session_uuid, url, event_name }) { const params = { + event_uuid: event_uuid, website_id: website_id, session_uuid: session_uuid, url: url?.substr(0, URL_LENGTH), - event_type: event_type?.substr(0, 50), - event_value: event_value?.substr(0, 50), + event_name: event_name?.substr(0, 50), }; await kafkaProducer(params, 'event');