Implement an Aggregator

Level: L3-L4
Topics: Streaming Data, Distributed Systems, Fault Tolerance

Problem Statement

You have N servers, each sending a stream of CPU usage measurements with timestamps. Design and implement an Aggregator class that computes the average CPU usage across all servers at each timestamp.

The aggregator has the following interface:

class Aggregator:
    def __init__(self, n: int):
        # n = number of servers
        ...

    def Receive(self, replica_id: int, t: int, measure: float):
        # Called when server replica_id reports CPU usage 'measure' at time t
        ...

    def SendAvg(self, t: int, avg: float):
        # Callback: emit the average CPU usage at time t
        ...

When the aggregator has received a measurement from every active server for time t, it should call SendAvg(t, average).

Critical requirement: If a server dies (stops sending data), the aggregator should not block forever waiting for it. Dead servers should be detected and excluded from the average.

Background & Constraints

  • Each server sends measurements at regular intervals, but messages may arrive out of order across servers.
  • Server IDs are integers from 0 to N-1.
  • Timestamps are integers representing time steps (e.g., seconds).
  • A server is considered "dead" if it has not sent a measurement for a configurable timeout period.
  • Once a server is considered dead, its measurements should not be expected for computing averages.
  • If a dead server comes back (sends a new measurement), it should be re-included.
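
The liveness rules above can be sketched as a small helper. This is only an illustration, not part of the required interface: the class name LivenessTracker, its methods, and the choice to measure the timeout in logical time steps (rather than wall-clock time) are all assumptions.

```python
from typing import Set


class LivenessTracker:
    """Hypothetical helper: decides which servers are alive from their latest timestamps."""

    def __init__(self, n: int, timeout: int):
        self.timeout = timeout  # assumption: expressed in logical time steps
        # Assumption: every server starts out alive, as if it had reported at t=0.
        self.last_seen = {server_id: 0 for server_id in range(n)}

    def record(self, server_id: int, t: int) -> None:
        # Messages may arrive out of order, so keep the largest timestamp seen.
        self.last_seen[server_id] = max(self.last_seen[server_id], t)

    def is_alive(self, server_id: int, now: int) -> bool:
        # A server is dead once it has been silent for more than `timeout` steps.
        return now - self.last_seen[server_id] <= self.timeout

    def alive_servers(self, now: int) -> Set[int]:
        return {s for s in self.last_seen if self.is_alive(s, now)}
```

Because a returning server simply updates its last_seen entry, re-inclusion needs no special case: the next call to alive_servers picks it up again.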

Examples

Setup: 3 servers (IDs 0, 1, 2)

Receive(0, 1, 80.0)    // Server 0 reports 80% at t=1
Receive(1, 1, 60.0)    // Server 1 reports 60% at t=1
Receive(2, 1, 90.0)    // Server 2 reports 90% at t=1
→ SendAvg(1, 76.67)    // Average: (80+60+90)/3

Receive(0, 2, 70.0)    // Server 0 reports 70% at t=2
Receive(1, 2, 65.0)    // Server 1 reports 65% at t=2
// Server 2 is silent...

// After timeout, server 2 is considered dead
→ SendAvg(2, 67.5)     // Average: (70+65)/2, excluding dead server 2

Receive(2, 3, 85.0)    // Server 2 comes back at t=3
Receive(0, 3, 72.0)
Receive(1, 3, 68.0)
→ SendAvg(3, 75.0)     // Average: (72+68+85)/3, server 2 re-included

Hints & Common Pitfalls

  • Tracking active servers: Maintain a set of "alive" servers based on recent activity. Use the timeout to transition servers to "dead" status.

  • When to emit: The tricky part is deciding when you have enough data to emit an average for time t. You cannot wait indefinitely for a dead server. One approach: set a wall-clock or logical-time deadline for each timestamp.

  • Out-of-order messages: Server 0 might send its measurement for t=5 before server 1 sends its measurement for t=3. Your data structure must handle accumulating partial data for multiple timestamps simultaneously.

  • Common mistake: Dividing by N instead of the number of servers that contributed a measurement. The denominator should be the number of active servers that reported for that timestamp, so that dead servers do not drag the average down.

  • Data cleanup: Once SendAvg is called for time t, discard the accumulated data for t to avoid unbounded memory growth.
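
One way to combine these hints is sketched below. It is a minimal illustration under stated assumptions, not a reference solution: the timeout constructor parameter, the logical-time deadline (a timestamp closes once some server has reported a timestamp at least timeout steps later), the choice to emit in timestamp order, the assumption that timestamps are non-negative, and the emitted list standing in for a real SendAvg sink are all choices made for this example.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class Aggregator:
    def __init__(self, n: int, timeout: int = 1):
        self.n = n
        self.timeout = timeout      # assumption: measured in logical time steps
        self.watermark = 0          # highest timestamp seen from any server
        self.last_emitted = -1      # timestamps <= this are already closed
        self.last_seen = [0] * n    # latest timestamp reported by each server
        # pending[t][replica_id] = measure, for timestamps not yet emitted
        self.pending: Dict[int, Dict[int, float]] = defaultdict(dict)
        self.emitted: List[Tuple[int, float]] = []  # stand-in for a real sink

    def Receive(self, replica_id: int, t: int, measure: float) -> None:
        if t <= self.last_emitted:
            return  # late message for a timestamp that was already emitted
        self.pending[t][replica_id] = measure
        self.last_seen[replica_id] = max(self.last_seen[replica_id], t)
        self.watermark = max(self.watermark, t)
        self._flush()

    def SendAvg(self, t: int, avg: float) -> None:
        self.emitted.append((t, avg))

    def _is_alive(self, replica_id: int) -> bool:
        return self.watermark - self.last_seen[replica_id] <= self.timeout

    def _flush(self) -> None:
        # Emit in timestamp order; stop at the first timestamp that must keep waiting.
        for t in sorted(self.pending):
            reported = self.pending[t]
            waiting_on = [s for s in range(self.n)
                          if s not in reported and self._is_alive(s)]
            deadline_passed = self.watermark - t >= self.timeout
            if waiting_on and not deadline_passed:
                break
            # Divide by the number of servers that reported, not by n.
            self.SendAvg(t, sum(reported.values()) / len(reported))
            del self.pending[t]  # cleanup keeps memory bounded
            self.last_emitted = t
```

With n=3 and timeout=1, feeding this sketch the calls from the Examples section produces averages for t=1, t=2, and t=3 in that order, with t=2 closed by the arrival of server 2's t=3 measurement. A purely logical deadline only advances when some server reports, so if every server goes silent the pending timestamps are never flushed; a production version would likely add a wall-clock timer for that case.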

Follow-Up Questions

  1. Slower SendAvg pace: What if SendAvg should emit averages at a coarser granularity — for example, one average per minute when servers report every second? How do you accumulate and aggregate the data?

  2. Scale to 1M+ streams: If you have over a million servers reporting, what are the bottlenecks? How would you shard or distribute the aggregator itself? Consider hierarchical aggregation (local aggregators per rack/cluster feeding into a global aggregator).

  3. Weighted averages: What if different servers have different weights (e.g., based on traffic volume)? How does this change the interface and implementation?

  4. Percentiles instead of average: If you need to emit the P99 CPU usage instead of the average, how does this change the problem? Can you still compute it incrementally?
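
As a starting point for the first three follow-ups, one possible building block is a mergeable partial aggregate that carries a weighted sum and a total weight instead of a finished average. The names Partial and window_start, and the 60-step window, are hypothetical and chosen only for this sketch.

```python
from dataclasses import dataclass


@dataclass
class Partial:
    """Hypothetical mergeable partial aggregate: a weighted sum plus its total weight."""
    weighted_sum: float = 0.0
    total_weight: float = 0.0

    def add(self, measure: float, weight: float = 1.0) -> None:
        # With weight=1.0 this reduces to a plain (sum, count) pair.
        self.weighted_sum += measure * weight
        self.total_weight += weight

    def merge(self, other: "Partial") -> "Partial":
        # Merging partials is exact, unlike averaging two averages.
        return Partial(self.weighted_sum + other.weighted_sum,
                       self.total_weight + other.total_weight)

    def average(self) -> float:
        return self.weighted_sum / self.total_weight


def window_start(t: int, window: int = 60) -> int:
    # Maps a per-second timestamp to the start of its coarser window.
    return t - t % window


# Usage: two rack-level aggregators feed one global aggregator.
rack_a, rack_b = Partial(), Partial()
rack_a.add(80.0)
rack_a.add(60.0)
rack_b.add(90.0)
global_avg = rack_a.merge(rack_b).average()  # (80 + 60 + 90) / 3
```

The same structure covers coarser emission (keep one Partial per window_start value) and hierarchical aggregation (local aggregators forward their Partial objects upward). It does not extend to percentiles: an exact percentile cannot be recovered from a sum and a weight, so that follow-up needs either the raw values or an approximate quantile summary.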