-- ClickHouse schema for event ingestion.
--
-- Two parallel pipelines are defined, each made of three objects:
--   1. a MergeTree table that stores the rows       (event, event_data)
--   2. a Kafka engine table that consumes a topic   (event_queue, event_data_queue)
--   3. a materialized view that copies every row consumed by the Kafka table
--      into the MergeTree table                     (event_queue_mv, event_data_queue_mv)
--
-- Formatting note: a "--" comment runs to the end of the physical line, so
-- every statement, column and setting below must keep its own line. Joining
-- lines would comment out whatever follows the first "--".

-- Session-level switch for the experimental Object data type.
-- NOTE(review): no Object-typed column is declared in the statements below;
-- confirm whether another part of the schema still needs this setting.
SET allow_experimental_object_type = 1;

-- Create Event
-- Storage table for events, sorted by website, then session, then time.
CREATE TABLE event
(
    website_id UUID,
    session_id UUID,
    event_id UUID,
    rev_id UInt32,
    --session
    hostname LowCardinality(String),
    browser LowCardinality(String),
    os LowCardinality(String),
    device LowCardinality(String),
    screen LowCardinality(String),
    language LowCardinality(String),
    country LowCardinality(String),
    subdivision1 LowCardinality(String),
    subdivision2 LowCardinality(String),
    city String,
    --pageview
    url String,
    referrer String,
    page_title String,
    --event
    event_type UInt32,
    event_name String,
    created_at DateTime('UTC')
)
ENGINE = MergeTree
ORDER BY (website_id, session_id, created_at)
SETTINGS index_granularity = 8192;

-- Kafka consumer for the 'event' topic. The column list mirrors the event
-- table so the materialized view below can copy rows across unchanged.
CREATE TABLE event_queue
(
    website_id UUID,
    session_id UUID,
    event_id UUID,
    rev_id UInt32,
    --session
    hostname LowCardinality(String),
    browser LowCardinality(String),
    os LowCardinality(String),
    device LowCardinality(String),
    screen LowCardinality(String),
    language LowCardinality(String),
    country LowCardinality(String),
    subdivision1 LowCardinality(String),
    subdivision2 LowCardinality(String),
    city String,
    --pageview
    url String,
    referrer String,
    page_title String,
    --event
    event_type UInt32,
    event_name String,
    created_at DateTime('UTC')
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input broker list
    kafka_topic_list = 'event',
    kafka_group_name = 'event_consumer_group',
    kafka_format = 'JSONEachRow',
    kafka_max_block_size = 1048576,
    -- Tolerate one unparsable message per block instead of failing the block.
    kafka_skip_broken_messages = 1;

-- Moves every row consumed by event_queue into event.
CREATE MATERIALIZED VIEW event_queue_mv TO event AS
SELECT
    website_id,
    session_id,
    event_id,
    rev_id,
    hostname,
    browser,
    os,
    device,
    screen,
    language,
    country,
    subdivision1,
    subdivision2,
    city,
    url,
    referrer,
    page_title,
    event_type,
    event_name,
    created_at
FROM event_queue;

-- Storage table for per-event key/value data. Each row carries one key with
-- nullable string, numeric and date value columns plus an event_data_type code.
CREATE TABLE event_data
(
    website_id UUID,
    session_id UUID,
    event_id UUID,
    rev_id UInt32,
    event_name String,
    event_key String,
    event_string_value Nullable(String),
    -- UInt64 to match event_data_queue.event_numeric_value, which feeds this
    -- column through event_data_queue_mv; a narrower type here could not hold
    -- every value the queue column can carry.
    event_numeric_value Nullable(UInt64),
    event_date_value Nullable(DateTime('UTC')),
    event_data_type UInt32,
    created_at DateTime('UTC')
)
ENGINE = MergeTree
ORDER BY (website_id, session_id, created_at)
SETTINGS index_granularity = 8192;

-- Kafka consumer for the 'event_data' topic. The column list mirrors the
-- event_data table.
CREATE TABLE event_data_queue
(
    website_id UUID,
    session_id UUID,
    event_id UUID,
    rev_id UInt32,
    event_name String,
    event_key String,
    event_string_value Nullable(String),
    event_numeric_value Nullable(UInt64),
    event_date_value Nullable(DateTime('UTC')),
    event_data_type UInt32,
    created_at DateTime('UTC')
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'domain:9092,domain:9093,domain:9094', -- input broker list
    kafka_topic_list = 'event_data',
    kafka_group_name = 'event_data_consumer_group',
    kafka_format = 'JSONEachRow',
    kafka_max_block_size = 1048576,
    -- Tolerate one unparsable message per block instead of failing the block.
    kafka_skip_broken_messages = 1;

-- Moves every row consumed by event_data_queue into event_data.
CREATE MATERIALIZED VIEW event_data_queue_mv TO event_data AS
SELECT
    website_id,
    session_id,
    event_id,
    rev_id,
    event_name,
    event_key,
    event_string_value,
    event_numeric_value,
    event_date_value,
    event_data_type,
    created_at
FROM event_data_queue;