Kafka Streams Windowing¶
Windowed operations group stream records into finite time intervals for aggregation, supporting tumbling (non-overlapping), hopping (overlapping), session (gap-based), and sliding (join) windows with configurable grace periods for late-arriving events.
Key Facts¶
- Windows are evaluated continuously as new records arrive
- Each record belongs to one or more windows depending on window type
- Window retention controls how many past windows are kept in state stores
- Grace period defines how long after a window's end late-arriving (out-of-order) records are still accepted; once it expires, late records for that window are dropped
- Retention must be >= window size + grace period
- Stream time: the maximum record timestamp observed so far (not wall-clock time); it advances only when new records arrive and never moves backward
- Timestamp assignment is controlled by the topic setting message.timestamp.type: CreateTime (producer-set) or LogAppendTime (broker-set)
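The grace and retention rules listed above can be sketched in plain Java with no Kafka dependency. The class and method names (`WindowLifecycle`, `acceptsRecord`, `retentionIsValid`) are hypothetical. The exact boundary, where a window stops accepting records once stream time reaches window end plus grace, is an assumption made for this sketch rather than a quotation of the library's code.

```java
import java.time.Duration;

/** Illustrative helpers for grace and retention rules (hypothetical names, not Kafka Streams API). */
final class WindowLifecycle {
    private WindowLifecycle() {}

    /**
     * Returns true while a window can still accept records, i.e. while
     * stream time has not yet reached windowEnd + grace (assumed boundary).
     */
    static boolean acceptsRecord(long windowEndMs, Duration grace, long streamTimeMs) {
        return streamTimeMs < windowEndMs + grace.toMillis();
    }

    /** Retention must cover the whole window plus its grace period. */
    static boolean retentionIsValid(Duration windowSize, Duration grace, Duration retention) {
        return retention.compareTo(windowSize.plus(grace)) >= 0;
    }
}
```

Under these assumptions, a 10-minute window with a 2-minute grace period still accepts a record for [00:00, 00:10) when stream time is 00:11, and rejects it once stream time reaches 00:12. A retention of 11 minutes would be invalid for that window, while 12 minutes would be the minimum.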
Patterns¶
Tumbling Window (Fixed, Non-Overlapping)¶
// 10-minute tumbling windows
stream.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
.count()
.toStream()
.to("counts");
// With grace period for late events
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(2))
// Windows: [00:00, 00:10), [00:10, 00:20), ...
// Record at 00:07 -> window [00:00, 00:10)
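The mapping from a record timestamp to its tumbling window is simple modular arithmetic on epoch milliseconds. The following is a minimal sketch, assuming non-negative timestamps; `TumblingWindowMath` is a hypothetical helper, not part of Kafka Streams.

```java
/** Illustrative tumbling-window arithmetic (hypothetical helper, not Kafka Streams API). */
final class TumblingWindowMath {
    private TumblingWindowMath() {}

    /** Inclusive start of the epoch-aligned window containing timestampMs (assumes timestampMs >= 0, sizeMs > 0). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the same window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }
}
```

Because the start is computed from the timestamp alone, window boundaries depend only on the epoch and the window size, not on when the application started.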
Hopping Window (Fixed, Overlapping)¶
// 10-minute windows advancing every 5 minutes
stream.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(2))
.advanceBy(Duration.ofMinutes(5))
)
.count();
// Windows: [00:00, 00:10), [00:05, 00:15), [00:10, 00:20), ...
// Record at 00:07 -> belongs to BOTH [00:00, 00:10) AND [00:05, 00:15)
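To see why one record lands in several hopping windows, the window starts covering a timestamp can be enumerated directly. This is a sketch under the assumption that windows are epoch-aligned and never start before time 0; `HoppingWindowMath` and `windowStarts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative hopping-window arithmetic (hypothetical helper, not Kafka Streams API). */
final class HoppingWindowMath {
    private HoppingWindowMath() {}

    /** Returns the start of every window [start, start + sizeMs) that contains timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest multiple of advanceMs that is greater than (timestampMs - sizeMs), clamped at 0.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }
}
```

With a 10-minute size and a 5-minute advance, a record at 00:07 yields the starts 00:00 and 00:05. Setting the advance equal to the size reduces this to the tumbling case, with exactly one window per record.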
Session Window (Dynamic, Gap-Based)¶
// Sessions close after 5 minutes of inactivity
stream.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
.count();
// Sessions tracked independently per key
// Session duration varies based on actual activity patterns
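Session boundaries can be illustrated by grouping one key's event timestamps whenever consecutive events are no more than the inactivity gap apart. The sketch below computes the final sessions in one batch, whereas Kafka Streams builds and merges them incrementally as records arrive. `SessionSketch` is a hypothetical name, and treating a difference exactly equal to the gap as the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative batch computation of session windows for a single key (hypothetical helper). */
final class SessionSketch {
    private SessionSketch() {}

    /** Returns sessions as {startMs, endMs} pairs, ordered by start time. */
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted); // arrival order does not change the final sessions
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            if (!result.isEmpty() && ts - result.get(result.size() - 1)[1] <= gapMs) {
                result.get(result.size() - 1)[1] = ts; // within the gap: extend the current session
            } else {
                result.add(new long[] {ts, ts});       // gap exceeded: start a new session
            }
        }
        return result;
    }
}
```

With a 5-minute gap, events at minutes 0, 3, 12, and 15 form two sessions, [0, 3] and [12, 15]. If a late event at minute 8 is added, it bridges both and the result is a single session [0, 15].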
Sliding Window (Join Windows)¶
// For stream-stream joins: how far apart events can be joined
KStream<String, String> joined = stream1.join(
stream2,
(left, right) -> left + "+" + right,
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
);
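The time-difference condition of a join window amounts to a symmetric bound on the two record timestamps. The sketch below uses a hypothetical `JoinWindowSketch` helper and assumes the bound is inclusive. It models only the time-difference check; the grace period is a separate setting that governs how long out-of-order records are still accepted.

```java
/** Illustrative join-window predicate (hypothetical helper, not Kafka Streams API). */
final class JoinWindowSketch {
    private JoinWindowSketch() {}

    /** True when the two timestamps are at most timeDifferenceMs apart, in either direction. */
    static boolean joinable(long leftTsMs, long rightTsMs, long timeDifferenceMs) {
        return Math.abs(leftTsMs - rightTsMs) <= timeDifferenceMs;
    }
}
```

With a 5-minute time difference, a left record at 00:10 can join right records from 00:05 through 00:15 for the same key.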
ksqlDB Windowing¶
-- Tumbling window aggregation
CREATE TABLE pageviews_per_region AS
SELECT regionid, COUNT(*) FROM pageviews
WINDOW TUMBLING (SIZE 30 SECONDS)
GROUP BY regionid
EMIT CHANGES;
-- Hopping window with retention and grace
CREATE TABLE hourly_counts AS
SELECT userid, COUNT(*) FROM events
WINDOW HOPPING (SIZE 1 HOUR, ADVANCE BY 10 MINUTES,
RETENTION 7 DAYS, GRACE PERIOD 30 MINUTES)
GROUP BY userid
EMIT CHANGES;
-- Session window
SELECT userid, COUNT(*) AS session_count
FROM pageviews WINDOW SESSION (5 MINUTES)
GROUP BY userid EMIT CHANGES;
Time Semantics¶
| Time Concept | Description |
|---|---|
| Event time | When the record was created at the source |
| Ingestion time | When the broker stored the record |
| Processing time | When the stream processing app processes it |
| Stream time | Maximum timestamp seen so far |
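Stream time differs from the other three concepts in that it is derived from the records themselves. A minimal sketch of how it advances follows; `StreamTimeTracker` is a hypothetical class, and the initial value of -1 is a sentinel chosen for this example.

```java
/** Illustrative stream-time tracker (hypothetical class, not Kafka Streams API). */
final class StreamTimeTracker {
    private long streamTimeMs = -1L; // -1 means no record has been observed yet

    /** Observes a record timestamp and returns the resulting stream time. */
    long observe(long recordTimestampMs) {
        // Out-of-order records never move stream time backward.
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return streamTimeMs;
    }

    long current() {
        return streamTimeMs;
    }
}
```

Observing timestamps 100, 50, and 200 in that order gives stream times 100, 100, and 200. If no records arrive, stream time stands still regardless of how much wall-clock time passes.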
Timestamp Extractors¶
// Built-in extractors: pass exactly one to withTimestampExtractor
Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String())
    .withTimestampExtractor(new FailOnInvalidTimestamp()); // throws on invalid timestamps
// Alternatives:
//   new LogAndSkipOnInvalidTimestamp() // logs and skips the record
//   new WallclockTimestampExtractor()  // uses system time
// Custom: extract from message body (MyEvent is the application's own value type)
TimestampExtractor fromBody = (record, partitionTime) -> {
    MyEvent event = (MyEvent) record.value();
    return event.getTimestamp();
};
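A custom extractor that reads the timestamp from the payload may encounter missing or negative values. One defensive option is to fall back to the partition time passed into the extractor. The decision logic is sketched below in plain Java so it can be read without the Kafka classes; `TimestampFallback` and `resolve` are hypothetical names, and a real extractor would call something like this from inside its `extract` method.

```java
/** Illustrative fallback rule for a custom timestamp extractor (hypothetical helper). */
final class TimestampFallback {
    private TimestampFallback() {}

    /**
     * Prefers the timestamp embedded in the payload; falls back to the
     * partition time when the embedded value is negative (invalid).
     */
    static long resolve(long embeddedTimestampMs, long partitionTimeMs) {
        if (embeddedTimestampMs >= 0) {
            return embeddedTimestampMs;
        }
        if (partitionTimeMs >= 0) {
            return partitionTimeMs;
        }
        // Neither source is usable; failing loudly is a design choice of this sketch.
        throw new IllegalStateException("No valid timestamp available for record");
    }
}
```

Falling back keeps the record in the stream at the cost of a less accurate timestamp. Whether that is preferable to skipping or failing depends on how sensitive the downstream windows are to misplaced records.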
Gotchas¶
- Grace period default is 24 hours in ksqlDB - this can consume significant memory; tune based on actual late-data characteristics
- Late records after grace period are silently dropped - no error, no DLQ; monitor the dropped-record metrics (dropped-records-rate and dropped-records-total at the task level)
- Retention must be >= window size + grace period - otherwise windows are evicted before they can accept late data
- Session windows can merge - if two sessions for the same key overlap after a late event arrives, they merge into one larger session
- Window start times are aligned to epoch - tumbling windows start at Unix epoch boundaries, not at application start time
- Low write volume can stretch retention past the configured value - only closed log segments are eligible for deletion, so when writes are slow the active segment stays open for a long time and its records can outlive the configured retention
See Also¶
- kafka streams - core topology, operations, joins
- kafka streams state stores - state stores backing windowed aggregations
- kafka streams time semantics - event time vs processing time, watermarks
- ksqldb - SQL windowing syntax
- Kafka Streams Windowing Documentation