Watermarks
Watermarks are used in Tektite for determining when aggregation windows can be closed.
Watermarks are generated at the points where data enters the system: the kafka in and topic operators for Kafka topics hosted in Tektite, and the bridge from operator for data entering from an external topic.
A watermark carries a timestamp and flows with the rest of the data through the network of streams along all the routes that data can flow. When an operator receives a watermark with a particular timestamp, the watermark tells the operator that it shouldn't expect to see any more data with a timestamp less than this value.
Partitions, joins and unions can all receive data from different processors. When watermarks flow through one of these, the watermark is held back until a watermark has been received from each sending processor. A single watermark whose value is the minimum of the incoming watermarks is then forwarded.
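The merging rule above can be sketched in a few lines of Python. This is an illustration only, not Tektite's implementation: the class name `WatermarkMerger`, the sender identifiers, and the choice to reset state after each forwarded watermark are all assumptions made for the example. The special -1 idle watermark is not modeled here.

```python
from typing import Dict, Hashable, Iterable, Optional


class WatermarkMerger:
    """Hypothetical helper: waits for one watermark per sender, then forwards the minimum."""

    def __init__(self, sender_ids: Iterable[Hashable]) -> None:
        # Latest watermark received from each sender in the current round.
        # None means that sender has not reported yet.
        self._pending: Dict[Hashable, Optional[int]] = {s: None for s in sender_ids}

    def on_watermark(self, sender_id: Hashable, timestamp_ms: int) -> Optional[int]:
        """Record a watermark. Returns the value to forward, or None while still waiting."""
        if sender_id not in self._pending:
            raise KeyError(f"unknown sender: {sender_id!r}")
        self._pending[sender_id] = timestamp_ms
        values = list(self._pending.values())
        if any(v is None for v in values):
            return None  # at least one sender has not reported yet
        # Assumption: start a fresh round once a merged watermark is forwarded.
        for key in self._pending:
            self._pending[key] = None
        return min(values)


merger = WatermarkMerger(["p0", "p1", "p2"])
first = merger.on_watermark("p0", 1000)   # held back
second = merger.on_watermark("p1", 900)   # held back
third = merger.on_watermark("p2", 1200)   # all senders reported: forward the minimum
```

Forwarding the minimum is the conservative choice: no sender has promised anything stronger than its own watermark, so the merged stream can only promise the weakest of them.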
Internally, watermarks piggyback on a closely related Tektite concept: barriers. Barriers are periodically injected into streams at sources and flow through the network of streams. They're used to guarantee snapshot isolation.
When a watermark reaches a windowed aggregation, the aggregate operator makes a decision about which open windows (if any) should be closed and have results emitted for them.
Tektite supports two strategies for generating watermarks.
Event time watermark
With an event time strategy, the value given to the watermark when injected at the source is the maximum timestamp of any data seen so far at the source for the processor, minus the value of watermark_lateness. The watermark is based on the actual timestamp of the message, which often does not correspond to when the message is processed.
As an example, let's say the maximum timestamp a kafka in operator has seen for incoming produced messages is 2024-05-16 09:00:00 and watermark_lateness is set to 5s. Then the next watermark to be generated will have a timestamp value of 2024-05-16 08:59:55.
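The arithmetic in this example can be reproduced with a small Python sketch. The class `EventTimeWatermarkSource` and its method names are hypothetical and exist only to illustrate the rule "maximum timestamp seen so far minus watermark_lateness"; they are not part of Tektite.

```python
from datetime import datetime, timedelta
from typing import Optional


class EventTimeWatermarkSource:
    """Hypothetical source that tracks the maximum event timestamp it has seen."""

    def __init__(self, lateness: timedelta) -> None:
        self._lateness = lateness
        self._max_seen: Optional[datetime] = None

    def on_message(self, event_time: datetime) -> None:
        # Only the maximum matters, so out-of-order messages never move it backwards.
        if self._max_seen is None or event_time > self._max_seen:
            self._max_seen = event_time

    def next_watermark(self) -> Optional[datetime]:
        # No data yet means there is nothing to base a watermark on.
        if self._max_seen is None:
            return None
        return self._max_seen - self._lateness


source = EventTimeWatermarkSource(lateness=timedelta(seconds=5))
source.on_message(datetime(2024, 5, 16, 8, 59, 58))
source.on_message(datetime(2024, 5, 16, 9, 0, 0))
source.on_message(datetime(2024, 5, 16, 8, 59, 30))  # late message, ignored for the maximum
watermark = source.next_watermark()  # 2024-05-16 08:59:55
```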
This watermark will then flow through the system, and if it reaches a windowed aggregation, the aggregation may decide (depending on configuration) to close all open windows with a window end <= 2024-05-16 08:59:55.
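As a rough sketch of that decision, the Python function below closes every open window whose end is at or before the watermark. The function name `close_windows` and the representation of open windows as a dict from window end (epoch milliseconds) to a running aggregate are assumptions for illustration. The real aggregate operator's behaviour depends on its configuration, which is not modeled here.

```python
from typing import Dict, List, Tuple


def close_windows(open_windows: Dict[int, float], watermark_ms: int) -> List[Tuple[int, float]]:
    """Remove and return (window_end, aggregate) for windows with end <= watermark.

    Windows are returned in order of their end timestamp; the rest stay open.
    """
    closable = sorted(end for end in open_windows if end <= watermark_ms)
    return [(end, open_windows.pop(end)) for end in closable]


# Three open windows keyed by their end timestamp, each holding a running sum.
windows = {1000: 12.5, 2000: 7.0, 3000: 1.0}
emitted = close_windows(windows, watermark_ms=2000)
# emitted holds the windows ending at 1000 and 2000; the window ending at 3000 stays open.
```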
Event time watermarks are the default type and will be injected automatically in any kafka in, topic or bridge from operator if not explicitly configured.
Here's an example of explicitly configuring a topic with event time watermarks:
my_topic :=
(topic partitions = 16
watermark_type = event_time
watermark_lateness = 10s
watermark_idle_timeout = 30s)
The same parameters can be used on kafka in and bridge from.
With an event time strategy, if no data is received on a processor then the watermark value can get stuck, and this could prevent downstream windowed aggregations from closing.
To unstick stuck watermarks, a special watermark with a value of -1 is generated if no data is received for watermark_idle_timeout. If a windowed aggregation receives a watermark of -1, it knows that its upstreams are all idle and it can emit results.
The default value of watermark_lateness is 1s. The default value of watermark_idle_timeout is 1m.
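The idle behaviour can be sketched as follows. This is a simplified model, not Tektite's code: the class `IdleAwareSource`, its parameters, and the use of `>=` when comparing against the timeout are assumptions. The default values of 1s and 1m mirror the documented defaults, expressed in milliseconds.

```python
from typing import Optional

IDLE_WATERMARK = -1  # special value signalling that the source is idle


class IdleAwareSource:
    """Hypothetical event time source that emits -1 after an idle period."""

    def __init__(self, start_ms: int, lateness_ms: int = 1_000,
                 idle_timeout_ms: int = 60_000) -> None:
        self._lateness_ms = lateness_ms
        self._idle_timeout_ms = idle_timeout_ms
        self._max_seen_ms: Optional[int] = None
        # Wall-clock time of the last received message (or of creation if none yet).
        self._last_data_at_ms = start_ms

    def on_message(self, event_time_ms: int, now_ms: int) -> None:
        self._last_data_at_ms = now_ms
        if self._max_seen_ms is None or event_time_ms > self._max_seen_ms:
            self._max_seen_ms = event_time_ms

    def next_watermark(self, now_ms: int) -> Optional[int]:
        # Assumption: the source counts as idle once the timeout has fully elapsed.
        if now_ms - self._last_data_at_ms >= self._idle_timeout_ms:
            return IDLE_WATERMARK
        if self._max_seen_ms is None:
            return None  # no data yet and not idle long enough
        return self._max_seen_ms - self._lateness_ms


src = IdleAwareSource(start_ms=0)
src.on_message(event_time_ms=10_000, now_ms=10_500)
active = src.next_watermark(now_ms=11_000)  # 10_000 - 1_000
idle = src.next_watermark(now_ms=70_500)    # 60s with no data: idle watermark
```

Without the idle watermark, the value returned would stay at 9000 indefinitely once data stops arriving, which is the stuck state described above.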
Processing time watermark
With a processing time strategy, the value given to the watermark when injected at the source is based on the processing time (i.e. the current system time on the server) minus the value of watermark_lateness.
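A minimal Python sketch of this calculation is shown below. The function name `processing_time_watermark` and the injectable `clock` parameter are assumptions added so the example can be run deterministically; Tektite itself simply uses the server's system time.

```python
import time
from typing import Callable


def processing_time_watermark(lateness_ms: int,
                              clock: Callable[[], float] = time.time) -> int:
    """Return the current system time in epoch milliseconds minus the lateness."""
    return int(clock() * 1000) - lateness_ms


# With a fixed clock at 2024-05-16 09:00:00 UTC and a lateness of 10s,
# the watermark is 10 seconds behind the clock.
fixed_clock = lambda: 1_715_850_000.0
wm = processing_time_watermark(10_000, clock=fixed_clock)
```

Unlike the event time strategy, the value here does not depend on message timestamps at all, so it keeps advancing even when no data arrives.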
Here's an example of configuring a bridge from with processing time watermarks:
sales-imported :=
(bridge from sales partitions = 16 props = ("bootstrap.servers" = "mykafka.foo.com:9092")
watermark_type = processing_time
watermark_lateness = 10s) ->
(store stream)