Bridging to and from external topics
You can connect a stream to an external topic hosted on any Kafka-compatible server, for example an existing Apache Kafka or Redpanda instance, or another Tektite cluster. We call this bridging to an external topic. A bridge can either consume messages from an external topic into the stream, or produce messages from the stream to the external topic.
Bridges are resilient; they cope with temporary unavailability of the external topic, reconnecting automatically when the server is available again.
Bridging from an external topic
To bridge from an external topic and consume its messages into your stream, use the bridge from operator. Internally, this operator maintains a set of Kafka consumers that consume messages from the external topic and make them available in your stream.
Here's an example of creating a stream that consumes from an external topic called external-sales. The message values are JSON objects with a string field called country. The stream filters out any message whose country is not "UK", then exposes the result as a read-only Tektite topic that any Kafka consumer can consume.
uk_sales :=
(bridge from external-sales partitions = 16
props = ("bootstrap.servers" =
"mykafka1.foo.com:9092, mykafka2.foo.com:9092")) ->
(filter by json_string("country", val) == "UK") ->
(kafka out)
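The Python model below shows which messages the uk_sales stream keeps and which it drops. It is a sketch, not Tektite code. The json_string function is a hypothetical stand-in for Tektite's built-in of the same name. Treating a missing country field as "not UK" is an assumption.

```python
import json
from typing import Optional


def json_string(field: str, val: bytes) -> Optional[str]:
    # Hypothetical stand-in for Tektite's json_string: returns the named
    # field of a JSON message value if it is a string, otherwise None.
    value = json.loads(val).get(field)
    return value if isinstance(value, str) else None


def uk_filter(messages: list[bytes]) -> list[bytes]:
    # Mirrors (filter by json_string("country", val) == "UK"):
    # keep a message only if its country field equals "UK".
    return [m for m in messages if json_string("country", m) == "UK"]


messages = [
    b'{"country": "UK", "value": "10.50"}',
    b'{"country": "FR", "value": "3.00"}',
    b'{"value": "1.00"}',  # no country field: dropped (assumed behaviour)
]
print(uk_filter(messages))  # only the first message survives
```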
Here's another example, which consumes from an external topic and maintains a windowed aggregation of the total number and total value of sales per country, using one-hour windows with a 10-minute hop.
uk_sales_totals :=
(bridge from external-sales partitions = 16
props = ("bootstrap.servers" =
"mykafka1.foo.com:9092, mykafka2.foo.com:9092")) ->
(project json_string("country", val) as country,
to_decimal(json_string("value", val), 10, 2) as value) ->
(partition by country partitions = 10) ->
(aggregate count(value), sum(value) by country size = 1h hop = 10m)
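With size = 1h and hop = 10m, windows overlap, so each sale is counted in size / hop = 6 windows. The Python sketch below illustrates this. It assumes each window covers [start, start + size), with start aligned to a multiple of the hop. Tektite's exact window alignment is not described here. Decimal stands in for to_decimal.

```python
from collections import defaultdict
from decimal import Decimal

SIZE = 60 * 60  # size = 1h, in seconds
HOP = 10 * 60   # hop = 10m, in seconds


def window_starts(ts: int, size: int = SIZE, hop: int = HOP) -> list[int]:
    # Assumes windows are [start, start + size) with start a multiple of hop.
    latest = ts - ts % hop  # newest window that already contains ts
    # Walk back one hop at a time while the window still covers ts.
    return sorted(range(latest, ts - size, -hop))


def aggregate(events):
    # events: (timestamp_seconds, country, value) tuples.
    # Returns {(window_start, country): (count, sum)}.
    totals = defaultdict(lambda: [0, Decimal("0")])
    for ts, country, value in events:
        for start in window_starts(ts):
            entry = totals[(start, country)]
            entry[0] += 1
            entry[1] += Decimal(value)  # exact decimal, like to_decimal(...)
    return {key: (count, total) for key, (count, total) in totals.items()}


events = [(4000, "UK", "10.50"), (4100, "UK", "2.25"), (4000, "FR", "3.00")]
print(window_starts(4000))  # [600, 1200, 1800, 2400, 3000, 3600]
result = aggregate(events)
print(result[(3600, "UK")])  # (2, Decimal('12.75'))
```

The partition by country step before the aggregation ensures that all sales for a given country are aggregated in the same partition.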
Note that partitions must be specified, and must match the number of partitions in the external topic.
The props parameter supplies properties to the Kafka client. At a minimum, the bootstrap.servers property must be provided. It is a comma-separated list of the addresses of one or more Kafka servers that host the external topic.
By default, only messages that arrive at the external topic after the bridge from operator was created are consumed. To consume all existing messages in the external topic, set the auto.offset.reset property to earliest. The default value is latest.
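In standard Kafka consumer semantics, auto.offset.reset decides where a consumer starts reading when it has no committed offset for a partition. The Python model below illustrates that rule for a single partition. It is a simplified sketch of the Kafka client behaviour and does not cover how the bridge tracks offsets across restarts.

```python
def start_offset(log_start: int, log_end: int,
                 auto_offset_reset: str = "latest") -> int:
    # log_start: offset of the oldest message still retained in the partition.
    # log_end: offset the next newly produced message will receive.
    if auto_offset_reset == "earliest":
        return log_start  # read everything already in the partition
    if auto_offset_reset == "latest":
        return log_end  # read only messages produced from now on
    raise ValueError(f"unsupported auto.offset.reset: {auto_offset_reset!r}")


# A partition currently holding offsets 0..99:
print(start_offset(0, 100, "earliest"))  # 0: all 100 existing messages are read
print(start_offset(0, 100))              # 100: only new messages are read
```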
The bridge from operator has the following optional parameters:
max_poll_messages: Maximum number of messages to consume (if available) and pass to the stream in a single batch. Defaults to 1000.
poll_timeout: Maximum time to wait for messages to arrive before polling again. Defaults to 50ms.
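The Python sketch below shows how max_poll_messages and poll_timeout interact. A queue.Queue stands in for the Kafka consumer. This is not Tektite's implementation. The sketch assumes that a poll blocks for at most poll_timeout until at least one message is available. It then returns whatever is already buffered, capped at max_poll_messages.

```python
import queue


def poll_batch(source: queue.Queue, max_poll_messages: int = 1000,
               poll_timeout: float = 0.05) -> list:
    batch = []
    try:
        # Block for at most poll_timeout seconds waiting for a first message.
        batch.append(source.get(timeout=poll_timeout))
    except queue.Empty:
        return batch  # nothing arrived; the caller simply polls again
    # Then take whatever else is already available, up to the batch limit.
    while len(batch) < max_poll_messages:
        try:
            batch.append(source.get_nowait())
        except queue.Empty:
            break
    return batch


source = queue.Queue()
for i in range(2500):
    source.put(f"msg-{i}")

sizes = []
while batch := poll_batch(source):
    sizes.append(len(batch))  # each batch would be passed to the stream
print(sizes)  # [1000, 1000, 500]
```

A larger max_poll_messages means fewer, bigger batches under load. A shorter poll_timeout means the loop checks back more often when the topic is quiet.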
The bridge from operator also generates watermarks.
Bridging to an external topic
To send messages from your stream to an external topic, use the bridge to operator.
The bridge to operator automatically reconnects to the external server after a period of unavailability. While the server is unavailable, pending messages are stored in Tektite so the local stream can continue operating. When the server becomes available again, the pending messages are sent.
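The store-and-forward behaviour of bridge to can be pictured with the hypothetical PendingBuffer class below. This is a Python sketch, not Tektite's implementation. Two behaviours in it are assumptions. First, messages held longer than retention are discarded rather than sent late; this is our reading of the retention parameter described at the end of this section. Second, sending stops at the first failure to preserve order.

```python
from collections import deque
from typing import Callable


class PendingBuffer:
    # Hypothetical model of the local store used by bridge to. Times are in
    # seconds and passed in explicitly so the behaviour is easy to follow.

    def __init__(self, retention: float = 24 * 60 * 60):  # default 24h
        self.retention = retention
        self._pending = deque()  # (enqueued_at, message), oldest first

    def add(self, now: float, message: str) -> None:
        self._pending.append((now, message))

    def expire(self, now: float) -> None:
        # Discard messages kept locally for longer than the retention period.
        while self._pending and now - self._pending[0][0] > self.retention:
            self._pending.popleft()

    def drain(self, now: float, send: Callable[[str], bool]) -> int:
        # Send pending messages oldest first; stop at the first failure so
        # the remaining messages stay queued, in order, for the next attempt.
        self.expire(now)
        sent = 0
        while self._pending:
            if not send(self._pending[0][1]):
                break
            self._pending.popleft()
            sent += 1
        return sent

    def __len__(self) -> int:
        return len(self._pending)


buf = PendingBuffer(retention=10)
for t, msg in [(0, "a"), (5, "b"), (12, "c")]:
    buf.add(t, msg)                    # server unavailable: messages queue up
print(buf.drain(13, lambda m: False))  # 0: still unavailable, 'a' expired
delivered = []
print(buf.drain(14, lambda m: delivered.append(m) or True))  # 2
print(delivered)                       # ['b', 'c']
```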
Here's an example of bridging from a local stream to an external topic called external-topic:
out-stream := cust-updates -> (filter by area == "USA") ->
(bridge to external-topic props =
("bootstrap.servers" = "mykafka1.foo.com:9092, mykafka2.foo.com:9092"))
Here's an example of a local write-only Tektite topic which immediately forwards its messages on to an external topic:
sales-send-proxy := (topic partitions = 16) ->
(bridge to sales props =
("bootstrap.servers" = "mykafka1.foo.com:9092, mykafka2.foo.com:9092"))
Note that the partition number is preserved when sending to the external topic. For example, a message in partition 12 of the local stream will be sent to partition 12 of the external topic. You must therefore ensure that the number of partitions in your local stream matches the number of partitions in the external topic. If it doesn't, repartition your stream before sending.
For example, if the external topic has 50 partitions:
out-stream := cust-updates -> (filter by area == "USA") ->
(partition by customer_id partitions = 50) ->
(bridge to external-topic props =
("bootstrap.servers" = "mykafka1.foo.com:9092, mykafka2.foo.com:9092"))
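A short Python model shows why repartitioning matters when partition numbers are preserved. Both functions are hypothetical. zlib.crc32 is only a stand-in for whatever hash Tektite's partition by operator uses. Raising an error for a missing partition is the sketch's own choice, since the text above does not say what Tektite does in that case.

```python
import zlib


def repartition(key: str, partitions: int) -> int:
    # Hypothetical hash: zlib.crc32 stands in for whatever hash function
    # Tektite's partition by operator really uses.
    return zlib.crc32(key.encode("utf-8")) % partitions


def external_partition(local_partition: int, external_partitions: int) -> int:
    # bridge to keeps the partition number unchanged. This sketch rejects a
    # local partition that has no counterpart in the external topic.
    if not 0 <= local_partition < external_partitions:
        raise ValueError(
            f"partition {local_partition} does not exist in the external topic")
    return local_partition


# Without repartitioning: 16 local partitions feeding a 50-partition topic.
used = {external_partition(p, 50) for p in range(16)}
print(50 - len(used))  # 34 external partitions never receive messages

# After (partition by customer_id partitions = 50), every target exists.
customer_ids = [f"cust-{i}" for i in range(1000)]
targets = {external_partition(repartition(c, 50), 50) for c in customer_ids}
print(min(targets) >= 0 and max(targets) < 50)  # True
```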
The bridge to operator has the following optional parameters:
retention: Maximum time to keep locally stored messages. Defaults to 24h.
initial_retry_delay: How long to wait before the first reconnection attempt when the target server is unavailable. Defaults to 5s.
max_retry_delay: When reconnecting, the retry delay increases automatically up to this maximum. Defaults to 30s.
connect_timeout: How long to wait for a connection before considering it failed. Defaults to 5s.
send_timeout: How long to wait for a batch send to complete before considering it failed. Defaults to 2s.
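Together, initial_retry_delay and max_retry_delay describe a capped, increasing backoff. The Python sketch below uses their default values of 5s and 30s. The doubling growth factor is an assumption made for illustration, since the section does not say how quickly the delay grows.

```python
def retry_delays(initial_retry_delay: float = 5.0, max_retry_delay: float = 30.0,
                 attempts: int = 6, factor: float = 2.0) -> list[float]:
    # Delay before each successive reconnection attempt. Doubling (factor=2)
    # is an assumption; the text only says the delay increases up to a maximum.
    delays = []
    delay = initial_retry_delay
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * factor, max_retry_delay)
    return delays


print(retry_delays())  # [5.0, 10.0, 20.0, 30.0, 30.0, 30.0]
```

Capping the delay bounds how long a recovered server can go unnoticed. Growing it reduces the load of repeated failed connection attempts during a long outage.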