Kafka with Spring Boot: Project Setup

Starting a Spring Boot project with Kafka integration involves setting up Maven dependencies, configuring Kafka, and creating producers and consumers to establish a functional messaging system. This powerful combination allows for robust, scalable, and real-time event streaming applications.

Spring Boot Project Setup

To set up a Spring Boot project with Kafka support:

  1. Add dependencies to your pom.xml (spring-boot-starter-web provides the embedded web server needed by the REST controller created later, and it pulls in spring-boot-starter transitively):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

  2. Create a KafkaConfig class for configurations (imports shown for completeness):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  3. Create a KafkaProducerService:

@Service
public class KafkaProducerService {

    private static final String TOPIC = "sample_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}
  4. Create a REST controller:

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    @Autowired
    private KafkaProducerService producerService;

    @GetMapping("/publish")
    public String publishMessage(@RequestParam("message") String message) {
        producerService.sendMessage(message);
        return "Message Published!";
    }
}
  5. Start ZooKeeper and Kafka (run these from your Kafka installation directory on Windows):

Starting ZooKeeper:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Starting Kafka:

.\bin\windows\kafka-server-start.bat .\config\server.properties
  6. Run your Spring Boot application, ensuring your application.properties file has the correct Kafka settings.
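
The steps above assume the topic sample_topic exists. Many broker setups auto-create topics on first use, but if yours does not, create the topic explicitly before publishing; a sketch using the Kafka CLI (the partition and replication-factor values are illustrative):

.\bin\windows\kafka-topics.bat --create --topic sample_topic --bootstrap-server 127.0.0.1:9092 --partitions 3 --replication-factor 1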

Creating Kafka Configuration

To configure Kafka more thoroughly:

  1. Configure the ConsumerFactory:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Start from the earliest record when no committed offset exists for the group
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Commit offsets automatically in the background
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    return new DefaultKafkaConsumerFactory<>(config);
}

  2. Set up the Kafka listener container factory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Run three consumer threads per listener so partitions can be processed in parallel
    factory.setConcurrency(3);
    return factory;
}

  3. Configure the ProducerFactory:

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Wait for acknowledgment from all in-sync replicas before a send counts as complete
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    // Retry transient send failures up to 10 times
    config.put(ProducerConfig.RETRIES_CONFIG, 10);
    // Wait up to 1 ms to batch records together before sending
    config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    return new DefaultKafkaProducerFactory<>(config);
}

  4. Create the KafkaTemplate:

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

These configurations enhance the stability and performance of your Kafka-based messaging system. By fine-tuning parameters such as retries, acknowledgments, and concurrency, you can optimize your application for different use cases and requirements.
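
If you would rather keep this tuning out of Java code, Spring Boot exposes equivalent settings through its auto-configuration; a minimal sketch (the values mirror the beans above; note that these properties only affect the auto-configured factories, so they are bypassed once you declare your own ConsumerFactory/ProducerFactory beans):

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.acks=all
spring.kafka.producer.retries=10
spring.kafka.producer.properties.linger.ms=1
spring.kafka.listener.concurrency=3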

Building Kafka Producer

To build a robust Kafka Producer for sending messages:

  1. Set up the KafkaProducerService:

@Service
public class KafkaProducerService {

    private static final String TOPIC = "sample_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        // send() is asynchronous; this log line marks the send request, not broker acknowledgment
        kafkaTemplate.send(TOPIC, message);
        System.out.println("Message sent to topic: " + TOPIC);
    }
}
  2. Create a RESTful controller:

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    @Autowired
    private KafkaProducerService producerService;

    @GetMapping("/publish")
    public ResponseEntity<String> publishMessage(@RequestParam("message") String message) {
        try {
            producerService.sendMessage(message);
            return ResponseEntity.ok("Message Published Successfully");
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Failed to Publish Message");
        }
    }
}
  3. Configure application.properties:

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

  4. Start your Spring Boot application.
  5. Test the producer using curl:


curl -X GET "http://localhost:8080/api/kafka/publish?message=Hello%20Kafka%20from%20Spring%20Boot"

This setup provides a functional Kafka Producer in a Spring Boot application with a RESTful endpoint for message publishing. The producer is designed to handle exceptions gracefully, returning appropriate HTTP status codes based on the success or failure of message publishing.
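
Because kafkaTemplate.send() is asynchronous, a successful HTTP response only means the send was queued. If you want to log the actual broker acknowledgment, you can attach a callback to the returned future; a sketch assuming spring-kafka 3.x, where send() returns a CompletableFuture<SendResult<K, V>> (2.x returns a ListenableFuture, so the callback API differs):

public void sendMessageWithCallback(String message) {
    kafkaTemplate.send(TOPIC, message).whenComplete((result, ex) -> {
        if (ex == null) {
            // RecordMetadata carries the partition and offset assigned by the broker
            System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset());
        } else {
            System.err.println("Send failed: " + ex.getMessage());
        }
    });
}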

“Apache Kafka has become the de facto standard for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.” – Apache Kafka documentation

Building Kafka Consumer

To create a Kafka Consumer that listens to and processes messages from Kafka topics, we’ll set up a consumer class using the @KafkaListener annotation. This allows our Spring Boot application to handle incoming Kafka messages automatically.

Define a new service class, KafkaConsumerService:

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "sample_topic", groupId = "group_id")
    public void listen(String message) {
        System.out.println("Received Message: " + message);
        // Implement your message processing logic here
    }
}

The listen method is annotated with @KafkaListener, which specifies the topic to listen to and the groupId used to share load among multiple application instances.
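
If your processing logic needs message metadata as well as the payload, @KafkaListener methods can also receive the full ConsumerRecord; a small sketch under the same topic and group assumptions:

@KafkaListener(topics = "sample_topic", groupId = "group_id")
public void listenWithMetadata(ConsumerRecord<String, String> record) {
    // ConsumerRecord exposes the key, partition, and offset alongside the value
    System.out.printf("key=%s partition=%d offset=%d value=%s%n",
            record.key(), record.partition(), record.offset(), record.value());
}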

Ensure your consumer configuration is set up in the KafkaConfig class:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        return factory;
    }
}

This setup ensures the Kafka consumer is integrated and ready to handle incoming messages.

Running and Testing

To test your Kafka setup, follow these steps:

  1. Start ZooKeeper:

    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  2. Start Kafka:

    .\bin\windows\kafka-server-start.bat .\config\server.properties
  3. Run your Spring Boot application:

    mvn spring-boot:run
  4. Test the producer by sending a message:

    curl -X GET "http://localhost:8080/api/kafka/publish?message=TestMessageForKafka"
  5. Check your Spring Boot application logs for the consumer output:

    Received Message: TestMessageForKafka

This confirms that the consumer has successfully received and processed the message sent by the producer.

To further test the system, try sending multiple messages in succession and verify that the consumer handles them correctly. You can also experiment with different topics to ensure isolated message processing.
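
For example, a quick shell loop against the endpoint above (hypothetical local setup) sends five numbered messages in a row:

for i in 1 2 3 4 5; do
  curl -s "http://localhost:8080/api/kafka/publish?message=TestMessage$i"
  echo
done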

“Kafka’s distributed nature makes it an excellent choice for building scalable and fault-tolerant messaging systems.”

This setup provides a foundation for integrating Kafka with your Spring Boot application, enabling efficient message handling and scalability. With Kafka’s robust architecture, you can handle high-throughput data streams and build real-time data pipelines.