springboot如何整合kafka
需要安装kafka,安装请见:springboot基础(50):linux安装kafka
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
dependency>
@RestController
@RequestMapping("/msg")
public class MessageController {
@Autowired
private MessageService messageService;
@GetMapping("{id}")
public String sendMessage(@PathVariable String id){
messageService.sendMessage(id);
return "success";
}
}
public interface MessageService {
void sendMessage(String id);
String doMessage();
}
@Service
public class MessageKafkatmqServiceImpl implements MessageService {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送消息加入到队列(kafka )id:"+id);
kafkaTemplate.send("hello",id);//hello是前面已经创建的topic
}
@Override
public String doMessage() {
return null;
}
}
/**
* 监听器
*/
@Component
public class KafkaMessageListener {
@KafkaListener(topics = "hello")
public void onMessage222(ConsumerRecord<String,String> consumerRecord) {
System.out.println("完成短信的发送,id :"+consumerRecord.value());
}
}
spring:
kafka:
bootstrap-servers: 106.13.2.249:9092
consumer:
group-id: abc #设置消费者的groupid




2022-07-20 15:48:28.701 INFO 15924 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-abc-1, groupId=abc] Cluster ID: LFbHxG8qSSu7PyPKXoDD4g
2022-07-20 15:48:28.703 INFO 15924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-abc-1, groupId=abc] Discovered group coordinator ls-bptysztw:9092 (id: 2147483647 rack: null)
2022-07-20 15:48:30.990 WARN 15924 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-abc-1, groupId=abc] Error connecting to node ls-bptysztw:9092 (id: 2147483647 rack: null)
java.net.UnknownHostException: ls-bptysztw
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[na:1.8.0_144]
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) ~[na:1.8.0_144]
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) ~[na:1.8.0_144]
at java.net.InetAddress.getAllByName0(InetAddress.java:1276) ~[na:1.8.0_144]
at java.net.InetAddress.getAllByName(InetAddress.java:1192) ~[na:1.8.0_144]
at java.net.InetAddress.getAllByName(InetAddress.java:1126) ~[na:1.8.0_144]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:988) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:575) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:854) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:830) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:246) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:459) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:487) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) [kafka-clients-3.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) [kafka-clients-3.1.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522) [spring-kafka-2.8.7.jar:2.8.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512) [spring-kafka-2.8.7.jar:2.8.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340) [spring-kafka-2.8.7.jar:2.8.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) [spring-kafka-2.8.7.jar:2.8.7]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
解决办法
配置C:\Windows\System32\drivers\etc\hosts文件
123.123.123.123 ls-bptysztw