Hot Posts

hot/hot-posts

Kafka Producer and Consumer



1. Project Setup

  • Create a Spring Boot Project: Use Spring Initializr to generate the project with these dependencies:
    • spring-boot-starter-web: For basic web support (for HTTP endpoints).
    • spring-kafka: For Kafka integration.
    • spring-boot-starter-logging: Includes slf4j-api (for logging).
    • spring-boot-starter-validation: For validating incoming request parameters.

2. Kafka Configuration

Kafka Config Class:

The Kafka configuration can be improved by adding more advanced properties, handling retry logic, and ensuring message serialization is done properly.

java

package com.example.kafka.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.config.ConcurrentMessageListenerContainerFactory; import org.springframework.kafka.listener.config.MessageListenerContainerFactory; import org.springframework.kafka.transaction.KafkaTransactionManager; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String consumerGroup; @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // Ensures message durability return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Ensure processing from the start return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentMessageListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentMessageListenerContainerFactory<String, String> factory = new ConcurrentMessageListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); // Process messages in parallel return factory; } }

3. Producer Service

To enhance the producer service:

  • Add error handling and logging.
  • Use async message sending for better performance and scalability.
java

package com.example.kafka.service; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Service public class ProducerService { private static final Logger logger = LoggerFactory.getLogger(ProducerService.class); private final KafkaTemplate<String, String> kafkaTemplate; public ProducerService(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String topic, String message) { try { kafkaTemplate.send(topic, message) .addCallback( result -> logger.info("Message sent successfully to topic {}: {}", topic, message), ex -> logger.error("Error sending message to topic {}: {}", topic, message, ex) ); } catch (Exception e) { logger.error("Error sending message: {}", message, e); } } }

4. Consumer Service

The consumer service can be enhanced by adding better error handling, logging, and improving message processing.

  • Use @KafkaListener annotation.
  • Enhance error handling and logging.
  • Consider retrying logic if the message processing fails.
java

package com.example.kafka.service; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Service public class ConsumerService { private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class); @KafkaListener(topics = "myTopic", groupId = "myGroup") public void consume(String message) { try { logger.info("Received message: {}", message); // Simulate message processing processMessage(message); } catch (Exception e) { logger.error("Error processing message: {}", message, e); } } private void processMessage(String message) { // Implement your message processing logic here logger.info("Processed message: {}", message); } }

5. Controller to Trigger Message Publishing

You may want to expose a REST endpoint to allow message publishing.

java
package com.example.kafka.controller; import com.example.kafka.service.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class MyController { private final ProducerService producerService; @Autowired public MyController(ProducerService producerService) { this.producerService = producerService; } @GetMapping("/publish") public String publish(@RequestParam("message") String message) { producerService.sendMessage("myTopic", message); return "Message published to topic 'myTopic'"; } }

6. Application Properties

In your application.properties, configure Kafka properties:

properties

# Kafka Bootstrap Servers spring.kafka.bootstrap-servers=localhost:9092 # Consumer Group ID spring.kafka.consumer.group-id=myGroup # Kafka Consumer Configuration spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=false # Kafka Producer Configuration spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # Kafka Listener Configuration spring.kafka.listener.concurrency=3

7. Error Handling and Retries

Producer:

To handle retries and backoff for sending messages, you can use Spring Kafka's built-in support for retry mechanisms:

java
@Bean public KafkaTemplate<String, String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); kafkaTemplate.setProducerListener(new ProducerListener<String, String>() { @Override public void onError(ProducerRecord record, Exception exception) { logger.error("Error sending record {}: {}", record, exception.getMessage()); } // Additional methods can be overridden for other producer events }); return kafkaTemplate; }

Consumer:

To implement retry logic in consumers, use @Retryable annotation for automatic retries or customize retry behavior in the Kafka listener.

java
import org.springframework.retry.annotation.Retryable;
import org.springframework.retry.annotation.Backoff; @Service public class ConsumerService { @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 5000)) @KafkaListener(topics = "myTopic", groupId = "myGroup") public void consume(String message) { logger.info("Received message: {}", message); processMessage(message); } private void processMessage(String message) { // Your message processing logic } }

8. Scaling and Partitioning

If you expect high throughput, you can scale consumers by configuring Kafka partitions:

  • Partitioning: By default, Kafka creates one partition per topic. You can increase the number of partitions to allow parallel consumption of messages. This helps balance the load among multiple consumer instances.
  • Concurrency: Configure the @KafkaListener container concurrency level to enable parallel message processing.
properties

# Set the number of partitions for topic (on Kafka broker side) spring.kafka.topic.partitions=5

9. Run the Application

  • Start the Spring Boot application using mvn spring-boot:run.
  • Send a message using the /publish endpoint:
    • GET /publish?message=HelloKafka
  • The consumer will receive and process the message.

Conclusion:

This setup enhances your Kafka producer-consumer system in Spring Boot by:

  1. Adding advanced Kafka configuration for better performance and durability.
  2. Improving error handling and logging.
  3. Providing parallel processing with increased concurrency and retry mechanisms.
  4. Allowing for flexible scaling and message processing.

With this approach, you now have a production-grade Kafka producer-consumer system built with Spring Boot.

Post a Comment

0 Comments