• SpringCloudStreamkafka接收jsonarray字符串失败


    场景现象

    • kafka作为消息队列,作为前端设备数据到后端消费的渠道,也被多个不同微服务消费
    • 一个服务与前端边缘计算设备建立socket消息,接收实时交通事件推送,再将事件发送到kafka里面。此处使用的是Spring Kafka,普通的将事件列表数据转化为字符串后发送
    • 事件信息,需要入库和实时推送,也需要被第三方服务调用,第三方也采取消费kafka的方式
    • 这个服务,作为kafka的消费者,读取事件数据,发送短信给指定用户,但此处使用的是Spring.cloud.stream,函数式编程,绑定kafka的topic
    • 此服务消费时,加了一些打印。但是事件推送到kafka后,看不到任何日志打印,也没有报错。但是使用kafka命令查看,却已经被该消费者消费掉了
    • 消费方,配置文件如下:
    spring:
      cloud:
        stream:
          function:
            definition: vipRouteGpsConsumer;offlineEventConsumer;boxEventTopicConsumer
          bindings:
            vipRouteGpsConsumer-in-0:
              destination: vipRouteGps
            offlineEventConsumer-in-0:
              destination: offlineEvent
            boxEventTopicConsumer-in-0:
              destination: boxTopic
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 消费的源代码如下:
    @Configuration
    public class KafkaConfiguration {
        @Bean
        public Consumer<String> boxEventTopicConsumer() {
            return value -> {
                log.info("Consumer Received (boxTopic): " + value);
                // todo : do somethings
            };
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Spring Cloud Stream介绍

    • Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架
    • 它基于Spring Boot和Spring Integration,提供了一种简单而灵活的方式来连接消息代理和应用程序
    • Spring Cloud Stream的核心概念包括:
      • Binder:Binder是Spring Cloud Stream的核心组件,它提供了与消息代理交互的适配器。Spring Cloud Stream提供了对多种消息代理的支持,如Kafka、RabbitMQ等。
      • Destination:Destination代表了消息的目的地,它可以是一个主题(Topic)或者一个队列(Queue)。
      • Message:Message是Spring Cloud Stream中的基本单位,它包含了消息的负载和一些元数据。
      • Producer:Producer用于向消息代理发送消息。
      • Consumer:Consumer用于从消息代理接收消息
    • Spring Cloud Stream提供了两种方式来定义消息驱动的应用程序:
      • 使用注解:通过在方法上添加注解来定义消息的生产者和消费者。
      • 使用函数式编程模型:通过定义输入和输出的Binding来创建消息的生产者和消费者
    • Spring Cloud Stream还提供了一些附加功能,如消息转换、消息分组、消息持久化等,以满足不同场景下的需求。通过使用Spring Cloud Stream,开发人员可以更加轻松地构建和管理消息驱动的微服务应用程序

    问题处理

    • 消费某个多分区topic时,发现只有其中一个分区数据有打印,其他都没有,但是查看却发现都消费掉了
    2023-09-21 13:45:13.706  INFO 2184 --- [container-0-C-1] c.newatc.data.kafka.KafkaConfiguration   : Consumer Received (boxTopic): {"equipmentNo":0,"laneNum":4,"statisticsCycle":60,"time":1695275100110,"vehicleLanes":[{"averageParkingNum":0.5,"averageSpeed":6.1,"lane":1,"laneFlow":2,"maxQueueLength":12,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":2,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":15.3,"vehicleAverageTravelTime":0.2},{"averageParkingNum":0.0,"averageSpeed":35.7,"lane":2,"laneFlow":1,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":1,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":0.0},{"averageParkingNum":0.0,"averageSpeed":0.0,"lane":3,"laneFlow":3,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":0,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":0.0},{"averageParkingNum":0.0,"averageSpeed":25.4,"lane":4,"laneFlow":1,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":5,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":1.6}]}
    
    • 1
    • 最终测试发现,列表转化为jsonarray字符串,无法打印。但是单条数据json字符串却可以
    • 将代码改为String[]后发现报错,代码如下
        @Bean
        public Consumer<String[]> boxEventTopicConsumer(HWTrafficEventManager hwTrafficEventManager) {
            log.info("Consumer Received (boxTopic): " + hwTrafficEventManager);
            return value -> {
                log.info("Consumer Received (boxTopic): " + value.length);
    
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 报错如下:
    2023-09-21 11:47:59.684 ERROR 9112 --- [container-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff none exhausted for boxTopic-3@29941
    
    org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
     at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
     at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]), failedMessage=GenericMessage [payload=byte[96], headers={kafka_offset=29941, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3c31888e, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=3, kafka_receivedMessageKey=[B@3fcf7bd1, kafka_receivedTopic=boxTopic, kafka_receivedTimestamp=1695267871945, contentType=application/json, kafka_groupId=data-center}]
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2683)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2649)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2609)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2536)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2427)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2305)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1364)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1355)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247)
    	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
     at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
     at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0])
    	at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237)
    	at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:113)
    	at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
    	at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176)
    	at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.fromMessage(SmartCompositeMessageConverter.java:48)
    	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputMessageIfNecessary(SimpleFunctionRegistry.java:1282)
    	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1057)
    	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:696)
    	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:551)
    	at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
    	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:754)
    	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:586)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
    	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
    	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
    	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
    	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
    	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
    	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
    	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2629)
    	... 12 common frames omitted
    Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
     at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0])
    	at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
    	at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741)
    	at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515)
    	at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1420)
    	at com.fasterxml.jackson.databind.DeserializationContext.extractScalarFromObject(DeserializationContext.java:932)
    	at com.fasterxml.jackson.databind.deser.std.StdDeserializer._parseString(StdDeserializer.java:1292)
    	at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:166)
    	at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:25)
    	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
    	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
    	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3723)
    	at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:223)
    	... 45 common frames omitted
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 通过日志可以看出,是在接收数据的解析时发生了问题,jsonarray的字符串无法接受解析
    • 在消息发生时,在正常消息后面,加一个“-”,可以正常接收了,就是解析时需要先删掉尾字符再转json
    • 后面又发现,如果是作为字符数组接收,再转为字符串,是可以的
    • 相应配置
    spring:
      cloud:
        stream:
          function:
            definition: boxEventTopicConsumer
          bindings:
            boxEventTopicConsumer-in-0:
              destination: boxTopic
              group: data-center
              contentType: application/json
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 消费者消费代码
    @Configuration
    public class KafkaConfiguration {
        @Bean
        public Consumer<Object> boxEventTopicConsumer() {
            return value -> {
                String str = new String((byte[])value, java.nio.charset.StandardCharsets.UTF_8);
                log.info("Consumer Received (boxTopic): " + str);
                // todo : do somethings
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    笔记本Kali系统主屏幕损坏后利用屏幕2启动失败修复
    CV科研知识点总结(个人研究所有相关知识点)
    数学建模Matlab之评价类方法
    Linux ln命令:建立链接文件
    oracle plsql如何debug触发器
    php组装数据批量插入,比单条循环插入数据快很多
    怎么将pdf转换成word?
    Jira Bug管理工具的操作步骤 提Bug的流程
    P1002 过河卒:图论动态规划入门
    Python绘制X-bar图和R图 | 统计过程控制SPC
  • 原文地址:https://blog.csdn.net/u010882234/article/details/133438214