Aggregations
Unlike stateless projections, aggregations apply aggregate functions to the incoming data and the current state to create new state. The state can optionally be grouped by a list of expressions.
Tektite supports both non-windowed aggregations and windowed aggregations.
Aggregations are defined using the aggregate operator and support the following aggregate functions:
count - compute the count of rows seen
min - compute the minimum of values seen
max - compute the maximum of values seen
sum - compute the sum of values seen
avg - compute the average (mean) of values seen
The operand to the aggregate function can be any valid Tektite expression which evaluates to a type that is compatible with the aggregate function:
count can be used with any type
min requires operand type int, float, decimal, string, bytes or timestamp. For string and bytes, a lexicographical comparison is performed
max requires operand type int, float, decimal, string, bytes or timestamp. For string and bytes, a lexicographical comparison is performed
sum requires operand type int, float or decimal
avg requires operand type int, float, decimal or timestamp
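For instance, here's a hypothetical sketch that reuses the sales_topic from the examples below, applying avg to the numeric amount column and a lexicographic max to the string country column (the partition step is explained in the next section):
sales_stats := sales_topic ->
    (partition by const partitions = 1) ->
    (aggregate avg(amount), max(country))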
Non-windowed aggregations
A non-windowed aggregation computes the aggregate functions over all incoming data received.
Here's an example that computes the min, max, count and sum of all sales received so far:
sales_totals := sales_topic ->
(partition by const partitions = 1) ->
(aggregate min(amount), max(amount), count(amount), sum(amount))
The partition operator is necessary because the incoming data is already split across multiple partitions, but we want to compute a single row over all the data, so we must repartition it into a single partition before computing the aggregation. If we didn't do this, we'd end up with a totals row per partition.
This can then be queried:
tektite> (scan all from sales_totals);
+---------------------------------------------------------------------------------------------------------------------+
| event_time | min(amount) | max(amount) | count(amount) | sum(amount) |
+---------------------------------------------------------------------------------------------------------------------+
| 2024-05-14 07:21:07.851000 | 85.23 | 123.99 | 16 | 1751.28 |
+---------------------------------------------------------------------------------------------------------------------+
You could also store the updates as a new stream - the stream would have a new entry every time the totals were updated:
sales_totals_updates := sales_totals -> (store stream);
tektite> (scan all from sales_totals_updates);
+--------------------------------------------------------------------------------------------------------------------+
| offset | event_time | min(amount) | max(amount) | count(amount) | sum(amount) |
+--------------------------------------------------------------------------------------------------------------------+
| 0 | 2024-05-14 07:36:02.928000 | 85.23 | 123.99 | 17 | 1836.51 |
| 1 | 2024-05-14 07:36:03.926000 | 85.23 | 123.99 | 18 | 1921.74 |
| 2 | 2024-05-14 07:36:04.878000 | 85.23 | 123.99 | 19 | 2006.97 |
| 3 | 2024-05-14 07:36:05.801000 | 85.23 | 123.99 | 20 | 2092.20 |
| 4 | 2024-05-14 07:36:06.624000 | 85.23 | 123.99 | 21 | 2177.43 |
| 5 | 2024-05-14 07:36:07.651000 | 85.23 | 123.99 | 22 | 2262.66 |
| 6 | 2024-05-14 07:36:08.521000 | 85.23 | 123.99 | 23 | 2347.89 |
| 7 | 2024-05-14 07:36:09.377000 | 85.23 | 123.99 | 24 | 2433.12 |
+--------------------------------------------------------------------------------------------------------------------+
8 rows returned
Or you could expose the updates as a Kafka topic, so they can be consumed by any Kafka consumer:
sales_totals_updates := sales_totals -> (kafka out);
Grouping data in aggregations
Often, instead of computing a single row in an aggregation, you want to calculate the aggregate functions over subsets of the data.
For example, you might want to calculate the min, max, count and sum of sales keyed by country. To do this
you provide one or more key expressions in the aggregate as follows:
sales_totals := sales_topic ->
(partition by country partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount) by country)
tektite> (scan all from sales_totals);
+-------------------------------------------------------------------------------------------------------------------+
| event_time | country | min(amount) | max(amount) | count(amount) | sum(amount) |
+-------------------------------------------------------------------------------------------------------------------+
| 2024-05-14 07:53:54.146000 | uk | 56.23 | 85.23 | 6 | 415.38 |
| 2024-05-14 07:54:35.669000 | usa | 23.23 | 56.23 | 5 | 215.15 |
+-------------------------------------------------------------------------------------------------------------------+
Note that we repartition the incoming data by the country field as that is what we are grouping by. This ensures all incoming
data for the same value of country ends up on the same partition, so the aggregation can be calculated correctly.
The partition step won't be necessary if the incoming data is already partitioned on the country column.
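For example, if sales_topic were already partitioned on country, a sketch of the same aggregation without the partition step would be:
sales_totals := sales_topic ->
    (aggregate min(amount), max(amount), count(amount), sum(amount) by country)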
You can also key by multiple expressions:
city_totals := sales_topic ->
(partition by country, city partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount) by country, city)
tektite> (scan all from city_totals);
+------------------------------------------------------------------------------------------------------------------------------+
| event_time | country | city | min(amount) | max(amount) | count(amount) | sum(amount) |
+------------------------------------------------------------------------------------------------------------------------------+
| 2024-05-14 07:53:54.146000 | uk | london | 76.23 | 85.23 | 3 | 246.69 |
| 2024-05-14 07:53:44.438000 | uk | manchester | 56.23 | 56.23 | 3 | 168.69 |
| 2024-05-14 07:54:24.980000 | usa | austin | 23.23 | 23.23 | 2 | 46.46 |
| 2024-05-14 07:54:35.669000 | usa | miami | 56.23 | 56.23 | 3 | 168.69 |
+------------------------------------------------------------------------------------------------------------------------------+
Windowed aggregations
Very commonly, you don't want to compute aggregations over all data but over some window, e.g. in a window of a day or an hour. You accomplish this with a windowed aggregation.
A windowed aggregation is similar to a non-windowed aggregation but has extra parameters to define the window size and hop.
Window size is a duration that represents how long a window stays open. For example, if you wanted to aggregate
sales by hour, the window size would be 1 hour.
Window hop is also a duration, and represents the time between opening new windows. If hop is equal to size then at
any particular time we only have one window open. This is known in other systems as a tumbling window.
If hop is less than size, then we can have multiple open windows at any one time. For example, we might have size set
to 1 hour, and hop set to 10 minutes. This gives us more timely results: we don't have to wait an hour to get the most
recent sales results, we get updated results every 10 minutes.
Here's an example of the sales aggregation from before, but this time using a windowed aggregation:
sales_by_country_by_hour := sales_topic ->
(partition by country partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount)
by country size = 1h hop = 10m)
Note that durations are expressed as a positive integer followed by one of ms for milliseconds, s for seconds,
m for minutes, h for hours or d for days.
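For example, 30s, 10m, 1h and 7d are all valid durations, so a one-day window that advances every six hours would be written as size = 1d hop = 6h.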
Tektite maintains aggregate values for each open window, and results are emitted and become visible when the window is closed. How do we determine when that happens?
Watermarks
Tektite uses watermarks to determine when aggregate windows can be closed.
At any one time, there can be multiple windows open for an aggregation, depending on the values of size and hop.
As data arrives at the aggregate operator, the operator first determines which window(s) the event intersects with.
Each window is defined by its start time ws and its end time we.
A row intersects with a window if ws < event_time <= we. Each incoming row has an event_time column.
For each intersecting window, the aggregate expressions are evaluated for the incoming row.
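For example, with size = 1h and hop = 10m, a row with event_time 10:05 intersects the six open windows [09:10, 10:10], [09:20, 10:20], [09:30, 10:30], [09:40, 10:40], [09:50, 10:50] and [10:00, 11:00], and contributes to the aggregate values of each.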
It's not only rows that flow through the network of interconnected streams in Tektite. Tektite also injects watermarks
at the places where data enters the system - the bridge from and kafka in operators - and these flow through the streams
along all the routes that data can take.
A watermark carries a timestamp with it. This timestamp means that no more data with an event_time less than this timestamp
is expected to flow through the operator.
When the aggregate operator receives a watermark, it can close all open windows whose we timestamp is <= the watermark timestamp.
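Continuing the example above, a watermark with timestamp 10:25 would close the open windows ending at 10:10 and 10:20, but not the one ending at 10:30.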
In some systems, events can arrive late, and we might not want to close a window until some time after the watermark. This
is done by specifying the lateness parameter.
Here we don't close open windows until 10 seconds after the timestamp carried by the watermark.
sales_by_country_by_hour := sales_topic ->
(partition by country partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount)
by country size = 1h hop = 10m lateness = 10s)
By default, watermarks are injected in the bridge from and kafka in operators approximately 1 second after the
highest event_time seen on incoming data. This can be configured on those operators. For more information on
watermarks, see the section on generating watermarks.
When a window is closed, its aggregate values become visible and can be queried:
tektite> (scan all from sales_by_country_by_hour);
+-----------------------------------------------------------------------------------------------------------------------------------------------+
| event_time | country | min(amount) | max(amount) | count(amount) | sum(amount) |
+-----------------------------------------------------------------------------------------------------------------------------------------------+
| 2024-05-14 10:51:17.962000 | uk | 56.23 | 85.23 | 5 | 339.15 |
| 2024-05-14 10:51:29.687000 | usa | 23.23 | 23.23 | 2 | 46.46 |
+-----------------------------------------------------------------------------------------------------------------------------------------------+
Note that the aggregate results are keyed on country, so we only store the most recent aggregate results per country. The
event_time column contains the highest event_time of any row that contributed to that row of the aggregate results.
We could also create a new stream that receives updates from the aggregation or expose it as a Kafka consumer endpoint or siphon the results off to an external topic.
Sometimes you may want to retain aggregation results from all closed windows, not just the most recent one. You do this by
setting the window_cols parameter to true.
sales_by_country_by_hour := sales_topic ->
(partition by country partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount)
by country size = 1h hop = 10m window_cols = true)
Then closed windows will be output with additional columns ws and we, and the key of the data will be [ws, we, country].
By default, when windows are closed, a windowed aggregation stores its results persistently in a table with the same name as the stream. Sometimes you don't want results to be stored, you just want to emit them somewhere else. For example, you might want to send results to an external Kafka topic or expose them as a Kafka topic in Tektite.
You set the store parameter to false to prevent the aggregation storing results.
Here's an example of creating a new (read-only) topic that exposes the latest aggregate sales figures from the existing topic sales_topic:
sales_by_country_by_hour_topic := sales_topic ->
(partition by country partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount)
by country size = 1h hop = 10m store = false) ->
(kafka out)
And here's an example of computing the latest aggregate sales figures and outputting them directly to an external topic in Apache Kafka:
sales_by_country_by_hour_out := sales_topic ->
(partition by country partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount)
by country size = 1h hop = 10m store = false) ->
(bridge to external_sales_figures props = ("bootstrap.servers" = "foo.com:9092"))
Here's an example of computing the latest aggregate sales figures without the aggregation persisting them itself, using a store table operator
to explicitly persist the results in a table:
sales_by_country_by_hour_with_table := sales_topic ->
(partition by country partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount)
by country size = 1h hop = 10m store = false) ->
(store table by country)
Data retention
If you don't want to keep the results of your aggregation forever, you can set a maximum retention time on it. Data will be deleted asynchronously from the aggregation once that time has been exceeded.
The following will delete the aggregate results after 1 day:
sales_by_country_by_hour := sales_topic ->
(partition by country partitions = 16) ->
(aggregate min(amount), max(amount), count(amount), sum(amount)
by country size = 1h hop = 10m retention = 1d)