APACHE KAFKA STREAMS API WITH SAMPLE SCENARIOS

METEHAN KARA
4 min read · Jun 26, 2024


Apache Kafka Streams API Scenario

Apache Kafka has revolutionized the way we handle real-time data streams with its robust and scalable architecture. However, processing these streams efficiently requires more than a powerful message broker. Enter the Kafka Streams API, a client library that lets developers build sophisticated stream processing applications with ease. It enables real-time processing and analysis of data flowing through Kafka topics, which makes it a good fit for applications ranging from log analysis and anomaly detection to real-time analytics and user session management.

In this blog post, we will delve into what the Kafka Streams API is, how it works, and why it matters for real-time data processing. Here are some example scenarios for Kafka Streams API usage:

1. KStream

A KStream represents an unbounded, continuously updating stream of records where each record is a key-value pair. KStreams are the core abstraction for processing real-time data in Kafka Streams. You can perform stateless operations like filtering, mapping, and flatMapping on KStreams.

Scenario: Real-time log analysis and anomaly detection

On an e-commerce site, user activities (searches, product views, cart additions, purchases, etc.) are continuously recorded and sent to Kafka. This log data can be analyzed in real time to detect anomalies.

Implementation:

  • User activities are written to a Kafka topic.
  • KStream reads these log data.
  • Anomalies are detected (e.g., a high number of purchases in a very short time).
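  • Anomalies are written to another Kafka topic, and alerts are sent if necessary.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> userActivities = builder.stream("user-activities");

// Keep only the records flagged by isAnomaly, a helper the application must supply.
KStream<String, String> anomalies = userActivities
    .filter((key, value) -> isAnomaly(value));

// Publish the anomalies to a separate topic for alerting.
anomalies.to("anomalies");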
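The anomaly mentioned above ("a high number of purchases in a very short time") depends on several records, while filter sees one record at a time, so a per-record isAnomaly check cannot express it without some state. As a rough illustration of the counting logic only, here is a plain-Java sketch with no Kafka dependency. The class name PurchaseBurstDetector, the threshold, and the window size are all assumptions made for this example, and it assumes timestamps arrive in order. In a Streams application the same idea would more typically be written with groupByKey(), windowedBy(...), and count().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: flags a user once they exceed maxPurchases within windowMillis.
// Assumes purchases for a given user are recorded in timestamp order.
final class PurchaseBurstDetector {
    private final int maxPurchases;
    private final long windowMillis;
    // Per-user timestamps of recent purchases, oldest first.
    private final Map<String, Deque<Long>> recent = new HashMap<>();

    PurchaseBurstDetector(int maxPurchases, long windowMillis) {
        this.maxPurchases = maxPurchases;
        this.windowMillis = windowMillis;
    }

    // Records one purchase and returns true if the user is now over the limit.
    boolean recordPurchase(String userId, long timestampMillis) {
        Deque<Long> times = recent.computeIfAbsent(userId, id -> new ArrayDeque<>());
        times.addLast(timestampMillis);
        // Drop purchases that have fallen out of the sliding window.
        while (!times.isEmpty() && timestampMillis - times.peekFirst() >= windowMillis) {
            times.removeFirst();
        }
        return times.size() > maxPurchases;
    }
}
```

With new PurchaseBurstDetector(3, 60_000), the fourth purchase by the same user inside one minute is the first one to be flagged.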

2. KTable

A KTable represents a changelog stream, where each record is an update for a specific key. KTables provide a way to handle stateful processing, where the latest value for each key is maintained. They are essentially a materialized view of the latest state.

Scenario: Real-time product inventory management

In an online store, the stock status of each product is continuously updated. KTable holds the most current stock information for each product, providing real-time access.

Implementation:

  • Product stock information is written to a Kafka topic.
  • KTable reads these stock data.
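  • The most current stock status for each product is maintained and can be queried.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
// Explicit serdes, because the values are Integers rather than Strings.
KTable<String, Integer> productInventory = builder.table(
    "product-inventory", Consumed.with(Serdes.String(), Serdes.Integer()));

productInventory.toStream().foreach((productId, stock) ->
    System.out.println("Product ID: " + productId + " Stock: " + stock));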
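To make the "latest value per key" idea concrete, the following plain-Java sketch replays a small changelog into a map, which is roughly what a KTable's materialized view holds. It has no Kafka dependency; the class InventoryView and the sample product IDs are made up for illustration. A record with a null value is treated as a delete (a tombstone), which is how a KTable interprets such changelog records.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of KTable-style upsert semantics, without Kafka.
final class InventoryView {
    // Replays changelog records in order; the last value seen for a key wins.
    static Map<String, Integer> replay(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> view = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> update : changelog) {
            if (update.getValue() == null) {
                view.remove(update.getKey());                  // tombstone: delete the key
            } else {
                view.put(update.getKey(), update.getValue());  // upsert
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> changelog = List.of(
            new SimpleEntry<String, Integer>("p1", 10),
            new SimpleEntry<String, Integer>("p2", 5),
            new SimpleEntry<String, Integer>("p1", 7),      // overwrites p1
            new SimpleEntry<String, Integer>("p2", null));  // deletes p2
        System.out.println(replay(changelog));              // prints {p1=7}
    }
}
```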

3. Topology

A Topology is a directed acyclic graph (DAG) of stream processing nodes (processors) that represents the entire data processing logic of a Kafka Streams application. Source processors read from topics, stream processors apply transformations, and sink processors write results back to topics; the edges represent the data flow between them.

Scenario: Building a real-time data processing pipeline

A financial services company processes financial transactions from various data sources to generate customer reports. Topology defines this data processing pipeline.

Implementation:

  • Transactions from different sources are written to Kafka topics.
  • These transactions are processed using Topology (filtering, transforming, aggregating, etc.).
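  • Processed data are written to another Kafka topic for reporting.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> transactions = builder.stream("transactions");

// isValidTransaction and processTransaction are application-specific helpers.
KStream<String, String> processedTransactions = transactions
    .filter((key, value) -> isValidTransaction(value))
    .mapValues(value -> processTransaction(value));
processedTransactions.to("processed-transactions");

// Build the DAG; props must set at least application.id and bootstrap.servers.
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// Close the streams instance cleanly when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));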
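The snippet above passes a props object to KafkaStreams without showing how it is built. Here is a minimal sketch, assuming a local broker at localhost:9092 and an application id of transaction-pipeline (both placeholders). It uses plain string keys so that it compiles without the Kafka jars; these are the values behind StreamsConfig.APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG, DEFAULT_KEY_SERDE_CLASS_CONFIG, and DEFAULT_VALUE_SERDE_CLASS_CONFIG, and in real code the constants are preferable. The helper class StreamsProps is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper that assembles a minimal configuration for a Streams app.
final class StreamsProps {
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Identifies the application; also used as the consumer group id.
        props.put("application.id", applicationId);
        // Comma-separated list of brokers used for the initial connection.
        props.put("bootstrap.servers", bootstrapServers);
        // Default serdes for String keys and values, matching KStream<String, String>.
        String stringSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde";
        props.put("default.key.serde", stringSerde);
        props.put("default.value.serde", stringSerde);
        return props;
    }
}
```

A call such as StreamsProps.build("transaction-pipeline", "localhost:9092") would then supply the props argument.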

4. Processor

A Processor is a node in the topology that performs a specific processing operation on the input data. Kafka Streams provides the Processor API for defining custom processing logic. You can use the Processor API to implement complex transformations and stateful operations.

Scenario: Customized data processing

A news website wants to moderate comments on articles in real-time. A customized Processor can analyze comments and process them based on specific criteria (e.g., inappropriate language).

Implementation:

  • User comments are written to a Kafka topic.
  • Processor analyzes the comments and detects inappropriate ones.
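  • Inappropriate comments are forwarded to moderators.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

// Uses the typed Processor API (Kafka Streams 3.3+), where process() returns a KStream.
// isInappropriate is an application-specific helper.
class CommentProcessor extends ContextualProcessor<String, String, String, String> {
    @Override
    public void process(Record<String, String> record) {
        if (isInappropriate(record.value())) {
            // Forward only the comments that need a moderator's attention.
            context().forward(record);
        }
    }
}

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("comments")
    .process(CommentProcessor::new)
    .to("moderated-comments");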
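The isInappropriate helper is left undefined in the snippet above. One very simple way to fill it in, purely as an assumption for illustration, is a blocklist check on the words of the comment. The class name CommentFilter and the placeholder blocklist entries are hypothetical, and a real moderation system would need something far more robust.

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical blocklist-based check; not part of Kafka Streams.
final class CommentFilter {
    // Placeholder entries; a real list would come from configuration.
    private static final Set<String> BLOCKLIST = Set.of("spamword", "badword");

    static boolean isInappropriate(String comment) {
        if (comment == null) {
            return false;   // nothing to moderate
        }
        // Split on anything that is not a letter or digit, ignoring case.
        for (String word : comment.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (BLOCKLIST.contains(word)) {
                return true;
            }
        }
        return false;
    }
}
```

Because it matches whole words only, a comment containing "badwords" would pass this check, which is one of the limitations of such a simple rule.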

5. State Stores

State Stores are used to store and query data within a Kafka Streams application. They are essential for stateful processing operations, such as aggregations and joins. State Stores can be in-memory or persistent, and Kafka Streams makes them fault-tolerant by backing them up to changelog topics in Kafka.

Scenario: User session management

A web application wants to manage user sessions and store some state information for each session. State Store locally stores these session details and makes them queryable.

Implementation:

  • User session information is written to a Kafka topic.
  • State Store stores the session information.
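  • Session information can be queried when needed.

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.*;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> userSessions = builder.stream("user-sessions");

// Keep only the latest session record per key, materialized as "session-store".
KGroupedStream<String, String> groupedSessions = userSessions.groupByKey();
KTable<String, String> sessionStore = groupedSessions.reduce(
    (aggValue, newValue) -> newValue,
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("session-store")
);

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// The store is queryable only once the instance is RUNNING; querying earlier
// throws InvalidStateStoreException, so real code should wait or retry.
ReadOnlyKeyValueStore<String, String> keyValueStore =
    streams.store(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.keyValueStore()));
String sessionInfo = keyValueStore.get("user-id-123");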
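The reduce call above decides what ends up in session-store: the reducer receives the value currently stored for a key and the newly arrived value, and returns the value to keep. The plain-Java sketch below mimics that behavior with a map so the effect of (aggValue, newValue) -> newValue can be seen in isolation. ReduceSketch and the sample records are invented for this example, and the real store is managed by Kafka Streams rather than a HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for a key-value state store fed by reduce().
final class ReduceSketch {
    private final Map<String, String> store = new HashMap<>();
    private final BinaryOperator<String> reducer;

    ReduceSketch(BinaryOperator<String> reducer) {
        this.reducer = reducer;
    }

    // The first value for a key is stored as-is; later values go through the reducer.
    void accept(String key, String value) {
        store.merge(key, value, reducer);
    }

    // Returns the current aggregate for the key, or null if none has been seen.
    String get(String key) {
        return store.get(key);
    }
}
```

With the reducer from the post, accept("user-id-123", "login") followed by accept("user-id-123", "checkout") leaves "checkout" stored. A reducer such as (agg, v) -> agg + "," + v would instead keep the whole history for the key.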

Summary

Understanding these key concepts (KStream, KTable, Topology, Processor, and State Stores) provides a solid foundation for building robust and scalable stream processing applications with Kafka Streams. By leveraging these abstractions, developers can create complex data processing pipelines that operate in real time, ensuring timely insights and actions based on streaming data.

  • KStream: Real-time log analysis and anomaly detection.
  • KTable: Real-time product inventory management.
  • Topology: Building a real-time data processing pipeline.
  • Processor: Customized data processing (e.g., comment moderation).
  • State Stores: User session management and state storage.
