Xem lại phần 1: Tìm hiểu cơ chế hoạt động của Apache Kafka (phần 1)
Xem lại phần 2: Tìm hiểu cơ chế hoạt động của Apache Kafka (phần 2)
Bước 1:
cài đặt kafka server
Điều kiện tiên quyết là phải cài đặt được Kafka để lấy được thông tin kết nối, về phần cài đặt thì các bạn có thể xem qua hai phần trước, cài đặt thông qua Docker, cài đặt cục bộ trên Windows, hoặc cài trên một server nhân linux như Centos 7.
Bước 2:
dựng project và cấu hình
tạo một project spring boot từ spring starter, có cấu trúc source code như sau
Thực hiện việc cấu hình producer và consumer ở file KafkaConfiguration.
package com.cafeincode.kafka.config;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
@NoArgsConstructor
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.value-serializer}")
private String valueSerializer;
@Value("${spring.kafka.key-deserializer}")
private String keyDeSerializer;
@Value("${spring.kafka.value-deserializer}")
private String valueDeSerializer;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeSerializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeSerializer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
package com.cafeincode.kafka.constant;
public class Constant {
public interface KAFKA {
public static final String TOPIC_SEND_MESSAGE = "topic_send_sms";
public static final String GROUP_ID = "group_send_sms_id";
}
}
class ConsumerMessage thực hiện việc lắng nghe tin nhắn từ topic, ở đây mình xử lý ghi log khi có tin nhắn được gửi đến topic.
Các bạn hoàn toàn có thể áp dụng để làm các thao tác khác như là đẩy vào queue rồi xử lý logic phù hợp tùy vào dự án của bạn.
package com.cafeincode.kafka.service;
import com.cafeincode.kafka.constant.Constant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class ConsumerMessage {
@KafkaListener(topics = Constant.KAFKA.TOPIC_SEND_MESSAGE, groupId = Constant.KAFKA.GROUP_ID)
public void consume(String jsonData) {
log.info(String.format("#### -> Consumed message-> %s", jsonData));
}
}
class ProducerMessage thực hiện việc gửi tin nhắn vào topic.
package com.cafeincode.kafka.service;
import com.cafeincode.kafka.constant.Constant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class ProducerMessage {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String smsJsonData) {
log.info(String.format("#### -> Producing message -> %s", smsJsonData));
this.kafkaTemplate.send(Constant.KAFKA.TOPIC_SEND_MESSAGE, smsJsonData);
}
}
cấu hình trong file application.properties
, các bạn cài Kafka cục bộ trên máy tính thì trỏ đường dẫn như mình, cài trên server thì điền đường dẫn Kafka server.
spring.kafka.bootstrap-servers: localhost:9092
spring.kafka.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server.port=8888
class TestController thì mình viết một cái rest api để thực hiện một tác vụ đơn giản, đó là gửi tin nhắn đến topic và theo dõi xem tin nhắn từ topic đã được xử lý hay chưa.
package com.cafeincode.kafka.controller;
import com.cafeincode.kafka.service.ProducerMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
@RestController
public class TestRestController {
@Autowired
private ProducerMessage producerMessage;
@GetMapping(value = "/send-message")
public String sendSMSToKafka() {
System.out.println("----------------START SEND-----------");
String mes = "Sending message to kafka topic by cafeincode:" + new Random().nextInt(100);
producerMessage.sendMessage(mes);
System.out.println("----------------END SEND-------------");
return "OK";
}
}
Bước 3:
Testing
khởi chạy ứng dụng và gõ vào trình duyệt http://localhost:8888/send-message
Thành công, vậy là mình đã hướng dẫn xong cách dùng Kafka để làm một hệ thống Pub-Sub với spring boot, cám ơn đã theo dõi.
Một số bài viết liên quan:
- Tìm hiểu cơ chế hoạt động của Apache Kafka [Phần 1]
- Tìm hiểu cơ chế hoạt động của Apache Kafka [Phần 2]
- Câu chuyện phỏng vấn online mùa Covid
- Nói sao để được chào đón, làm thế nào để được ghi nhận
- 13 Plugin không thể thiếu khi làm việc với IntellIJ IDEA
- Những plugins Intellij IDEA tốt nhất trong công việc
- Crack Intellij IDEA new versions 2021
- Crack IntellIJ để code như một senior
- Shortcut Intellij hữu ích để làm việc được hiệu quả hơn
- Build hệ thống Pub Sub dùng Hazelcast và Spring boot
- Build hệ thống Pub-Sub bằng Kafka+Spring boot (phần 3)
- Biết sử dụng git cherry-pick để làm việc hiệu quả hơn
- Git stash giúp bạn trở nên chuyên nghiệp như thế nào?
- Git revert với git reset hoạt động như thế nào
- Series tìm hiểu lập trình java
- Active Jrebel để code trong IntellIJ IDEA
- Elasticsearch là gì mà bá đạo đến vậy? [Phần 1]
- Cùng nhau tìm hiểu Docker
- Cài đặt Hazelcast trên server Centos 7
- Elasticsearch và Kibana dựng bằng Docker
- Lập trình viên lúc rảnh rỗi thì nên làm gì?
Code xử lý chi tiết tại đây: cafeincodeio/kafka-example