Skip to content

Topics

Tektite allows you to create topics and access them from any Kafka-compatible client. The topics have the same characteristics as they would in other Kafka compatible event streaming platforms - they are persistent and partitioned.

With Tektite a topic is just a type of stream. It's a stream that takes input data from external Kafka producers and outputs data to external Kafka consumers.

Creating topics

The simplest way to create a topic is by using the topic operator.

my-topic := (topic partitions = 16)

This creates a topic called my-topic with 16 partitions. It's just a simple stream composed of a single topic operator.

The number of partitions is mandatory when creating a topic.

Deleting topics

Topics are deleted just like any stream:

delete(my-topic)

Topic magic

The simple topic stream above is actually equivalent to the following stream:

my-stream := (kafka in partitions = 16) -> (kafka out)

The kafka in operator exposes a Kafka compatible producer endpoint. I.e. it accepts messages from any Kafka compatible producer. The produced data is sent to a kafka out which stores it persistently and exposes a Kafka compatible consumer endpoint so the data can be consumed from any Kafka compatible consumer.

The topic operator, under the hood, basically instantiates a kafka in followed by a kafka out as above. We provide a distinct topic operator as creating a simple vanilla topic is a very common thing to do in Tektite.

However, sometimes you want to do interesting things with topics that you can't do in existing event streaming platforms, and that's where a separate kafka in and kafka out operator become useful.

Filtered topics

How about a server side filter?

filtered-topic :=
    (kafka in) -> (filter by matches(to_string(val), "^hello")) -> (kafka out)

This creates a Kafka topic called filtered-topic which accepts any messages produced to it but only stores and exposes for consumption those where the message body starts with the string "hello".

Exposing an existing stream to Kafka consumers

Let's say you have an existing stream in Tektite called my-stream. You can expose it to Kafka consumers using a kafka out operator.

exposed-stream := my-stream -> (kafka out)

This will create a Kafka topic called exposed-stream which any Kafka compatible consumer can consume. Note that this topic has no kafka in so you cannot produce to it. It's a read-only topic!

Perhaps you have an existing topic living in your existing Apache Kafka installation - and you want to expose that to Kafka consumers after transforming it in some way. No problem!

upper-cased-topic :=
    (bridge from my-kafka-topic props = (bootstrap.servers="...")) ->
    (project key, hdrs, to_bytes(to_upper(to_string(val)))) ->
    (kafka out)

The possibilities are endless.

Write only topics

Sometimes you want only to expose the Kafka producer endpoint, not a consumer endpoint.

latest_sensor_readings :=
    (kafka in partitions = 32) ->
    (to table key = to_string(key))

This creates a topic called latest-sensor-readings that can only be produced to, as there is no kafka out operator.

As data arrives it is stored in a table which is keyed on the bytes in the Kafka message key converted to a string. A table only stores the latest value for the key, so if the key was the sensor id of a IoT sensor this would store the latest message from each sensor.

It could then be queried or used as input to other streams.

Perhaps you just want to count the number of readings in the last 5 minutes grouped by sensor

readings_last_5_mins :=
    (kafka in partitions = 16) ->
    (aggregate count(val) by to_string(key) size 5m hop 10s)

This would create a topic called readings_last_5_mins that can only be produced to. As messages arrive a windowed aggregation is maintained that counts the number of readings in the last 5 minutes, grouped by sensor.

The results of the aggregation can be queried or used as input to another stream.

The building blocks can be put together in many ways to do things with topics you never thought were possible.

The operator schema

Operators typically have an input and an output schema and the topic and kafka in and kafka out operators are no exception.

The input and output schema are the same, and they model the structure of a Kafka message

Here's the schema. Each column is written as column_name:column_type

offset: int
event_time: timestamp
key: bytes
val: bytes
hdrs: bytes

The offset column corresponds to the Kafka message offset in the partition. It's assigned by Tektite on receipt of the message before it is output from the kafka out operator.

