如何在Spring Boot應用程式中使用Apache Kafka?
第1步:生成我們的專案:ofollow,noindex" target="_blank">Spring Initializr 來生成我們的專案。我們的專案將提供Spring MVC / Web支援和Apache Kafka支援。
第2步:釋出/讀取Kafka主題中的訊息:
<b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age; <b>public</b> User(String name, <b>int</b> age) { <b>this</b>.name = name; <b>this</b>.age = age; } }
第3步:通過application.yml配置檔案配置Kafka:
我們需要建立配置檔案。我們需要以某種方式配置我們的Kafka生產者和消費者,以便能夠釋出和讀取與主題相關的訊息。相比建立一個使用@Configuration標註的Java類,我們可以直接使用配置檔案application.properties或application.yml。Spring Boot讓我們避免像過去一樣編寫的所有樣板程式碼,同時為我們提供了更加智慧的配置應用程式的方法,如下所示:
server: port: 9000 spring: kafka: consumer: bootstrap: localhost:9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
第4步:建立一個生產者,建立生產者會將我們的訊息寫入該主題。
<b>public</b> <b>class</b> Producer { <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>); <b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>; @Autowired <b>private</b> KafkaTemplate<String, String> kafkaTemplate; <b>public</b> <b>void</b> sendMessage(String message) { logger.info(String.format(</font><font>"#### -> Producing message -> %s"</font><font>, message)); <b>this</b>.kafkaTemplate.send(TOPIC, message); } } </font>
自動連線autowire到KafkaTemplate ,使用它將訊息釋出到主題 - 這就是訊息的生產者!
第5步:建立一個消費者,消費者是負責根據您自己的業務邏輯的需求閱讀處理訊息的訊息的服務。要進行設定,請輸入以下內容:
@Service <b>public</b> <b>class</b> Consumer { <b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>); @KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>) <b>public</b> <b>void</b> consume(String message) throws IOException { logger.info(String.format(</font><font>"#### -> Consumed message -> %s"</font><font>, message)); } } </font>
在這裡,我們告訴我們的方法void consume(String message)訂閱使用者的主題,並將每條訊息傳送到應用程式日誌。在您的實際應用程式中,您可以按照業務需要的方式處理訊息。
第6步:建立REST控制器,們已經擁有了能夠消費Kafka訊息所需的全部內容。
為了充分展示我們建立的所有內容的工作原理,我們需要建立一個具有單一端點的控制器。訊息將釋出到此端點,然後由我們的生產者處理。然後,我們的消費者將通過登入到控制檯來捕獲並處理它。
@RestController @RequestMapping(value = <font>"/kafka"</font><font>) <b>public</b> <b>class</b> KafkaController { <b>private</b> <b>final</b> Producer producer; @Autowired KafkaController(Producer producer) { <b>this</b>.producer = producer; } @PostMapping(value = </font><font>"/publish"</font><font>) <b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message) { <b>this</b>.producer.sendMessage(message); } } </font>
讓我們使用cURL將訊息傳送給Kafka:
curl -X POST -F 'message=test' http://localhost:9000/kafka/publish
基本上就是這樣!在不到10個步驟中,您瞭解了將Apache Kafka新增到Spring Boot專案是多麼容易。如果您遵循本指南,您現在知道如何將Kafka整合到Spring Boot專案中,並且您已準備好使用這個超級工具!