Xem lại phần 1: Tìm hiểu cơ chế hoạt động của Apache Kafka (phần 1)

Xem lại phần 2: Tìm hiểu cơ chế hoạt động của Apache Kafka (phần 2)

Bước 1: cài đặt kafka server

Điều kiện tiên quyết là phải cài đặt được Kafka để lấy được thông tin kết nối, về phần cài đặt thì các bạn có thể xem qua hai phần trước, cài đặt thông qua Docker, cài đặt cục bộ trên Windows, hoặc cài trên một server nhân linux như Centos 7.

Bước 2: dựng project và cấu hình

tạo một project spring boot từ spring starter, có cấu trúc source code như sau

Thực hiện việc cấu hình producer và consumer ở file KafkaConfiguration.

package com.cafeincode.kafka.config;

import lombok.NoArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
@NoArgsConstructor
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.key-deserializer}")
    private String keyDeSerializer;

    @Value("${spring.kafka.value-deserializer}")
    private String valueDeSerializer;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeSerializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeSerializer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
package com.cafeincode.kafka.constant;

public class Constant {

    public interface KAFKA {
        public static final String TOPIC_SEND_MESSAGE = "topic_send_sms";
        public static final String GROUP_ID = "group_send_sms_id";
    }
    
}

class ConsumerMessage thực hiện việc lắng nghe tin nhắn từ topic, ở đây mình xử lý ghi log khi có tin nhắn được gửi đến topic.

Các bạn hoàn toàn có thể áp dụng để làm các thao tác khác như là đẩy vào queue rồi xử lý logic phù hợp tùy vào dự án của bạn.

package com.cafeincode.kafka.service;

import com.cafeincode.kafka.constant.Constant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ConsumerMessage {

@KafkaListener(topics = Constant.KAFKA.TOPIC_SEND_MESSAGE, groupId = Constant.KAFKA.GROUP_ID)
public void consume(String jsonData) {
log.info(String.format("#### -> Consumed message-> %s", jsonData));
}
}

class ProducerMessage thực hiện việc gửi tin nhắn vào topic.

package com.cafeincode.kafka.service;

import com.cafeincode.kafka.constant.Constant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ProducerMessage {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String smsJsonData) {
log.info(String.format("#### -> Producing message -> %s", smsJsonData));
this.kafkaTemplate.send(Constant.KAFKA.TOPIC_SEND_MESSAGE, smsJsonData);
}
}

cấu hình trong file application.properties, các bạn cài Kafka cục bộ trên máy tính thì trỏ đường dẫn như mình, cài trên server thì điền đường dẫn Kafka server.

spring.kafka.bootstrap-servers: localhost:9092
spring.kafka.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server.port=8888

class TestController thì mình viết một cái rest api để thực hiện một tác vụ đơn giản, đó là gửi tin nhắn đến topic và theo dõi xem tin nhắn từ topic đã được xử lý hay chưa.

package com.cafeincode.kafka.controller;

import com.cafeincode.kafka.service.ProducerMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
public class TestRestController {

@Autowired
private ProducerMessage producerMessage;

@GetMapping(value = "/send-message")
public String sendSMSToKafka() {
System.out.println("----------------START SEND-----------");
String mes = "Sending message to kafka topic by cafeincode:" + new Random().nextInt(100);
producerMessage.sendMessage(mes);
System.out.println("----------------END SEND-------------");
return "OK";
}
}

Bước 3: Testing

khởi chạy ứng dụng và gõ vào trình duyệt http://localhost:8888/send-message

Thành công, vậy là mình đã hướng dẫn xong cách dùng Kafka để làm một hệ thống Pub-Sub với spring boot, cám ơn đã theo dõi.

Một số bài viết liên quan:

Code xử lý chi tiết tại đây: cafeincodeio/kafka-example