Flink, Watermarks, and Windowed Aggregation
TL;DR
Stream processing lets you compute over unbounded data in real time. Flink does this with event-time semantics, watermarks, and exactly-once checkpointing — but half the time a simple Kafka consumer with Redis counters is all you need.
What It Is

Batch processing has a clear start and end. You load yesterday's data, run a computation, store the result. MapReduce. Spark jobs. Nightly ETL. The world understood this model for years.
Stream processing has no end. Data arrives continuously. You compute results as events flow through. No waiting for "the batch to finish." Results appear in seconds, not hours.
The difference isn't just speed. It changes what's possible. You can't detect fraud in a nightly batch — the money's gone by morning. You can't alert on server failures with a 15-minute batch window — your users already left. Real-time requires streaming.
Uber processes over 1 trillion messages per day through their streaming infrastructure. Their surge pricing, ETA calculations, and fraud detection all run on stream processing. They didn't start with Flink — they started with simple consumers and migrated when the complexity demanded it.
Why Stream Processing Exists
A simple Kafka consumer can read events and process them one by one. That's technically stream processing. But certain problems need more.
Aggregation over time. "Count ad clicks per campaign in the last 5 minutes." A single consumer would need to maintain state (click counts), handle late arrivals (events that arrive after the 5-minute window closes), and survive crashes (don't lose the counts). Building this from scratch is reinventing Flink badly.
Multi-stream joins. "Match each ad impression with its click within 30 minutes." You're joining two streams — impressions and clicks — with a time-bounded window. Doing this correctly with raw consumers requires managing buffers, timeouts, and state that survives restarts.
Event time processing. "Compute hourly revenue based on when orders were placed, not when my system received them." Events arrive out of order. Network delays cause gaps. The system must handle late data gracefully. This is where watermarks come in.
If your use case is "read message, call API, done" — skip Flink. A Kafka consumer is fine. But if your use case involves aggregation, joins, time windows, or complex event patterns, you need a stream processing framework.
Flink Architecture
Flink runs as a cluster with two components: a JobManager and multiple TaskManagers.
           ┌──────────────┐
           │  JobManager  │
           │ (coordinator)│
           │              │
           │ - job graph  │
           │ - scheduling │
           │ - checkpts   │
           └──────┬───────┘
     ┌────────────┼────────────┐
     ▼            ▼            ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│TaskMgr 1 │ │TaskMgr 2 │ │TaskMgr 3 │
│          │ │          │ │          │
│ slot: A  │ │ slot: C  │ │ slot: E  │
│ slot: B  │ │ slot: D  │ │ slot: F  │
└──────────┘ └──────────┘ └──────────┘
JobManager: the coordinator. It takes your job graph (the DAG of operations), schedules tasks across TaskManagers, triggers checkpoints, and handles failures. One per cluster (with standby for HA).
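TaskManagers: the workers. Each TaskManager has a fixed number of "task slots", and each slot runs one parallel pipeline of subtasks. A TaskManager with 4 slots can run 4 such pipelines in parallel; with slot sharing (the default), one slot can hold a subtask from each operator in the job.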
Job graph (dataflow DAG): your stream processing logic compiles into a directed acyclic graph of operators.
[Kafka Source] → [Filter] → [KeyBy user_id] → [Window 5min] → [Sum] → [Kafka Sink]
       ↓                           ↓                                       ↓
 Parallelism: 6              Parallelism: 6                          Parallelism: 3
(6 Kafka partitions)      (6 keyed subtasks)                     (3 output partitions)
Each operator can run with configurable parallelism. Flink handles data shuffling between operators. If you key by user_id, all events for the same user route to the same operator instance — similar to Kafka's partition-by-key guarantee.
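To make the keyed routing concrete, here is a minimal sketch in plain Python (not Flink API). It assumes a simple stable hash (`zlib.crc32`) taken modulo the parallelism; Flink's real routing goes through its own hash function and key groups, so treat `subtask_for_key` and the sample events as illustrative only.

```python
import zlib

def subtask_for_key(key: str, parallelism: int) -> int:
    """Pick a subtask index from a stable hash of the key (illustrative only)."""
    return zlib.crc32(key.encode("utf-8")) % parallelism

# Hypothetical events: (user_id, action)
events = [("user_1", "click"), ("user_2", "click"), ("user_1", "purchase")]

routed = {}  # subtask index -> events delivered to that subtask
for user_id, action in events:
    routed.setdefault(subtask_for_key(user_id, 6), []).append((user_id, action))
```

Both `user_1` events end up in the same list, which is what lets a keyed operator hold per-user state without coordinating with other subtasks.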
Event Time vs Processing Time
This is the most misunderstood concept in stream processing. Get it wrong and your aggregations will be incorrect.
Processing time: the wall clock on the machine when the event is processed. Simple. Wrong.
Event time: the timestamp embedded in the event when it actually happened. Correct. Harder.
Example — Why Processing Time Fails:
Event A: user clicked ad at 10:00:00 (event time)
→ arrives at Flink at 10:00:01 (processing time)
Event B: user clicked ad at 10:00:02 (event time)
→ delayed by network congestion
→ arrives at Flink at 10:00:45 (processing time)
Processing time window [10:00:00 - 10:00:05]:
→ Contains Event A (arrived at 10:00:01)
→ Does NOT contain Event B (arrived at 10:00:45)
→ B falls into a later window, even though it happened
during the first window!
Event time window [10:00:00 - 10:00:05]:
→ Contains both Event A and Event B
→ Correct result, regardless of arrival delays
Event time is almost always what you want. The only exception is monitoring Flink itself — "how many events per second is Flink processing right now?" — which naturally uses processing time.
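// Assign timestamps from the event's own field.
// The broker address and ClickEventDeserializer are placeholders.
KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
    .setBootstrapServers("broker:9092")
    .setTopics("clicks")
    .setValueOnlyDeserializer(new ClickEventDeserializer())
    .build();

DataStream<ClickEvent> clicks = env.fromSource(
    source,
    WatermarkStrategy
        .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, ts) -> event.getClickTime()),
    "clicks-source");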
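The Event A / Event B example can be reproduced in a few lines of plain Python (a sketch, not Flink code). The date, the `window_start` helper, and the `group` function are assumptions made for illustration; the event and arrival times are the ones from the example.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=5)
EPOCH = datetime(1970, 1, 1)

def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 5-second tumbling window."""
    return ts - ((ts - EPOCH) % WINDOW)

day = datetime(2024, 1, 1)  # arbitrary date for the example
# (name, event time, arrival time at the processor)
events = [
    ("A", day.replace(hour=10, second=0), day.replace(hour=10, second=1)),
    ("B", day.replace(hour=10, second=2), day.replace(hour=10, second=45)),
]

def group(events, pick_time):
    """Group event names by the window of whichever timestamp pick_time selects."""
    windows = {}
    for name, event_time, arrival_time in events:
        start = window_start(pick_time(event_time, arrival_time))
        windows.setdefault(start.strftime("%H:%M:%S"), []).append(name)
    return windows

by_event_time = group(events, lambda event_t, arrival_t: event_t)
by_processing_time = group(events, lambda event_t, arrival_t: arrival_t)
```

Grouping by event time puts A and B in the 10:00:00 window; grouping by arrival time pushes B into the 10:00:45 window.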
Watermarks — Telling Flink When to Close the Window
Watermarks answer one question: "Have all events before time T arrived?"
Without watermarks, Flink would never know when to close a time window. Events can be delayed by seconds, minutes, or hours. How long should Flink wait before declaring a 5-minute window "complete" and emitting results?
How Watermarks Work
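A watermark is a special marker in the event stream that asserts: "No event with a timestamp earlier than T is expected from now on." It is a heuristic, not a guarantee; an event that still shows up behind the watermark is treated as late.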
Stream of events with timestamps:
[10:00:01] [10:00:03] [10:00:02] [10:00:05] [W:10:00:00] [10:00:07]
                                                  ↑
                                        Watermark: t=10:00:00
                                        "All events before 10:00:00
                                         have arrived"
When watermark reaches 10:05:00:
→ Window [10:00 - 10:05] can close
→ Results are emitted
Bounded Out-of-Orderness
The most common watermark strategy. You tell Flink: "events can be at most N seconds late." Flink generates watermarks as max_event_time_seen - N.
// "Events may arrive up to 10 seconds late"
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(10))
// If the latest event timestamp is 10:05:00
// Watermark = 10:05:00 - 10s = 10:04:50
// Window [10:00 - 10:05] still open (watermark < 10:05)
// When latest event timestamp reaches 10:05:10
// Watermark = 10:05:10 - 10s = 10:05:00
// Window [10:00 - 10:05] NOW closes
Set the bound too low and you lose late events. Set it too high and windows stay open too long, increasing latency and memory usage. There's no right answer — it depends on your data's lateness characteristics. Look at your p99 event delay and use that.
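The watermark arithmetic above can be traced with a small plain-Python simulation. This is a sketch of the idea, not Flink's generator: it emits a watermark on every event and ignores the extra 1 ms that Flink subtracts. The class name and the sample timestamps (seconds after 10:00:00) are made up for illustration.

```python
class BoundedOutOfOrderness:
    """Tracks the highest timestamp seen and trails it by a fixed bound."""

    def __init__(self, max_delay_s: int):
        self.max_delay_s = max_delay_s
        self.max_ts = None

    def on_event(self, ts: int) -> int:
        """Record an event timestamp and return the current watermark."""
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts
        return self.max_ts - self.max_delay_s

WINDOW_END = 300  # window [0, 300) in seconds, i.e. [10:00, 10:05)
gen = BoundedOutOfOrderness(max_delay_s=10)

event_timestamps = [295, 300, 298, 310]  # 298 arrives out of order
watermarks = [gen.on_event(ts) for ts in event_timestamps]

# Index of the first event whose watermark reaches the window end
closes_at = next(i for i, wm in enumerate(watermarks) if wm >= WINDOW_END)
```

The out-of-order event at 298 does not move the watermark backwards, and the window only closes once an event with timestamp 310 (10:05:10) has been seen.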
Late Events
What happens when an event arrives after the watermark has already passed the end of its window? By default, Flink drops it. You can keep the window open for a grace period with allowed lateness and route anything later than that to a side output.
// Anonymous subclass with an explicit type argument so Flink can capture the type
OutputTag<ClickEvent> lateTag = new OutputTag<ClickEvent>("late-clicks"){};

// sum("count") emits the input type, so the result is a stream of ClickEvent
SingleOutputStreamOperator<ClickEvent> result = clicks
    .keyBy(click -> click.getCampaignId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1)) // grace period
    .sideOutputLateData(lateTag)
    .sum("count");

// Get late events for separate handling
DataStream<ClickEvent> lateClicks = result.getSideOutput(lateTag);
// Log them, reprocess them, or alert on them
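The three possible fates of an event (on time, late but within the grace period, or sent to the side output) come down to comparing the watermark with the window end. The sketch below is plain Python with second-level granularity; Flink's actual check uses the window's max timestamp (end minus 1 ms), and the `classify` function and its labels are hypothetical names.

```python
WINDOW_S = 300           # 5-minute tumbling windows
ALLOWED_LATENESS_S = 60  # 1-minute grace period

def classify(event_ts: int, watermark: int) -> str:
    """Decide what happens to an event given the current watermark."""
    window_end = (event_ts // WINDOW_S + 1) * WINDOW_S
    if watermark < window_end:
        return "on_time"            # window has not fired yet
    if watermark < window_end + ALLOWED_LATENESS_S:
        return "late_but_accepted"  # window fires again with an updated result
    return "side_output"            # too late: routed to the late-data tag
```

For an event at t=290 (window [0, 300)), a watermark of 295 is on time, 330 falls inside the grace period, and 360 is past it.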
Windows — Grouping Unbounded Data
Windows slice an infinite stream into finite chunks for aggregation. Three types cover most use cases.
Tumbling Windows
Fixed-size, non-overlapping. Every event belongs to exactly one window.
Time: |--- 5 min ---|--- 5 min ---|--- 5 min ---|
Window: [ Window 1 ][ Window 2 ][ Window 3 ]
Events: [A, B, C ][D, E ][F, G, H, I ]
Each window produces one output:
Window 1: count = 3
Window 2: count = 2
Window 3: count = 4
// Count clicks per campaign every 5 minutes
clicks
    .keyBy(click -> click.getCampaignId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("count");
Use case: hourly sales reports, 5-minute metric aggregations, daily user counts. Any time you want non-overlapping buckets.
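Tumbling window assignment is just integer flooring of the timestamp. A minimal plain-Python sketch, with hypothetical campaign ids and timestamps in seconds:

```python
from collections import Counter

def tumbling_window(ts_s: int, size_s: int = 300):
    """Return the (start, end) of the single window that contains ts_s."""
    start = ts_s - (ts_s % size_s)
    return (start, start + size_s)

# Hypothetical clicks: (campaign_id, event timestamp in seconds)
clicks = [
    ("campaign_a", 10),
    ("campaign_a", 299),
    ("campaign_b", 120),
    ("campaign_a", 300),  # exactly on the boundary: belongs to the next window
]

counts = Counter((campaign, tumbling_window(ts)) for campaign, ts in clicks)
```

Windows are half-open, so the click at t=300 is counted in [300, 600) rather than [0, 300).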
Sliding Windows
Fixed-size, overlapping. A single event may belong to multiple windows. Useful for moving averages.
Window size: 10 min, Slide: 5 min
Time:   |--- 10 min ---|
        |       |--- 10 min ---|
        |       |       |--- 10 min ---|
Window: [   Window 1   ]
                [   Window 2   ]
                        [   Window 3   ]
Event at t=7 falls into Window 1 AND Window 2
// Average response time over 10 minutes, updated every 5 minutes
requests
    .keyBy(req -> req.getServiceName())
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10), // window size
        Time.minutes(5)   // slide interval
    ))
    .aggregate(new AverageAggregator());
Use case: "average latency over the last 10 minutes, updated every 5 minutes." A 1-minute slide gives fresher results, but each event then lands in 10 windows instead of 2. Moving averages. Trend detection. Anomaly detection with overlapping reference windows.
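To see why one event belongs to several sliding windows, here is a small plain-Python sketch of the assignment. Units are minutes, windows are aligned to t=0, and `sliding_windows` is an illustrative helper rather than a Flink API.

```python
def sliding_windows(ts: int, size: int, slide: int):
    """Return every (start, end) window of the given size and slide containing ts."""
    last_start = ts - (ts % slide)                  # latest window that starts at or before ts
    starts = range(last_start, ts - size, -slide)   # walk back while the window still covers ts
    return sorted((start, start + size) for start in starts)

windows_for_t7 = sliding_windows(7, size=10, slide=5)
windows_per_event = len(sliding_windows(7, size=10, slide=1))
```

With a 10-minute size and 5-minute slide the event at t=7 falls into two windows; shrinking the slide to 1 minute raises that to ten, which is the state and compute cost of a finer slide.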
Session Windows
Gap-based. A session window closes when no events arrive for a specified gap duration. Each key gets its own session.
Gap: 5 minutes
User A events: [click][click][click]----10 min gap----[click][click]
               [     Session 1     ]                  [ Session 2  ]
User B events: [click]---2min---[click]---2min---[click]---8min---[click]
               [               Session 1               ]          [Sess 2]
// User session activity: session ends after 5 minutes of inactivity
userEvents
    .keyBy(event -> event.getUserId())
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .process(new SessionSummaryFunction());
Use case: user session analysis ("how many pages did user X visit in this session?"), shopping cart abandonment detection, conversation grouping in chat systems.
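Session grouping for a single key can be sketched in plain Python as "start a new session whenever the gap since the previous event reaches the threshold". The timestamps below (in minutes) mirror the User A and User B timelines; `sessionize` is an illustrative helper and works on a complete list, whereas Flink merges session windows incrementally as events arrive.

```python
def sessionize(timestamps, gap):
    """Split one key's event timestamps into sessions separated by >= gap of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # still within the gap: extend the current session
        else:
            sessions.append([ts])    # gap exceeded (or first event): start a new session
    return sessions

user_a = sessionize([0, 1, 2, 12, 13], gap=5)
user_b = sessionize([0, 2, 4, 12], gap=5)
```

User B's first three clicks are 2 minutes apart and stay in one session; the 8-minute pause starts a second one.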
Checkpointing — Surviving Failures
Flink's killer feature for production use. Periodic snapshots of all operator state to durable storage. When a failure occurs, Flink restores the latest checkpoint and replays from Kafka.
How Checkpointing Works
Normal operation:
Source (offset: 1000) → Process → Sink
Checkpoint triggers:
1. JobManager triggers the checkpoint; sources inject a checkpoint barrier into their streams
2. Each operator saves its state when barrier passes through
3. State saved to checkpoint storage (S3, HDFS)
Checkpoint #42:
Source state: Kafka offset = 1000
Window state: {campaign_A: 47 clicks, campaign_B: 23 clicks}
Saved to: s3://checkpoints/job-123/chk-42/
Failure at offset 1050:
1. Flink restores checkpoint #42
2. Source rewinds to Kafka offset 1000
3. Window state restored: {campaign_A: 47, campaign_B: 23}
4. Replays offsets 1000-1050 from Kafka
5. Processing continues from offset 1050+
// Enable checkpointing
env.enableCheckpointing(60_000); // every 60 seconds

// Configure checkpoint storage
env.getCheckpointConfig().setCheckpointStorage(
    "s3://my-bucket/flink-checkpoints/");

// Exactly-once semantics (default, but let's be explicit)
env.getCheckpointConfig().setCheckpointingMode(
    CheckpointingMode.EXACTLY_ONCE);

// Tolerate up to 3 consecutive checkpoint failures
env.getCheckpointConfig()
    .setTolerableCheckpointFailureNumber(3);
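The restore-and-replay cycle is easier to trust once you see that it yields the same state as an uninterrupted run. Below is a toy plain-Python simulation: a list stands in for a Kafka partition, a `Counter` for window state, and a dict for the checkpoint. All names and data are hypothetical.

```python
from collections import Counter

# Stand-in for a Kafka partition: list index == offset
log = ["campaign_A", "campaign_B", "campaign_A",
       "campaign_A", "campaign_B", "campaign_A"]

def process(log, offset, counts, until):
    """Apply log[offset:until] to the running counts and return the next offset."""
    for i in range(offset, until):
        counts[log[i]] += 1
    return until

expected = Counter(log)  # what an uninterrupted run would produce

counts = Counter()
offset = process(log, 0, counts, until=4)

# Checkpoint: snapshot the source offset and operator state together
checkpoint = {"offset": offset, "counts": Counter(counts)}

offset = process(log, offset, counts, until=5)  # progress made after the checkpoint
counts, offset = None, None                     # crash: in-memory state is gone

# Recovery: restore the snapshot, rewind the source, replay
counts = Counter(checkpoint["counts"])
offset = process(log, checkpoint["offset"], counts, until=len(log))
```

The event at offset 4 is processed twice in wall-clock terms, but it is reflected only once in the final state because the offset and the counts were snapshotted together.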
The Barrier Alignment Problem
With Flink's default aligned checkpoints, exactly-once relies on barrier alignment. When an operator receives data from multiple input streams, it must wait for checkpoint barriers from all inputs before snapshotting its state. During this wait, it holds back data from the inputs whose barrier has already arrived.
Input 1: [data][data][BARRIER][data][data]
↓
operator waits...
↓ (buffers Input 1 data)
Input 2: [data][data][data][BARRIER]
↓
both barriers arrived → snapshot state → continue
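This causes backpressure during checkpointing. There are two ways out. Unaligned checkpoints (introduced in Flink 1.11) keep exactly-once semantics but let barriers overtake buffered records, storing that in-flight data as part of the checkpoint, which makes checkpoints larger. Alternatively, CheckpointingMode.AT_LEAST_ONCE skips alignment entirely and accepts duplicates on recovery.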
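Aligned barrier handling can be sketched as a small plain-Python state machine: once a barrier arrives on an input, later records from that input are held back until every input has delivered its barrier. The `align` function, the `BARRIER` sentinel, and the arrival sequence are assumptions for illustration; a real operator snapshots its state, not a list of processed records.

```python
BARRIER = "BARRIER"

def align(arrivals, num_inputs=2):
    """arrivals: (input_id, item) pairs in arrival order. Returns (processed, snapshot)."""
    processed, buffered, blocked = [], [], set()
    snapshot = None
    for input_id, item in arrivals:
        if item == BARRIER:
            blocked.add(input_id)
            if len(blocked) == num_inputs:
                snapshot = list(processed)  # state reflects only pre-barrier records
                processed.extend(buffered)  # release what was held back
                buffered.clear()
                blocked.clear()
        elif input_id in blocked:
            buffered.append(item)           # post-barrier record on an already-blocked input
        else:
            processed.append(item)
    return processed, snapshot

arrivals = [(1, "a1"), (2, "b1"), (1, BARRIER), (1, "a2"),
            (2, "b2"), (2, BARRIER), (1, "a3")]
processed, snapshot = align(arrivals)
```

Record `a2` arrived before `b2` but is processed after it, because input 1 was blocked while waiting for input 2's barrier. That waiting is the source of the backpressure.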
Gotcha
Checkpoint interval is a trade-off. Too frequent (every 5 seconds) adds overhead — state snapshots consume CPU and I/O. Too infrequent (every 10 minutes) means more data to replay on failure. 30-60 seconds is a reasonable default for most workloads.
Stateful Processing
The real power of Flink is stateful operators. Each operator can maintain state that survives restarts (via checkpointing) and is partitioned by key.
// Stateful fraud detection: flag if user makes > 3 purchases in 60s
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FraudDetector extends KeyedProcessFunction<
        String, Transaction, Alert> {

    // Per-user list of recent transaction timestamps (keyed state)
    private ListState<Long> recentTimestamps;

    @Override
    public void open(Configuration config) {
        recentTimestamps = getRuntimeContext().getListState(
            new ListStateDescriptor<>("timestamps", Long.class));
    }

    @Override
    public void processElement(Transaction tx, Context ctx,
            Collector<Alert> out) throws Exception {
        long now = tx.getTimestamp();
        long windowStart = now - 60_000;
        recentTimestamps.add(now);

        // Count recent transactions, prune old ones
        int count = 0;
        List<Long> valid = new ArrayList<>();
        for (Long ts : recentTimestamps.get()) {
            if (ts >= windowStart) { valid.add(ts); count++; }
        }
        recentTimestamps.update(valid);

        if (count > 3) {
            out.collect(new Alert(tx.getUserId(),
                "Suspicious: " + count + " transactions in 60s"));
        }
    }
}
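This state is automatically checkpointed. If the TaskManager crashes, Flink restores the state and replays from Kafka, so no events are lost and the state reflects each event exactly once. Alerts emitted after the last checkpoint can be emitted again during replay, so the alert sink needs to be idempotent or transactional to avoid duplicates.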
Flink vs Spark Streaming
This comparison comes up constantly in interviews. Here's the honest breakdown.
| Dimension | Flink | Spark Structured Streaming |
|---|---|---|
| Processing model | True streaming (event-by-event) | Micro-batch (100ms+ batches) |
| Latency | Milliseconds | Seconds (batch interval) |
| Event time | Native, first-class watermarks | Supported (since 2.1) |
| State | Built-in, managed, checkpointed | External or internal state store |
| Batch processing | Supported but secondary | Native strength |
| Ecosystem | Smaller, streaming-focused | Massive (MLlib, GraphX, SQL) |
| Recovery | Checkpoint + replay | Checkpoint + restart micro-batches |
When to pick Flink: you need sub-second latency, complex event-time processing, or heavy stateful computation. Fraud detection at a fintech. Real-time pricing at a marketplace. Uber's surge pricing runs on Flink.
When to pick Spark Streaming: you already have Spark for batch, your streaming latency requirements are seconds (not milliseconds), and you want one framework for everything. Many data teams standardize on Spark because their batch pipelines already use it.
The spicy opinion: Flink is technically superior for streaming, but Spark wins on practicality for most teams. Most organizations have data engineers who know Spark. They have Spark clusters already running. Adding Flink means a new cluster, new monitoring, new expertise. Unless your latency requirements demand it, the operational cost of running two frameworks isn't worth it.
When NOT to Use Flink
This is the most underrated section. Flink is powerful. It's also often overkill.
Simple Consumer Pattern
If your logic is "read event, call API, write result," you don't need Flink. A plain Kafka consumer group with 10 instances handles this perfectly.
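# This does NOT need Flink
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('order-events', group_id='email-sender')
for message in consumer:
    order = json.loads(message.value)
    # send_confirmation_email is your own function
    send_confirmation_email(order['email'], order['order_id'])
    # With the default enable_auto_commit=True, offsets are committed
    # periodically in the background, not after each message, so a crash
    # may lead to a reprocessed or skipped message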
Adding Flink here means deploying a JobManager, TaskManagers, configuring checkpoints, and managing state — all for a stateless read-and-forward pattern. Don't.
Counter Pattern
"Count events per key in the last 5 minutes." This sounds like a windowed aggregation — Flink territory. But often, Redis does it better.
# Simple windowed counter WITHOUT Flink
import math
import time

import redis

r = redis.Redis()

def count_event(event_type):
    # 5-minute bucket key
    bucket = math.floor(time.time() / 300) * 300
    key = f"count:{event_type}:{bucket}"
    pipe = r.pipeline()
    pipe.incr(key)
    pipe.expire(key, 600)  # expire after 10 minutes
    pipe.execute()

def get_current_count(event_type):
    bucket = math.floor(time.time() / 300) * 300
    return int(r.get(f"count:{event_type}:{bucket}") or 0)
This handles millions of events. No event time. No watermarks. No checkpointing. Just Redis INCR. If you don't need event-time semantics or complex windowing logic, this is the right answer.
The Decision Framework
Do you need windowed aggregation?
  NO  → Simple Kafka consumer
  YES ↓
Do you need event-time processing (not wall-clock time)?
  NO  → Redis counters with time-bucketed keys
  YES ↓
Do you need to join multiple streams?
  NO  → Redis counters with event-time bucketing might still work
  YES ↓
Do you need exactly-once state management?
  NO  → Redis + idempotent writes
  YES → Flink (or Spark Streaming)
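The "event-time bucketing" branch deserves a concrete shape: derive the bucket from the event's own timestamp instead of the wall clock, so a late event still increments the bucket it belongs to. In this sketch a `defaultdict` stands in for Redis to keep it self-contained; the key format follows the counter example, and the `event_ts` parameter is an assumption about what your events carry.

```python
from collections import defaultdict

BUCKET_S = 300            # 5-minute buckets
store = defaultdict(int)  # stand-in for Redis INCR on string keys

def count_event(event_type: str, event_ts: float) -> str:
    """Increment the bucket chosen by the event's timestamp; return the key used."""
    bucket = int(event_ts // BUCKET_S) * BUCKET_S
    key = f"count:{event_type}:{bucket}"
    store[key] += 1
    return key

count_event("click", 1000)  # lands in bucket 900
count_event("click", 1250)  # lands in bucket 1200
count_event("click", 1100)  # arrives late, still lands in bucket 900
```

The trade-off is that a bucket is never declared final: readers see counts that may still change, and the key's expiry has to outlast your worst expected delay.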
Patterns for System Design Interviews
Pattern 1: Real-Time Ad Click Aggregation
[Click events] → [Kafka] → [Flink]
                              ↓
                  Tumbling window (1 minute)
                  KeyBy campaign_id
                  Sum click_count
                              ↓
                  [Kafka: aggregated-clicks] → [Dashboard DB]
"Design a system that shows advertisers their click counts, updated every minute." This is the canonical Flink use case. Tumbling 1-minute windows, keyed by campaign. Checkpointed to S3. Output to a Kafka topic that feeds the dashboard.
Pattern 2: Fraud Detection
Flink keyed by user_id, maintaining per-user state: recent transactions, average purchase amount, known devices. Each transaction updates state and checks fraud rules. State is checkpointed. PayPal reportedly processes 5 billion events per day through a Flink pipeline with sub-2-second latency from transaction to fraud decision. This is where Flink genuinely shines.
Pattern 3: Sessionization
Flink keyed by user_id with session windows (30-minute gap). Groups pageviews into sessions, computing pages viewed, duration, entry/exit pages. Session windows are dynamic -- they grow as events arrive and close when the gap is detected. Each user has independent sessions.
Trade-offs Table
| Trade-off | Choose A | Choose B |
|---|---|---|
| Latency vs Throughput | True streaming/Flink (ms latency) | Micro-batch/Spark (higher throughput per resource) |
| Correctness vs Simplicity | Event-time + watermarks (correct) | Processing-time (simpler, may miscount) |
| Recovery speed vs Overhead | Frequent checkpoints (fast recovery) | Infrequent checkpoints (less I/O overhead) |
| Late data handling vs Memory | Large allowed lateness (catches late data) | Small/no lateness (less state to hold) |
| Flink vs Simple consumer | Flink (stateful, windowed, event-time) | Kafka consumer + Redis (stateless, counter-based) |
| One framework vs Best-of-breed | Spark for batch + streaming (simpler ops) | Flink for streaming + Spark for batch (optimal each) |

Interview Gotchas
Gotcha 1: Don't Default to Flink
The number one mistake: reaching for Flink when the problem doesn't need it. "Count requests per user in the last hour" does not need a stream processing framework. Redis INCR with time-bucketed keys handles this at massive scale with far less operational overhead. Show the interviewer you know when NOT to use Flink.
Gotcha 2: Event Time Requires Watermarks
You can't just say "we'll use event time." You must explain how Flink knows when a time window is complete. That's watermarks. "We'll use bounded out-of-orderness with a 10-second tolerance" is the full answer. Without watermarks, windows never close.
Gotcha 3: Watermark Delay vs Completeness
A 10-second watermark delay means each window's result is emitted at least 10 seconds after the window ends. A 5-minute delay pushes that to at least 5 minutes. The wider the delay, the more late events you capture but the longer users wait for results. There's no universally correct value; it depends on your data.
Gotcha 4: Exactly-Once Requires End-to-End Design
Flink's exactly-once checkpointing only works within Flink. If your sink writes to Kafka, you need Kafka transactions. If your sink writes to a database, you need idempotent writes or two-phase commit. Saying "Flink has exactly-once" without addressing the sink is incomplete.
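The idempotent-write option can be shown with two toy sinks in plain Python. After a restore and replay, the same window result is written twice; an append-only sink records it twice, while a sink keyed by (campaign, window start) simply overwrites. Class names and the sample result are hypothetical.

```python
class AppendSink:
    """Naive sink: every write adds a row, so replays create duplicates."""

    def __init__(self):
        self.rows = []

    def write(self, campaign_id, window_start, count):
        self.rows.append((campaign_id, window_start, count))

class UpsertSink:
    """Idempotent sink: rows are keyed by (campaign, window), so replays overwrite."""

    def __init__(self):
        self.rows = {}

    def write(self, campaign_id, window_start, count):
        self.rows[(campaign_id, window_start)] = count

append_sink, upsert_sink = AppendSink(), UpsertSink()
result = ("campaign_A", 300, 47)

for sink in (append_sink, upsert_sink):
    sink.write(*result)  # first attempt, before the failure
    sink.write(*result)  # same result emitted again after restore and replay
```

In a real database the upsert would be a primary key on (campaign_id, window_start) with an insert-or-update statement; the point is that the write's identity comes from the data, not from how many times Flink emitted it.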
Gotcha 5: State Size Grows With Keys
Stateful operators in Flink store state per key. If you key by user_id and have 100 million users, that's 100 million state entries. State lives on the JVM heap by default; for large state, use the RocksDB state backend, which keeps it on local disk. Either way it still needs to be checkpointed. Monitor state size. Expire old state with TTL. Candidates who forget about state growth in their design will get pushed on scalability.
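What "expire old state with TTL" means mechanically: each entry remembers when it was last written, and reads treat anything older than the TTL as absent. The sketch below is plain Python with an explicit `now` argument so it stays deterministic; Flink's built-in state TTL is configured on the state descriptor and also cleans up in the background, which this `TtlState` class does not attempt to model.

```python
class TtlState:
    """Per-key state whose entries expire ttl_s seconds after their last write."""

    def __init__(self, ttl_s: int):
        self.ttl_s = ttl_s
        self.entries = {}  # key -> (value, last write time)

    def put(self, key, value, now: int):
        self.entries[key] = (value, now)

    def get(self, key, now: int):
        entry = self.entries.get(key)
        if entry is None:
            return None
        value, written_at = entry
        if now - written_at >= self.ttl_s:
            del self.entries[key]  # lazily drop the expired entry on read
            return None
        return value

state = TtlState(ttl_s=3600)
state.put("user_1", {"purchases": 3}, now=0)
fresh = state.get("user_1", now=1800)    # still within the hour
expired = state.get("user_1", now=3600)  # TTL reached: treated as missing
```

Without expiry, a key seen once stays in state (and in every checkpoint) forever, which is how 100 million inactive users turn into a scalability problem.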