Springboot2.x集成kafka2.2.0的示例代碼_第1頁
Springboot2.x集成kafka2.2.0的示例代碼_第2頁
Springboot2.x集成kafka2.2.0的示例代碼_第3頁
Springboot2.x集成kafka2.2.0的示例代碼_第4頁
Springboot2.x集成kafka2.2.0的示例代碼_第5頁
已閱讀5頁,還剩2頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

第Springboot2.x集成kafka2.2.0的示例代碼目錄引言基本環(huán)境代碼編寫1、基本引用pom2、基本配置3、實體類4、生產(chǎn)者端5、消費者6、測試效果展示遇到的問題

引言

kafka近幾年更新非常快,也可以看出kafka在企業(yè)中是用的頻率越來越高,在springboot中集成kafka還是比較簡單的,但是應(yīng)該注意使用的版本和kafka中基本配置,這個地方需要信心,防止進入坑中。

版本對應(yīng)地址:https://spring.io/projects/spring-kafka

基本環(huán)境

springboot版本2.1.4

kafka版本2.2.0

jdk1.8

代碼編寫

1、基本引用pom

xmlversion="1.0"encoding="UTF-8"

projectxmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"

xsi:schemaLocation="/POM/4.0.0/xsd/maven-4.0.0.xsd"

modelVersion4.0.0/modelVersion

parent

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-parent/artifactId

version2.1.4.RELEASE/version

relativePath/!--lookupparentfromrepository--

/parent

groupIdcom.example/groupId

artifactIddemo/artifactId

version0.0.1-SNAPSHOT/version

namekafkademo/name

descriptionDemoprojectforSpringBoot/description

properties

java.version1.8/java.version

/properties

dependencies

dependency

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-web/artifactId

/dependency

dependency

groupIdmysql/groupId

artifactIdmysql-connector-java/artifactId

scoperuntime/scope

/dependency

dependency

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-test/artifactId

scopetest/scope

/dependency

dependency

groupIdorg.springframework.kafka/groupId

artifactIdspring-kafka/artifactId

version2.2.0.RELEASE/version

/dependency

dependency

groupIdcom.google.code.gson/groupId

artifactIdgson/artifactId

version2.7/version

/dependency

/dependencies

build

plugins

plugin

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-maven-plugin/artifactId

/plugin

/plugins

/build

/project

2、基本配置

spring.kafka.bootstrap-servers=:9092

spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.key-deserializer=mon.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=mon.serialization.StringDeserializer

ducer.key-serializer=mon.serialization.StringSerializer

ducer.value-serializer=mon.serialization.StringSerializer

#logging.level.root=debug

3、實體類

packagecom.example.demo.model;

importjava.util.Date;

publicclassMessages{

privateLongid;

privateStringmsg;

privateDatesendTime;

publicLonggetId(){

returnid;

publicvoidsetId(Longid){

this.id=id;

publicStringgetMsg(){

returnmsg;

publicvoidsetMsg(Stringmsg){

this.msg=msg;

publicDategetSendTime(){

returnsendTime;

publicvoidsetSendTime(DatesendTime){

this.sendTime=sendTime;

}

4、生產(chǎn)者端

packagecom.example.demo.service;

importcom.example.demo.model.Messages;

importcom.google.gson.Gson;

importcom.google.gson.GsonBuilder;

importorg.springframework.beans.factory.annotation.Autowired;

importorg.springframework.kafka.core.KafkaTemplate;

importorg.springframework.kafka.support.SendResult;

importorg.springframework.stereotype.Service;

importorg.springframework.util.concurrent.ListenableFuture;

importjava.util.Date;

importjava.util.UUID;

@Service

publicclassKafkaSender{

@Autowired

privateKafkaTemplateString,StringkafkaTemplate;

privateGsongson=newGsonBuilder().create();

publicvoidsend(){

Messagesmessage=newMessages();

message.setId(System.currentTimeMillis());

message.setMsg("123");

message.setSendTime(newDate());

ListenableFutureSendResultString,Stringtest0=kafkaTemplate.send("newtopic",gson.toJson(message));

}

5、消費者

packagecom.example.demo.service;

importorg.apache.kafka.clients.consumer.ConsumerRecord;

importorg.springframework.kafka.annotation.KafkaListener;

importorg.springframework.stereotype.Service;

importjava.util.Optional;

@Service

publicclassKafkaReceiver{

@KafkaListener(topics={"newtopic"})

publicvoidlisten(ConsumerRecord,record){

OptionalkafkaMessage=Optional.ofNullable(record.value());

if(kafkaMessage.isPresent()){

Objectmessage=kafkaMessage.get();

System.out.println("record="+record);

System.out.println("message="+message);

}

6、測試

在啟動方法中模擬消息生產(chǎn)者,向kafka中發(fā)送消息

packagecom.example.demo;

importcom.example.demo.service.KafkaSender;

importorg.springframework.boot.SpringApplication;

importorg.springframework.boot.autoconfigure.SpringBootApplication;

importorg.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication

publicclassKafkademoApplication{

publicstaticvoidmain(String[]args){

ConfigurableApplicationContextcontext=SpringApplica

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論