The event_time column corresponds to the timestamp of the Kafka message. This can be assigned on the client or on the server depending on configuration (see kafka-use-server-timestamp configuration property). In Tektite all streams (not just topics) have an event_time.

The key field corresponds to the key of the Kafka message. It is an arbitrary byte string. It is optional and can be null

The val field corresponds to the body of the Kafka message. It is also a byte string.

The hdrs field corresponds to the raw headers of the Kafka message. It is a byte string and can be null

An operator that takes input from a kafka in operator will receive data with this schema. Similarly, the input to a kafka out operator must have this schema, or the Kafka consumer won't be able to understand it.

Using expressions to extract typed data

Commonly, your Kafka messages will contain structured data, e.g. in JSON format, and you want to perform filters or projections based on fields in that data.

Let's say your messages are from IoT sensors. The message key is the sensor_id encoded as a UTF-8 string. The message body is JSON with the following format:

{
  "country": "UK",
  "area_code": 1234,
  "temperature": 23.56
}

The message also contains a header called model_version containing the version of the sensor.

We want a topic that only stores messages from the UK from sensors with version = "12.23". We can use a filter with an expression that extracts values from the message.

The function library contains functions such as json_string and json_int which extract the named JSON field from the named column in the schema. There's also a function kafka_header which extracts the named Kafka header from the raw headers.

filtered_readings :=
    (kafka in partitions = 16) -> 
    (filter by json_string("country", val) == "UK" &&
        to_string(kafka_header("model_version", hdrs)) == "12.23") ->
    (kafka out)

Or perhaps we have a simple topic, and we want to hang an aggregation of it:

my_readings := (topic partitions = 16)

sensor_readings_by_country :=
   my_readings -> 
       (project json_string("country", val) as country,
                json_int("area", val) as area,
                json_float("temperature") as temp) ->
       (aggregate max(temp), min(temp), avg(temp) by country, area)        

Other times when you have an existing stream that you want to expose as a Kafka consumer endpoint, you want to convert the schema of the existing stream into the schema that the kafka out operator requires. You can use a projection to do this.

For example, let's say you have an existing stream sales with the schema

tx_id: string
cust_id: string
product_id: string
amount: int
price: decimal(10, 2)

And you want to convert this into an output topic where the key of the Kafka message is the tx_id and the message body is JSON of the form:

{
  "id": "tx12345",
  "customer_id": "cust46464",
  "product_id": "prod67676",
  "amount": 23,
  "price": "99.99"
}

Then you can create a new stream that exposes the data to Kafka consumers by using the built-in sprintf function to format the JSON string

transactions := ... // existing stream

transactions_out := transactions ->
   (project to_bytes(tx_id), 
            to_bytes(sprintf(("{\"id\": %s,
              \"customer_id\": %s,
              \"product_id\": %s,
              \"amount\": %d,
              \"price\": %s}",
                tx_id,
                cust_id,
                product_id,
                amount,
                to_string(price))),
            null) ->
   (kafka out)         

The input schema to the kafka out operator requires [key:bytes, val:bytes, hdrs:bytes] and that's what the project operator will output as it has three expressions each of which return a value of type bytes.

Data retention

If you don't want to keep the data in your topic forever, you can set a maximum retention time on it. Data will be deleted asynchronously from the topic once that time has been exceeded.

This will create a topic that keeps data up to 7 days:

topic-with-retention := (topic partitions = 16 retention = 7d)

The duration string is of the form of a positive integer followed by a symbol ms = milliseconds, s = seconds, m = minutes, h = hours, d = days.

Retention can also be specified on a kafka out operator

filtered :=
    (kafka in partitions = 16) ->
    (filter by len(val) > 1000) ->
    (kafka out retention = 2h)

Generating watermarks

Watermarks are automatically generated at a kafka in operator. Please see the section on generating watermarks for more information.