Spring Boot Kafka example
The goal of this tutorial is to create a working project with Spring Boot and Kafka, so that you can start developing without spending too much time on the Kafka configuration. This is useful if you need to build a proof of concept, or to learn and test in your own environment.
Our focus here is Spring Boot.
You can find the source code of this example on GitHub: https://github.com/marco76/demo-spring-kafka
The example has been updated to Spring Boot 3.1.4.
Kafka quick-start with Docker
To get started quickly with Kafka we use a Docker Compose file that starts the broker, the schema registry and a UI to monitor the Kafka cluster.
Our yaml is a ‘light’ version of the one provided by Kafka UI available here: https://github.com/provectus/kafka-ui/blob/master/documentation/compose/kafka-ui.yaml
In our yaml we have only one Cluster and we removed Kafka Connect.
version: '2'
services:
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8082:8080
    depends_on:
      - zookeeper0
      - kafka0
      - schemaregistry0
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
      KAFKA_CLUSTERS_0_JMXPORT: 9997
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
  zookeeper0:
    image: confluentinc/cp-zookeeper:5.2.4
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
  kafka0:
    image: confluentinc/cp-kafka:5.3.1
    depends_on:
      - zookeeper0
    ports:
      - 9092:9092
      - 9997:9997
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      JMX_PORT: 9997
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
  schemaregistry0:
    image: confluentinc/cp-schema-registry:5.5.0
    ports:
      - 8085:8085
    depends_on:
      - zookeeper0
      - kafka0
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
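The KAFKA_ADVERTISED_LISTENERS value in the yaml declares two addresses for the same broker: kafka0:29092 is used by the other containers (Kafka UI, the schema registry), while localhost:9092 is the one reachable from the host, where our Spring Boot application runs. The following plain-Java sketch only splits that value into its parts to make this visible; the class name AdvertisedListeners is hypothetical and is not part of Kafka or of the example project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class AdvertisedListeners {

    /** Splits "NAME://host:port,NAME2://host2:port2" into a map of listener name to address. */
    static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed listener: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Value copied from the kafka0 service in the compose file
        Map<String, String> listeners =
                parse("PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092");
        listeners.forEach((name, address) -> System.out.println(name + " -> " + address));
    }
}
```

A client running on the host should use the PLAINTEXT_HOST address, which is why the application later connects to localhost:9092.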
You can start the services with: docker-compose up
If you are running the Docker Desktop application, you can see an overview of the containers there.
Once everything has started, you can access the UI at http://localhost:8082/. In the original docker-compose file the UI uses port 8080, but we changed it because it conflicts with our Spring Boot instance.
The 3 existing Topics are internal and are not relevant for our tutorial.
Spring Boot
Our Spring Boot application simply:
- creates a Topic if it doesn't exist already
- creates a REST controller to receive messages via HTTP POST
- sends a test message to the Topic
- receives the messages from the Topic and writes them to the output stream
At startup the application sends an initial message, and a @Controller allows us to send further messages using POST.
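Before looking at the real code, the flow described above can be sketched in plain Java with an in-memory stand-in for the topic. This is only an illustration of the sequence of steps: InMemoryTopic is a hypothetical class, it delivers messages synchronously, and it has none of Kafka's persistence, partitioning or asynchronous delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class InMemoryTopicDemo {

    /** Hypothetical stand-in for a topic: hands each message to every registered listener. */
    static final class InMemoryTopic {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void subscribe(Consumer<String> listener) {
            listeners.add(listener);
        }

        void send(String message) {
            listeners.forEach(listener -> listener.accept(message));
        }
    }

    static List<String> run() {
        InMemoryTopic topic = new InMemoryTopic();   // the topic is created
        List<String> received = new ArrayList<>();
        topic.subscribe(message -> {                 // the listener writes what it receives
            received.add(message);
            System.out.println("message received:" + message);
        });
        topic.send("test message at bootstrap");     // the test message sent at startup
        topic.send("hello");                         // what the REST controller does on POST
        return received;
    }

    public static void main(String[] args) {
        run();
    }
}
```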
In our pom.xml we have spring-kafka as a dependency. More information is available in the official Spring for Apache Kafka documentation.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
Spring Boot Application
Our application is very simple. To keep the post short we put the configuration directly in the application class; this is not recommended for a real application:
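import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class DemoKafkaApplication {

    private final String TOPIC_NAME = "kafka-spring-demo";

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaApplication.class, args);
    }

    /**
     * With NewTopic we create a topic in kafka if it doesn't exist yet
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC_NAME)
                .partitions(5)
                .replicas(1)
                .build();
    }

    // Consumer: called for every message published to the topic
    @KafkaListener(id = "kafka-spring-listener", topics = TOPIC_NAME)
    public void listen(String message) {
        System.out.println("message received:" + message);
    }

    // Producer: sends one test message when the application starts
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send(TOPIC_NAME, "test message at bootstrap");
        };
    }
}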
With private final String TOPIC_NAME = "kafka-spring-demo"; we define the name of the topic used in Kafka. In a production project this should be in the external configuration.
The @Bean topic initializes a new topic in Kafka if it doesn't exist yet. We can define some properties, like partitions and replicas.
@KafkaListener is the consumer: it waits for messages added to the topic. When a message is received, it writes it to the output stream. In a real project this consumer would call the services that have to process the received data.
The @Bean ApplicationRunner sends a test message when the application starts.
As you can see, the KafkaTemplate is automatically wired by Spring without any explicit declaration.
Spring Boot controller
We can dynamically send messages to Kafka using REST.
In our @Controller we declare a KafkaTemplate and use it to send a message to the Kafka topic, specifying the topic name (‘kafka-spring-demo’) and the message (text).
@Autowired
private KafkaTemplate<Object, Object> template;

/**
 * We create a rest controller that receives a text and forwards it to kafka
 */
@PostMapping(path = "/send/message/{text}")
public void sendFoo(@PathVariable String text) {
    this.template.send("kafka-spring-demo", text);
}
(No) Configuration
In our application we didn’t do any special configuration. Spring Boot looks for a Kafka broker at the default address, localhost:9092, as the log shows:
INFO 11756 --- [-listener-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] Cluster ID: f0GOdnHLRquHKai9Gf4IfA
INFO 11756 --- [-listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
INFO 11756 --- [-listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] (Re-)joining group
INFO 11756 --- [-listener-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] Resetting offset for partition kafka-spring-demo-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
INFO 11756 --- [-listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : kafka-spring-listener: partitions assigned: [kafka-spring-demo-4, kafka-spring-demo-3, kafka-spring-demo-2, kafka-spring-demo-1, kafka-spring-demo-0]
You can override the default behavior in application.properties:
spring.kafka.bootstrap-servers=localhost:9093
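The override works as a simple fallback: if the property is present its value is used, otherwise the default localhost:9092 applies. The plain-Java sketch below mimics that rule with java.util.Properties. It is an illustration only; the class BootstrapServersResolver is hypothetical, and Spring Boot's real property binding is considerably more elaborate (profiles, environment variables, and so on).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class BootstrapServersResolver {

    static final String KEY = "spring.kafka.bootstrap-servers";
    static final String DEFAULT_SERVERS = "localhost:9092";

    /** Returns the configured broker address, or the default when the key is absent. */
    static String resolve(String propertiesContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(KEY, DEFAULT_SERVERS);
    }

    public static void main(String[] args) {
        // Empty application.properties: the default is used
        System.out.println(resolve(""));
        // With the override shown in the text
        System.out.println(resolve("spring.kafka.bootstrap-servers=localhost:9093"));
    }
}
```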
Running the application
After you start the application you can see that the new Topic has been created
and the test message
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> {
        template.send(TOPIC_NAME, "test message at bootstrap");
    };
}
has been received by Kafka
… published … and received by the Spring listener
@KafkaListener(id = "kafka-spring-listener", topics = TOPIC_NAME)
public void listen(String message) {
    System.out.println("message received:" + message);
}
The Spring Boot listener is visible in the UI in the Consumer category. As a subscriber, it is notified by Kafka when new messages arrive.
The listener receives the notification from Kafka and reads the message. In the terminal you should see the following result:
INFO 75599 --- [-listener-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] Resetting offset for partition kafka-spring-demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=absent}}.
INFO 75599 --- [-listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : kafka-spring-listener: partitions assigned: [kafka-spring-demo-4, kafka-spring-demo-3, kafka-spring-demo-2, kafka-spring-demo-1, kafka-spring-demo-0]
message received:"test message at bootstrap"
If you now send a POST request to the @Controller:
curl -X POST http://localhost:8080/send/message/hello
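The same request can also be sent from Java with the JDK's built-in HttpClient (Java 11 or later). This is a minimal sketch, assuming the application runs on http://localhost:8080 as in this tutorial; the class SendMessageClient and its helper methods are hypothetical names and are not part of the example project.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class SendMessageClient {

    /** Builds the URI of the /send/message/{text} endpoint, encoding the text as a path segment. */
    static URI sendUri(String baseUrl, String text) {
        // URLEncoder produces form encoding, where a space becomes '+'; a path segment needs %20
        String segment = URLEncoder.encode(text, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/send/message/" + segment);
    }

    /** Builds a POST request without a body, like curl -X POST. */
    static HttpRequest postRequest(URI uri) {
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = postRequest(sendUri("http://localhost:8080", "hello"));
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

Running main requires the Spring Boot application to be up; the two helper methods can be used without any server.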
Spring Boot receives the request and sends the message to the topic in Kafka.
@PostMapping(path = "/send/message/{text}")
public void sendFoo(@PathVariable String text) {
    this.template.send("kafka-spring-demo", text);
}
The terminal should show the initialization of the Spring DispatcherServlet, followed by the message:
INFO 75599 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
INFO 75599 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
INFO 75599 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 17 ms
message received:"hello"
In the Kafka UI we can see that the message has been received.