Design a Real-Time Click Aggregation Pipeline
TL;DR
A click aggregation pipeline ingests billions of click events per day and produces real-time counts (clicks per ad per minute, unique visitors per page per hour) that power dashboards and billing. The naive approach -- write every click to a database and query with GROUP BY -- dies at 100K events/sec. The real system uses stream processing (Flink) to aggregate in-flight, ClickHouse or Druid for time-series OLAP queries, and HyperLogLog for approximate distinct counting. Twitter migrated from Lambda architecture (batch + streaming) to Kappa (streaming only) because maintaining two codepaths was an operational nightmare. Uber uses triple deduplication (client-side, Kafka, and aggregation layer) because at their scale, even 0.1% duplicate events mean millions of dollars in incorrect billing.
The System
An ad click aggregation pipeline counts every click on an advertisement, groups them by ad ID, campaign, time window, device, and geography, and produces aggregated metrics that advertisers use for billing and performance analysis. When you see "Your ad received 847,293 clicks today" in Google Ads, that number came from a pipeline like this.
Google processes hundreds of billions of ad events per day across Search, YouTube, and Display Network. Meta's ad analytics pipeline handles similar volumes for Facebook and Instagram ads. ClickHouse was originally built at Yandex to power Yandex.Metrica, its web analytics service, handling 20+ billion events per day and responding to queries in under 1 second. The financial stakes are enormous: a 0.1% over-count on Google's ad revenue (~$280 billion annually) means $280 million in incorrect billing. Accuracy is not optional -- it is a legal requirement.
Requirements
Functional
- Ingest click events: Accept click events containing ad_id, user_id, timestamp, device_type, geo, referrer_url
- Real-time aggregation: Produce counts per ad per minute, with rollups to hourly and daily
- Approximate distinct counts: Count unique users who clicked an ad (not just total clicks)
- Querying: Support queries like "clicks on ad X in the last hour, broken down by country and device"
- Reconciliation: A batch pipeline runs daily to produce "ground truth" numbers that validate the real-time counts
- Late event handling: Events arriving up to 5 minutes late are incorporated into the correct time window
Non-Functional
- Throughput: 1 billion events/day = ~11,600 events/sec sustained, ~58K events/sec peak (5x average)
- Aggregation latency: Real-time counts available within 10 seconds of the event
- Query latency: Dashboard queries return in under 500ms for 95th percentile
- Accuracy: Total click counts must be exact (no over/under-counting). Unique counts can be approximate (5% error acceptable)
- Durability: No event loss. Every click must be counted exactly once in the final aggregated output
- Data retention: Raw events: 7 days. Minute-level aggregates: 30 days. Hourly aggregates: 1 year. Daily aggregates: forever
Back-of-Envelope Math
Event volume:
1B events/day = 11,574 events/sec average
Peak (5x, during US business hours): ~58K events/sec
Event size:
ad_id (8B) + user_id (8B) + timestamp (8B) + device (1B) +
geo (4B) + referrer (100B) + metadata (71B) = ~200 bytes
1B events * 200 bytes = 200 GB/day raw event data
Aggregation dimensions:
Unique ads: 10M
Time windows: minute (1440/day), hour (24/day), day (1/day)
Devices: 4 types (mobile, desktop, tablet, other)
Countries: 200
Per-minute aggregates: 10M ads * 1440 minutes * sparse = ~50M rows/day
(most ad-minute combinations have zero clicks, so actual rows are sparse)
Storage:
Raw events (7 days): 200 GB * 7 = 1.4 TB
Minute aggregates (30 days): 50M rows * 50 bytes * 30 = 75 GB
Hourly aggregates (1 year): 50M/60 * 50 bytes * 365 = 15 GB
Daily aggregates (forever): trivial
HyperLogLog for unique counts:
12 KB per HyperLogLog counter (standard precision: p=14, 16,384 six-bit registers, as in Redis)
10M ads * 12 KB = 120 GB
Too much for a single counter per ad. Use time-windowed counters:
10M ads * 24 hours * 12 KB = 2.88 TB. Still too much.
Use 4 KB counters (lower precision, ~2% error): 10M * 24 * 4KB = 960 GB.
Or: only maintain HLL for active ads in each window.
Active ads per hour: ~500K. 500K * 4 KB = 2 GB per hour. Reasonable.
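The same back-of-envelope numbers can be recomputed in a few lines as a sanity check. This is a sketch that takes the figures above as given assumptions (5x peak factor, ~200-byte events, 50-byte aggregate rows, decimal units where 1 KB = 1,000 bytes); the variable names are illustrative.

```python
SECONDS_PER_DAY = 86_400
EVENTS_PER_DAY = 1_000_000_000
KB, GB, TB = 1e3, 1e9, 1e12  # decimal units, matching the estimates above

avg_eps = EVENTS_PER_DAY / SECONDS_PER_DAY          # average events/sec
peak_eps = 5 * avg_eps                               # assumed 5x US-business-hours peak
raw_gb_per_day = EVENTS_PER_DAY * 200 / GB           # ~200-byte events
minute_aggs_gb = 50e6 * 50 * 30 / GB                 # 50M rows/day, 50 B/row, 30 days
hourly_aggs_gb = 50e6 / 60 * 50 * 365 / GB           # 1 year of hourly rollups
hll_every_ad_hourly_tb = 10e6 * 24 * 12 * KB / TB    # 12 KB HLL per ad per hour
hll_active_ads_gb = 500_000 * 4 * KB / GB            # 4 KB HLL for ~500K active ads

for name, value in [("avg events/sec", avg_eps), ("peak events/sec", peak_eps),
                    ("raw GB/day", raw_gb_per_day), ("minute aggs GB", minute_aggs_gb),
                    ("hourly aggs GB", hourly_aggs_gb),
                    ("HLL, every ad hourly, TB", hll_every_ad_hourly_tb),
                    ("HLL, active ads, GB", hll_active_ads_gb)]:
    print(f"{name:>26}: {value:,.2f}")
```

The last two lines are the whole argument for windowed, active-ads-only sketches: the same 10M-ad keyspace costs terabytes if every ad gets an hourly sketch, and about 2 GB if only the ads that actually received clicks do.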
The Naive Design
┌──────────┐     ┌──────────────┐     ┌──────────────┐
│  Client  │────>│  Click API   │────>│  PostgreSQL  │
│ (click)  │     │              │     │              │
└──────────┘     └──────────────┘     └──────────────┘
INSERT INTO clicks (ad_id, user_id, timestamp, device, geo)
VALUES (?, ?, ?, ?, ?);
-- Dashboard query:
SELECT ad_id, COUNT(*), COUNT(DISTINCT user_id)
FROM clicks
WHERE timestamp BETWEEN '2025-01-01 12:00' AND '2025-01-01 13:00'
GROUP BY ad_id;
Rows go in, GROUP BY comes out. For 1,000 events/day, this works.
Where Does This Break First?
The write path. At 11,600 inserts/sec (~58K peak), PostgreSQL's row-based storage engine spends most of its time on transaction overhead, WAL writes, and index maintenance. The read path is even worse: COUNT(DISTINCT user_id) across 100M rows requires a full scan, taking minutes. And COUNT(*) for billing -- where over-counting means you charged an advertiser too much -- cannot tolerate the eventual consistency of read replicas.
Where It Breaks
Problem 1: Row-based databases are wrong for analytics. PostgreSQL stores data row-by-row. An analytics query like "count clicks per ad in the last hour" reads every column of every row, even though it only needs ad_id and timestamp. A column-oriented database (ClickHouse, Druid) stores each column separately, reading only the 2 columns needed and compressing them 10-50x better than row storage.
Problem 2: COUNT(DISTINCT) does not scale. Exact unique counting requires maintaining a set of all seen user IDs. At 100M clicks/hour with 20M unique users, that set consumes gigabytes of memory and the sort/hash operation takes seconds. HyperLogLog gives you approximate unique counts (2% error) using 4 KB per counter instead of gigabytes.
Problem 3: Late events corrupt window boundaries. An event with timestamp 12:59:58 arrives at 13:00:03 (5 seconds late). If you have already closed the 12:00-13:00 window and reported the count, this event is lost. Stream processing frameworks (Flink) handle this with watermarks -- a configurable bound on out-of-order arrival: events that arrive before the watermark passes the end of their window are incorporated, and events that arrive after it are discarded or routed to a side output.
Problem 4: Deduplication at 0.1% error costs millions. Network retries, client-side retransmission, and at-least-once redelivery (for example, a consumer replaying from its last committed offset after a crash) all produce duplicate events. At 1B events/day, 0.1% duplication = 1M duplicate clicks. If the average CPC (cost per click) is $1, that is $1M/day in incorrect billing. You need multi-layer dedup.
The Real Design
┌──────────────┐     ┌──────────────┐     ┌──────────────────────┐
│ Click Event  │────>│    Kafka     │────>│  Flink Aggregation   │
│ (from ad     │     │ (raw         │     │ ┌────────────────┐   │
│  server)     │     │  events)     │     │ │ Dedup (Bloom)  │   │
└──────────────┘     └──────────────┘     │ │ Window (1min)  │   │
                                          │ │ Aggregate      │   │
                                          │ │ HLL (uniques)  │   │
                                          │ └────────────────┘   │
                                          └──────────┬───────────┘
                                                     │
                           ┌─────────────────────────┼────────────┐
                           │                         │            │
                  ┌────────v──────┐         ┌────────v──────┐     │
                  │  ClickHouse   │         │    Kafka      │     │
                  │ (aggregated   │         │ (aggregated   │     │
                  │  results)     │         │  output)      │     │
                  └───────────────┘         └───────────────┘     │
                                                                  │
                                          ┌───────────────────────┘
                                          │
                                 ┌────────v──────────┐
                                 │ Reconciliation    │
                                 │ (Spark batch,     │
                                 │  runs daily)      │
                                 └───────────────────┘
Why ClickHouse
ClickHouse was purpose-built for this exact use case at Yandex. Here is why it is the right choice over PostgreSQL, Druid, or a generic data warehouse.
Column-oriented storage: A query like "count clicks per ad for the last hour" reads only the ad_id and timestamp columns. In PostgreSQL, this reads the entire row (200 bytes per event). In ClickHouse, it reads only 16 bytes per event (8 bytes for each column). That is 12.5x less I/O.
Compression: ClickHouse compresses columns individually. A column of ad_id values (repeated integers) compresses 20-50x with LZ4. A column of timestamps (monotonically increasing) compresses 100x+ with delta encoding. Raw data: 200 GB/day. Compressed in ClickHouse: ~10 GB/day.
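To see why sorted timestamps compress so well, here is a minimal sketch of delta encoding followed by a general-purpose compressor. It uses Python's zlib as a stand-in for ClickHouse's own codecs (ClickHouse exposes delta encoding as the Delta and DoubleDelta column codecs); the synthetic click stream and its 0-3 ms gaps are assumptions, so the exact sizes it prints are illustrative only.

```python
import random
import struct
import zlib

def delta_encode(values):
    """Keep the first value, then store each value as the difference from its predecessor."""
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]

def delta_decode(deltas):
    out, total = [], 0
    for d in deltas:
        total += d
        out.append(total)
    return out

def as_int64_column(values):
    """Pack values as little-endian 8-byte integers, like an uncompressed Int64 column."""
    return struct.pack(f"<{len(values)}q", *values)

# Synthetic, sorted event timestamps in epoch milliseconds (hypothetical data).
random.seed(42)
timestamps, t = [], 1_735_732_800_000
for _ in range(100_000):
    t += random.randint(0, 3)  # a busy stream: 0-3 ms between consecutive clicks
    timestamps.append(t)

raw = as_int64_column(timestamps)
plain_zlib = zlib.compress(raw, 6)
delta_zlib = zlib.compress(as_int64_column(delta_encode(timestamps)), 6)
print(f"raw {len(raw):,} B, zlib {len(plain_zlib):,} B, delta+zlib {len(delta_zlib):,} B")
```

After delta encoding, every value is a tiny, highly repetitive integer, which is exactly the kind of input a byte-level compressor handles best; the raw timestamps keep a fast-changing low byte that no general-purpose compressor can remove.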
MergeTree engine: ClickHouse's primary storage engine sorts data by a primary key (e.g., ad_id, timestamp) and stores it in sorted parts, each split into granules (8,192 rows by default). A sparse primary index keeps one entry per granule, so range queries on the primary key skip entire granules, enabling sub-second queries on billions of rows.
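A minimal sketch of how a sparse primary index lets a range query skip data, assuming one index entry per 8,192-row granule (ClickHouse's default index_granularity) and rows sorted by (ad_id, minute). It models the idea only; ClickHouse's real mark files and part layout are more involved, and the 200-ad table is hypothetical.

```python
import bisect

GRANULE_ROWS = 8192  # rows per granule, ClickHouse's default index_granularity

def build_sparse_index(sorted_keys):
    """Keep only the first key of every granule instead of indexing every row."""
    return [sorted_keys[i] for i in range(0, len(sorted_keys), GRANULE_ROWS)]

def granules_to_read(index, lo, hi):
    """Granules that may hold keys in [lo, hi]; every other granule is skipped."""
    # bisect_left - 1 also covers rows equal to lo that sit at the end of the previous granule.
    first = max(bisect.bisect_left(index, lo) - 1, 0)
    last = bisect.bisect_right(index, hi) - 1
    return range(first, last + 1)

# Hypothetical table: 200 ads x 1,440 minutes, sorted by (ad_id, minute).
rows = [(ad_id, minute) for ad_id in range(200) for minute in range(1440)]
index = build_sparse_index(rows)
hit = granules_to_read(index, (150, 720), (150, 779))  # ad 150, one hour of minutes
print(f"{len(rows):,} rows, {len(index)} granules, read {len(hit)}: {list(hit)}")
```

The index holds 36 entries for 288,000 rows, small enough to stay in memory, and an hour of one ad's data touches a single granule.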
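-- ClickHouse table for 1-minute aggregates.
-- AggregatingMergeTree merges rows that share the ORDER BY key, but a plain
-- UInt column would keep an arbitrary value instead of a sum, so counters are
-- declared as SimpleAggregateFunction(sum, ...).
CREATE TABLE click_aggregates (
    ad_id            UInt64,
    window_start     DateTime,
    window_end       SimpleAggregateFunction(any, DateTime),
    total_clicks     SimpleAggregateFunction(sum, UInt64),
    -- uniqCombined: array/hash table for small sets, HyperLogLog as cardinality grows
    unique_users_hll AggregateFunction(uniqCombined, UInt64),
    device_mobile    SimpleAggregateFunction(sum, UInt32),
    device_desktop   SimpleAggregateFunction(sum, UInt32),
    geo_us           SimpleAggregateFunction(sum, UInt32),
    geo_eu           SimpleAggregateFunction(sum, UInt32),
    geo_other        SimpleAggregateFunction(sum, UInt32)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMMDD(window_start)
ORDER BY (ad_id, window_start)
TTL window_start + INTERVAL 30 DAY;  -- minute-level retention from the requirements

-- Merges run in the background, so queries still aggregate at read time:
SELECT ad_id, sum(total_clicks) AS clicks, uniqCombinedMerge(unique_users_hll) AS unique_users
FROM click_aggregates
WHERE window_start >= now() - INTERVAL 1 HOUR
GROUP BY ad_id;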
Query performance: ClickHouse scans 1-2 billion rows per second per core. A query across 1 day of data (50M aggregate rows) completes in 50ms on a single node. This is why dashboards feel instant.
Flink Watermarks and Late Event Handling
Flink processes the event stream and aggregates clicks into 1-minute tumbling windows. But events arrive out of order.
Event timeline (as received by Flink):
12:59:57 click on ad_123 (on time)
12:59:59 click on ad_456 (on time)
13:00:01 click on ad_789 (on time, new window)
13:00:03 click on ad_123 (timestamp: 12:59:58, LATE by 5 seconds)
Watermarks: Flink tracks the "expected" event time using watermarks. A watermark at time T means "I believe all events with timestamp <= T have arrived." The watermark advances as events flow through.
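Watermark strategy: watermark = max_event_time - max_out_of_orderness
max_out_of_orderness = 5 minutes (the "allowed lateness" budget)
Event at 13:00:03 has timestamp 12:59:58
Current watermark: 13:00:01 (max seen so far) - 5 min = 12:55:01
Window [12:59, 13:00) ends after the watermark -> still open -> event is counted
Later, once an event with timestamp >= 13:05:00 arrives, the watermark reaches 13:00:00 and the [12:59, 13:00) window fires and closes.
Any events for that window arriving after that point are dropped or sent to a side output.
(In Flink, this bound is set with WatermarkStrategy.forBoundedOutOfOrderness; the separate allowedLateness window setting can keep a fired window's state around longer to accept late updates.)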
Allowed lateness trade-off: Higher lateness = more accurate counts (fewer dropped events) but higher memory usage (windows stay open longer, accumulating state). For ad click billing: 5-minute lateness captures 99.9% of events. The remaining 0.1% goes to the reconciliation layer.
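A toy model makes the watermark arithmetic concrete. This is a single-process sketch, not Flink's API: event times are seconds since midnight, the watermark lags the largest event time seen by 5 minutes, and a window fires once the watermark reaches its end (Flink's exact firing point differs by a millisecond). The class and helper names are hypothetical; in Flink itself you would use WatermarkStrategy.forBoundedOutOfOrderness and collect late events with sideOutputLateData.

```python
from collections import defaultdict

WINDOW_S = 60                 # 1-minute tumbling windows
MAX_OUT_OF_ORDERNESS_S = 300  # watermark lags the max event time by 5 minutes

def t(hms: str) -> int:
    """'HH:MM:SS' -> seconds since midnight (a toy event-time clock)."""
    h, m, s = map(int, hms.split(":"))
    return h * 3600 + m * 60 + s

class TumblingWindowCounter:
    """Single-process model of event-time windows with a bounded-out-of-orderness watermark."""

    def __init__(self):
        self.max_event_time = None
        self.open = defaultdict(lambda: defaultdict(int))  # window_start -> ad_id -> clicks
        self.emitted = {}      # (window_start, ad_id) -> final count of a fired window
        self.side_output = []  # (ad_id, event_time) that arrived after their window fired

    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - MAX_OUT_OF_ORDERNESS_S

    def process(self, ad_id: str, event_time: int) -> None:
        start = event_time - event_time % WINDOW_S
        wm = self.watermark()
        if wm is not None and start + WINDOW_S <= wm:
            self.side_output.append((ad_id, event_time))  # too late: window already fired
            return
        self.open[start][ad_id] += 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        self._fire_closed_windows()

    def _fire_closed_windows(self) -> None:
        wm = self.watermark()
        for start in sorted(self.open):   # sorted() copies the keys, so popping is safe
            if start + WINDOW_S > wm:
                break                     # this and all later windows are still open
            for ad_id, clicks in self.open.pop(start).items():
                self.emitted[(start, ad_id)] = clicks

# Replay the timeline from the text, then push the watermark to 13:00:00.
c = TumblingWindowCounter()
for ad_id, ts in [("ad_123", "12:59:57"), ("ad_456", "12:59:59"),
                  ("ad_789", "13:00:01"), ("ad_123", "12:59:58"),  # 5 s late, still counted
                  ("ad_999", "13:05:00"),                          # watermark -> 13:00:00
                  ("ad_123", "12:59:59")]:                         # arrives after the window fired
    c.process(ad_id, t(ts))
print(c.emitted, c.side_output)
```

The 12:59 window fires with ad_123 = 2 and ad_456 = 1, and the final straggler lands in the side output, where the reconciliation layer can pick it up.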
Triple Deduplication (Uber's Approach)
Uber's financial reporting pipeline uses three layers of deduplication because at their scale, even a small duplicate rate causes significant billing errors.
Layer 1: Client-side deduplication
The ad server generates a unique click_id (UUID) for each click event. If the client retransmits the event (network timeout), it sends the same click_id. The first receiver (Kafka producer or API gateway) checks a short-lived cache:
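import redis
from confluent_kafka import Producer

r = redis.Redis()  # assumes a local Redis; the broker address below is also an assumption
producer = Producer({"bootstrap.servers": "localhost:9092", "enable.idempotence": True})

def receive_click(event):
    # SET NX EX claims the click_id and sets the 5-minute TTL in one atomic command;
    # SETNX followed by EXPIRE could leave a key with no TTL if the process died in between.
    if r.set(f"click_dedup:{event.click_id}", 1, nx=True, ex=300):
        producer.produce("raw-clicks", value=event.to_bytes())  # to_bytes(): hypothetical serializer
    # else: duplicate within the TTL window, discard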
Layer 2: Kafka-level deduplication
Kafka producers use enable.idempotence=true, which assigns a sequence number to each message. If the producer retransmits (due to broker ACK timeout), Kafka deduplicates based on the sequence number. This handles producer-to-broker duplicates but not application-level duplicates.
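A toy model of what the broker does with those sequence numbers shows exactly where Layer 2 stops. This is a simplified sketch, not Kafka's code: real brokers track sequence numbers per producer ID and per partition, per batch, and only remember the most recent batches, while this keeps a single last number. The class name is hypothetical.

```python
class ToyPartition:
    """Toy model of broker-side dedup for an idempotent producer on ONE partition."""

    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer_id -> last appended sequence number

    def append(self, producer_id: int, seq: int, record: str) -> str:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return "duplicate"     # a retry of something already written: ack it, don't re-append
        if seq != last + 1:
            return "out_of_order"  # a gap in the sequence; Kafka signals an out-of-order sequence error
        self.log.append(record)
        self.last_seq[producer_id] = seq
        return "appended"

p = ToyPartition()
print(p.append(7, 0, "click-A"))  # first send
print(p.append(7, 0, "click-A"))  # producer retry after a lost ACK: same seq, dropped
print(p.append(7, 1, "click-A"))  # application-level resend: new seq, appended again
print(p.log)
```

The third call is the case idempotence cannot see: once the application re-sends a click as a new message, it gets a new sequence number, which is why Layers 1 and 3 key on click_id instead.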
Layer 3: Aggregation-layer deduplication (Bloom filter in Flink)
In the Flink aggregation job, each event passes through a Bloom filter keyed by click_id. If the Bloom filter reports "seen," the event is dropped.
Bloom filter sizing:
Window: 1 hour of events = ~42M events
False positive rate: 0.01% (1 in 10,000)
Formula: m = -(n * ln(p)) / (ln(2))^2
= -(42M * ln(0.0001)) / 0.4805
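= ~805M bits = ~100 MB for a full hour of events
With 10 Flink workers, each conservatively sized for the full hourly volume
(each worker actually sees ~1/10 of the events): ~1 GB total
Bloom filter is cleared at window boundary (a duplicate that straddles the boundary is not caught here).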
The false positive rate of 0.01% means 1 in 10,000 legitimate events is incorrectly dropped. At 1B events/day, that is 100K events. For billing, this under-count is safer than over-counting (you owe the advertiser a small refund rather than overcharging). The daily reconciliation job catches and corrects this.
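A minimal Bloom filter sketch ties the sizing formula to the dedup check. It computes m from the formula above and the hash count k = (m/n) ln 2, and derives the k bit positions by double hashing one SHA-256 digest. The hashing scheme and class name are choices made for this illustration, not necessarily what a production Flink job would use.

```python
import hashlib
import math

def bloom_size(n: int, p: float) -> tuple[int, int]:
    """Bits m = -n ln p / (ln 2)^2 and hash count k = (m / n) ln 2."""
    m = math.ceil(-n * math.log(p) / math.log(2) ** 2)
    k = max(1, round(m / n * math.log(2)))
    return m, k

class BloomFilter:
    def __init__(self, n: int, p: float):
        self.m, self.k = bloom_size(n, p)
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, item: str):
        # Double hashing: k bit positions from two 64-bit slices of one SHA-256 digest.
        d = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(d[:8], "big")
        h2 = int.from_bytes(d[8:16], "big") | 1
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def check_and_add(self, click_id: str) -> bool:
        """True if click_id was (probably) seen before; it is recorded either way."""
        seen = True
        for pos in self._positions(click_id):
            byte, bit = divmod(pos, 8)
            if not self.bits[byte] & (1 << bit):
                seen = False
                self.bits[byte] |= 1 << bit
        return seen

m, k = bloom_size(42_000_000, 0.0001)
print(f"m = {m:,} bits ({m / 8 / 1e6:.1f} MB), k = {k}")

bf = BloomFilter(n=10_000, p=0.0001)  # small instance for the demo
print(bf.check_and_add("click-1"), bf.check_and_add("click-1"))  # first False, then True
```

A "seen" answer can be wrong (the 0.01% false positives that under-count), but an "unseen" answer never is, which is why the filter errs toward dropping a legitimate click rather than billing a duplicate.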
Hot Shard Mitigation
A viral ad gets 100x the normal click rate. All events for that ad_id hash to the same Kafka partition and the same Flink worker. That worker is overwhelmed while others sit idle.
Solution: salted partitioning
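Instead of partitioning by ad_id, partition by ad_id + salt(0-9). Derive the salt from the click_id (hash(click_id) mod 10) rather than choosing it at random: a retransmitted duplicate then lands on the same partition and Flink worker as the original, so the per-worker Bloom filter can still catch it.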
Events for the same ad spread across 10 partitions. Each Flink worker produces a partial aggregate. A second aggregation stage merges the 10 partial results:
Worker 1: ad_123_0 -> 847 clicks
Worker 2: ad_123_1 -> 923 clicks
...
Worker 10: ad_123_9 -> 756 clicks
Merger: ad_123 -> sum(847 + 923 + ... + 756) = 8,412 clicks
This adds a second aggregation stage but eliminates hot shards. The trade-off: unique counts cannot simply be summed across partitions (the same user can click through more than one salted partition, so per-partition distinct counts overlap). Use HyperLogLog merge to combine partial unique counts: HLL_MERGE(hll_0, hll_1, ..., hll_9).
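The two-stage aggregation can be sketched in-process. This assumes a deterministic salt from an MD5 hash of the click_id (Python's built-in hash() of a string is randomized per process, so it would not be stable across workers), a "#" separator in the salted key, and exact Python sets standing in for per-partition HyperLogLog sketches so the overlap is easy to see. All names are illustrative.

```python
import hashlib
from collections import Counter, defaultdict

NUM_SALTS = 10

def salted_key(ad_id: str, click_id: str) -> str:
    # Deterministic salt: a retransmitted duplicate (same click_id) maps to the same partition.
    salt = int(hashlib.md5(click_id.encode()).hexdigest(), 16) % NUM_SALTS
    return f"{ad_id}#{salt}"

def two_stage_aggregate(events):
    """events: iterable of (ad_id, click_id, user_id) -> (clicks, uniques, naive_uniques) per ad."""
    # Stage 1: one partial aggregate per salted key (one per "worker").
    part_clicks = Counter()
    part_users = defaultdict(set)  # exact sets stand in for per-partition HLL sketches
    for ad_id, click_id, user_id in events:
        key = salted_key(ad_id, click_id)
        part_clicks[key] += 1
        part_users[key].add(user_id)

    # Stage 2: strip the salt and merge the partials.
    clicks, users, naive = Counter(), defaultdict(set), Counter()
    for key, n in part_clicks.items():
        ad_id = key.rsplit("#", 1)[0]
        clicks[ad_id] += n                    # click totals are additive
        users[ad_id] |= part_users[key]       # uniques need a union (HLL: register-wise max)
        naive[ad_id] += len(part_users[key])  # wrong: a user in 2 partitions counts twice
    return clicks, {ad: len(u) for ad, u in users.items()}, naive

# Hypothetical hot ad: 30 clicks from only 3 users, spread over the salted partitions.
events = [("ad_123", f"click-{i}", f"user-{i % 3}") for i in range(30)]
clicks, uniques, naive = two_stage_aggregate(events)
print(clicks["ad_123"], uniques["ad_123"], naive["ad_123"])
```

Clicks come out as 30 and uniques as 3, while the naive sum of per-partition distinct counts is inflated because each user's clicks are scattered across several salts.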
Deep Dives

Deep Dive 1: HyperLogLog for Approximate Distinct Counting
Exact distinct counting (COUNT(DISTINCT user_id)) requires maintaining a set of all seen user IDs. At 20M unique users, that set is 160 MB (8 bytes per ID). Per ad, per window. For 500K active ads per hour: 500K * 160 MB = 80 TB. Impossible.
HyperLogLog approximates distinct counts using a fixed 4-12 KB of memory per counter, regardless of how many unique elements it has seen.
How it works (simplified):
1. Hash each user_id to a 64-bit value
2. Count the number of leading zeros in the hash
3. Track the MAXIMUM number of leading zeros seen
4. The max leading zeros estimates log2(distinct count)
If the max leading zeros is 20, the estimated distinct count is 2^20 = ~1M
HyperLogLog improves on this by using 2^p "registers" (buckets),
each tracking its own max leading zeros, then harmonically averaging.
Precision vs. memory:
| Precision (p) | Registers (2^p) | Memory (1 byte/register) | Relative error (std dev, 1.04/sqrt(2^p)) |
|---|---|---|---|
| 10 | 1,024 | 1 KB | 3.25% |
| 12 | 4,096 | 4 KB | 1.62% |
| 14 | 16,384 | 16 KB | 0.81% |
| 16 | 65,536 | 64 KB | 0.41% |
For ad click counting, p=12 (1.62% error, 4 KB) is sufficient. 500K active ads * 4 KB = 2 GB. Fits in memory on a single Flink worker.
HyperLogLog is mergeable: You can merge two HLL counters into one by taking the element-wise maximum of their registers. This is critical for distributed aggregation -- each Flink worker maintains a local HLL, and the merger combines them without re-processing the raw events.
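Here is a minimal textbook HyperLogLog in Python, assuming p=12, one byte per register (matching the memory column in the table), a 64-bit BLAKE2b hash, and the standard linear-counting correction for small cardinalities. It is a sketch for intuition, not the uniqCombined implementation ClickHouse uses.

```python
import hashlib
import math

class HyperLogLog:
    """Textbook HyperLogLog: 2^p one-byte registers, merge by register-wise max."""

    def __init__(self, p: int = 12):
        self.p, self.m = p, 1 << p
        self.registers = bytearray(self.m)

    def add(self, item: str) -> None:
        x = int.from_bytes(hashlib.blake2b(item.encode(), digest_size=8).digest(), "big")
        idx = x >> (64 - self.p)                      # first p bits choose a register
        rest = x & ((1 << (64 - self.p)) - 1)         # remaining 64 - p bits
        rank = (64 - self.p) - rest.bit_length() + 1  # leading zeros in `rest`, plus one
        if rank > self.registers[idx]:
            self.registers[idx] = rank

    def count(self) -> float:
        alpha = 0.7213 / (1 + 1.079 / self.m)         # bias constant for m >= 128
        estimate = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * self.m and zeros:        # small cardinalities: linear counting
            estimate = self.m * math.log(self.m / zeros)
        return estimate

    def merge(self, other: "HyperLogLog") -> "HyperLogLog":
        if self.p != other.p:
            raise ValueError("can only merge sketches with the same precision")
        out = HyperLogLog(self.p)
        out.registers = bytearray(map(max, self.registers, other.registers))
        return out

# Two workers see overlapping users; merging their sketches estimates the union.
w1, w2 = HyperLogLog(), HyperLogLog()
for i in range(60_000):
    w1.add(f"user-{i}")
for i in range(40_000, 100_000):
    w2.add(f"user-{i}")
merged = w1.merge(w2)
print(round(w1.count()), round(w2.count()), round(merged.count()))  # true: 60,000 / 60,000 / 100,000
```

The merged sketch has exactly the registers a single sketch would have after seeing all 100,000 users, so merging loses nothing beyond HyperLogLog's usual estimation error.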
Deep Dive 2: Reconciliation with Batch Layer
The real-time pipeline prioritizes speed over perfect accuracy. A daily batch job (Spark) reprocesses the raw events from Kafka (retained for 7 days) and produces "ground truth" aggregates.
Real-time pipeline: latency 10 seconds, accuracy ~99.9%
Batch pipeline: latency 24 hours, accuracy 100%
Reconciliation: compare real-time vs. batch counts
If |real_time - batch| / batch < 0.1%: counts match, no action
If discrepancy > 0.1%: flag for investigation, use batch counts for billing
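The comparison rule above fits in a small function. This sketch assumes both layers produce per-key click counts keyed by (ad_id, day), bills from the batch counts unconditionally (the batch layer is the billing source of truth), and flags any key whose relative drift is at least 0.1%, including keys that appear in only one layer. The function name and key shape are hypothetical.

```python
def reconcile(realtime: dict, batch: dict, tolerance: float = 0.001):
    """Compare per-key click counts. Batch counts are billed; large drifts are flagged."""
    billing = dict(batch)  # the batch layer is the billing source of truth
    flagged = []
    for key in sorted(realtime.keys() | batch.keys()):
        rt, truth = realtime.get(key, 0), batch.get(key, 0)
        if truth:
            drift = abs(rt - truth) / truth
        else:
            drift = 0.0 if rt == 0 else float("inf")  # counted in real time, absent from batch
        if drift >= tolerance:
            flagged.append((key, rt, truth, drift))
    return billing, flagged

realtime = {("ad_1", "2025-01-01"): 100_050, ("ad_2", "2025-01-01"): 98_700}
batch    = {("ad_1", "2025-01-01"): 100_000, ("ad_2", "2025-01-01"): 100_000}
billing, flagged = reconcile(realtime, batch)
for key, rt, truth, drift in flagged:
    print(f"investigate {key}: real-time {rt:,} vs batch {truth:,} ({drift:.2%})")
```

Here ad_1 is within tolerance (0.05%) and passes silently, while ad_2's 1.3% gap is flagged for investigation.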
Why keep the batch layer? Three reasons:
- Billing accuracy: Advertisers pay per click. The batch layer is the billing source of truth. The real-time layer powers dashboards (where 99.9% accuracy is fine for monitoring).
- Bug detection: If the real-time pipeline has a bug (double-counting, missed dedup), the batch pipeline catches it. A discrepancy alert fires, and engineers investigate.
- Reprocessing: If the real-time pipeline was wrong for 3 hours (e.g., a deployment bug), you can reprocess those 3 hours from raw Kafka events through the batch pipeline. This is much easier than trying to "fix" the real-time pipeline's state.
Lambda vs. Kappa architecture:
Lambda (batch + streaming): Two codepaths, same logic, maintained separately. Double the engineering effort, double the bugs. Twitter famously struggled with this -- their batch and streaming counts diverged by 5-10% because subtle differences in logic accumulated.
Kappa (streaming only): One codepath (Flink), with the ability to replay Kafka from the beginning to reprocess. Simpler, but requires Kafka retention long enough for reprocessing (7 days of raw events = 1.4 TB). Twitter migrated to Kappa and reported significant reduction in operational overhead.
I would recommend Kappa with a lightweight batch reconciliation check (not a full batch pipeline). The reconciliation runs daily, compares Flink's output against a simple Spark count of raw events, and alerts on discrepancies. This gives you streaming's speed with batch's verification, without maintaining two full aggregation pipelines.
Deep Dive 3: Exactly-Once Semantics in the Pipeline
End-to-end exactly-once counting requires exactly-once at every stage:
Source (ad server) -> Kafka -> Flink -> ClickHouse
Stage 1: Ad server -> Kafka
- Kafka producer idempotence: sequence numbers prevent duplicate writes
- If producer retries, Kafka deduplicates by sequence number
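- Guarantees: producer retries cannot create duplicates (the broker appends each sequence number at most once per partition), so this hop is effectively exactly-once within a producer session; application-level resends still get new sequence numbers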
Stage 2: Kafka -> Flink
- Flink checkpointing: periodically snapshots Kafka offsets + Flink state
- On failure recovery: Flink restores from checkpoint, replays from last offset
- Events between last checkpoint and failure are reprocessed
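- Counts and the dedup Bloom filter are restored together with the offsets, so replayed events are counted once; the Bloom filter itself only catches duplicates that arrive as separate Kafka records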
Stage 3: Flink -> ClickHouse
- Flink commits aggregated results to ClickHouse
- Uses Flink's two-phase commit (TPC) protocol:
Phase 1: Flink writes to ClickHouse (uncommitted)
Phase 2: After Flink checkpoint succeeds, ClickHouse commits
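- If Flink crashes between phases: on recovery, Flink either re-commits
or rolls back, depending on checkpoint state
- Caveat: ClickHouse has no mature transactional commit for a sink to hook into, so in practice
the commit is usually emulated with idempotent inserts (e.g., a deterministic
insert_deduplication_token per checkpoint), letting ClickHouse discard a replayed insert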
Practical reality: True end-to-end exactly-once is extremely hard and adds 20-30% latency overhead (due to barriers, TPC, and checkpoint coordination). Most production systems use at-least-once with application-level deduplication (the triple-dedup approach) and accept that 99.9% accuracy at low latency is more valuable than 100% accuracy at high latency. The batch reconciliation catches the remaining 0.1%.
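A toy checkpoint-and-replay model shows why Stage 2 does not double-count even though events are reprocessed. It is a sketch, not Flink's API: a Python list stands in for a Kafka partition, a set stands in for the Bloom filter, and the offset, counts, and dedup state are snapshotted and restored as one unit. The sink side (ClickHouse) is not modeled; that is where the idempotent writes from Stage 3 matter.

```python
import copy
from collections import Counter

class ToyJob:
    """Toy checkpointed consumer: offset, counts and dedup state are snapshotted together."""

    def __init__(self, log):
        self.log = log  # replayable source, standing in for a Kafka partition
        self.state = {"offset": 0, "counts": Counter(), "seen": set()}
        self.snapshot = copy.deepcopy(self.state)

    def run(self, until=None):
        stop = len(self.log) if until is None else until
        while self.state["offset"] < stop:
            click_id, ad_id = self.log[self.state["offset"]]
            if click_id not in self.state["seen"]:  # a set stands in for the Bloom filter
                self.state["seen"].add(click_id)
                self.state["counts"][ad_id] += 1
            self.state["offset"] += 1

    def checkpoint(self):
        self.snapshot = copy.deepcopy(self.state)

    def crash_and_restore(self):
        self.state = copy.deepcopy(self.snapshot)  # progress after the checkpoint is lost

log = [("c1", "ad_1"), ("c2", "ad_1"), ("c2", "ad_1"),  # c2 resent by the client
       ("c3", "ad_2"), ("c4", "ad_1")]
job = ToyJob(log)
job.run(until=2)
job.checkpoint()            # offset 2, counts {ad_1: 2}
job.run(until=4)            # processes the duplicate c2 and c3 ...
job.crash_and_restore()     # ... then crashes: back to offset 2
job.run()                   # replays c2 (dup), c3, c4
print(job.state["counts"])  # ad_1: 3, ad_2: 1, same as a failure-free run
```

The replayed c3 is counted again only because its earlier count was rolled back with the checkpoint, while the client's resent c2 is still rejected by the restored dedup state.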
Alternative Designs
Alternative 1: Pure Database Approach (TimescaleDB)
Use TimescaleDB (time-series extension for PostgreSQL) with continuous aggregates. It automatically maintains materialized views of time-bucketed aggregates as data arrives.
Alternative 2: Druid for Real-Time OLAP
Apache Druid ingests events and pre-computes rollups during ingestion. Query latency is sub-second on billions of events. Used by Airbnb, Netflix, and Walmart.
Alternative 3: Materialized Views in a Data Warehouse (BigQuery, Snowflake)
Stream events to BigQuery via Pub/Sub. Use BigQuery's streaming buffer + materialized views for near-real-time aggregates.
| Aspect | Flink + ClickHouse | TimescaleDB | Druid | BigQuery Streaming |
|---|---|---|---|---|
| Ingest throughput | 100K+ events/sec | 50K events/sec | 100K+ events/sec | 100K+ events/sec |
| Query latency | 50-200ms | 100-500ms | 50-200ms | 1-5 seconds |
| Aggregation latency | 5-10 seconds | Minutes (continuous agg) | 1-5 seconds | 10-60 seconds |
| Exact distinct counts | No (HLL only) | Yes (SQL) | Approximate (Theta) | Yes (SQL) |
| Operational complexity | High (Flink + ClickHouse) | Low (single database) | High (Druid cluster) | Low (managed) |
| Cost at 1B events/day | ~$3K/mo | ~$2K/mo | ~$5K/mo | ~$8K/mo |
| Late event handling | Watermarks (flexible) | Upsert on conflict | Best-effort | Streaming buffer (1 hr) |
Flink + ClickHouse for maximum control and performance. TimescaleDB if your team already runs PostgreSQL and the volume is under 50K events/sec. Druid if you want sub-second OLAP directly on ingested events, with rollup done by Druid at ingestion instead of in a separate Flink job. BigQuery if you are a GCP shop and can tolerate higher query latency.
Scaling Math Verification
Ingestion (58K events/sec peak):
- Kafka: 6 partitions, 10K msgs/sec per partition. Fine.
- Flink: 10 workers, each processing 5,800 events/sec. At 50 microseconds per event (dedup + aggregate): 29% CPU utilization per worker. Headroom.
Aggregation output:
- 1-minute windows across 500K active ads: 500K aggregate records per minute = 8,333/sec
- ClickHouse ingests 100K+ rows/sec on a single node. 8% utilization.
HLL memory:
- 500K active ads * 4 KB per HLL = 2 GB per Flink worker (state)
- With 10 workers, 2 GB each = 20 GB total HLL state. Per-worker state is manageable.
ClickHouse query performance:
- One day of minute-level aggregates: 50M rows * 50 bytes = 2.5 GB. Compressed (~20x): ~125 MB.
- Full-day scan: 125 MB / 500 MB/sec (single core SSD read) = 0.25 seconds.
- With 8 cores: 31ms. Sub-100ms query latency confirmed.
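The same verification can be parameterized, which makes it easy to rerun for the 10x scenario in the failure analysis. This is a sketch in which every default is one of the assumptions stated above (50 us per event, 100K ClickHouse rows/sec, ~20x compression, 500 MB/sec per core, 8 cores); the function name and dictionary keys are illustrative.

```python
def capacity(peak_eps: float, active_ads: int, *, partitions: int = 6, workers: int = 10,
             us_per_event: float = 50, ch_rows_per_sec: float = 100_000,
             day_rows: float = 50e6, row_bytes: float = 50, compression: float = 20,
             mb_per_sec_per_core: float = 500, cores: int = 8) -> dict:
    """Re-derive the scaling numbers above; every default is an assumption from the text."""
    compressed_mb = day_rows * row_bytes / compression / 1e6
    return {
        "events/sec per Kafka partition": peak_eps / partitions,
        "Flink CPU per worker": peak_eps / workers * us_per_event / 1e6,
        "ClickHouse write utilization": active_ads / 60 / ch_rows_per_sec,
        "full-day scan (ms)": compressed_mb / (mb_per_sec_per_core * cores) * 1000,
    }

today = capacity(58_000, 500_000)
at_10x = capacity(580_000, 5_000_000)                    # same cluster, 10x traffic
at_10x_scaled = capacity(580_000, 5_000_000, partitions=60, workers=100)
for name in today:
    print(f"{name:>32}: {today[name]:>10,.2f} {at_10x[name]:>10,.2f} {at_10x_scaled[name]:>10,.2f}")
```

At 10x on the current cluster, per-worker Flink CPU goes to 2.9 (290% of one core), which is the "Breaks? Yes" row in the failure table; scaling to 60 partitions and 100 workers restores today's per-partition and per-worker load.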
Failure Analysis
| Component | Current capacity | At 10x (10B events/day) | Breaks? | Fix |
|---|---|---|---|---|
| Kafka (6 partitions) | 58K events/sec peak | 580K events/sec | Yes | Scale to 60 partitions, add brokers |
| Flink workers (10) | 5.8K events/worker/sec | 58K events/worker | Yes | Scale to 100 workers |
| HLL memory per worker | 2 GB | 20 GB | Maybe | Reduce precision to p=10, or shard by ad_id |
| ClickHouse (1 node) | 8.3K writes/sec | 83K writes/sec | No | Single ClickHouse node handles this |
| ClickHouse storage | 10 GB/day compressed | 100 GB/day | No | Add cold storage tier with TTL |
| Dedup Bloom filter | 100 MB per worker | 1 GB per worker | No | Use time-partitioned Bloom filters, rotate hourly |
| Reconciliation (Spark) | 200 GB/day | 2 TB/day | Maybe | Larger Spark cluster, partitioned processing |
The first bottleneck at 10x is Kafka and Flink worker count. Both scale linearly. The more interesting challenge is the dedup Bloom filter at 1 GB per worker (10 GB total). At this scale, consider time-partitioned Bloom filters with hourly rotation to bound memory growth, or switch to a distributed dedup store (Redis or RocksDB-backed Flink state) with time-based expiration.
What's Expected at Each Level
| Aspect | Mid-Level | Senior | Staff+ |
|---|---|---|---|
| Architecture | Kafka -> consumer -> database | Kafka -> Flink -> OLAP database (ClickHouse/Druid) | Salted partitioning, watermarks, two-phase commit |
| Aggregation | GROUP BY in SQL | Stream processing with tumbling windows | Flink watermarks, allowed lateness, window triggers |
| Distinct counting | COUNT(DISTINCT) | Mentions HyperLogLog | HLL precision/memory trade-off, HLL merge for distributed |
| Deduplication | "Use a unique constraint" | Event dedup with idempotency key | Triple dedup (client, Kafka idempotence, Bloom filter in Flink) |
| Hot shard | Not mentioned | Mentions partition skew | Salted keys with two-stage aggregation, HLL merge |
| Accuracy | "Count everything" | Acknowledges trade-off between speed and accuracy | Lambda vs Kappa, reconciliation batch layer, billing vs dashboard |
| OLAP choice | PostgreSQL | "Use ClickHouse/Druid" | Why ClickHouse (column-oriented, compression, MergeTree), Yandex origin |
| Real-world reference | None | "Twitter uses streaming" | Twitter Lambda-to-Kappa migration, Uber triple dedup, ClickHouse at Yandex |
The single most important signal at any level: do you understand that the pipeline has two consumers with different accuracy requirements (dashboards and billing) and that trying to serve both with one pipeline at one accuracy level is the wrong approach? Real-time for dashboards, batch-reconciled for billing.