RocketMQ踩坑
| Java
评论 0 | 点赞 0 | 浏览 666

RocketMQ踩坑




rocketMQ集成springboot使用@RocketMQMessageListener监听消息,实体接收消息使用fastjson的@JSONField注解无法实现下划线自动转驼峰

原因分析


RocketMQAutoConfiguration通过import的方式向容器中注入MessageConverterConfiguration类


@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
        ...
}

MessageConverterConfiguration类会注入一个类型为RocketMQMessageConverter的bean

@Configuration
@ConditionalOnMissingBean(RocketMQMessageConverter.class)
class MessageConverterConfiguration {
    @Bean
    public RocketMQMessageConverter createRocketMQMessageConverter() {
        return new RocketMQMessageConverter();
    }
}


RocketMQMessageConverter在构造对象的时候执行静态代码块,初始化两个boolean类型的静态常量JACKSON_PRESENT、FASTJSON_PRESENT,只要项目中引入了相关的包就会赋值为true
JACKSON相关包com.fasterxml.jackson.databind.ObjectMapper|com.fasterxml.jackson.core.JsonGenerator
FASTJSON相关包com.alibaba.fastjson.JSON|com.alibaba.fastjson.support.config.FastJsonConfig
在构造方法中会构造一个类型为MessageConverter的list集合,结合中会添加JSON转换的处理方法,List集合是顺序的,JACKSON优先于FASTJSON加入到集合中;


public class RocketMQMessageConverter {

    private static final boolean JACKSON_PRESENT;
    private static final boolean FASTJSON_PRESENT;

    static {
        ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
        JACKSON_PRESENT =ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) && ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader);
        FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader) && ClassUtils.isPresent("com.alibaba.fastjson.support.config.FastJsonConfig", classLoader);
    }

    private final CompositeMessageConverter messageConverter;

    public RocketMQMessageConverter() {
        List<MessageConverter> messageConverters = new ArrayList<>();
        ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
        byteArrayMessageConverter.setContentTypeResolver(null);
        messageConverters.add(byteArrayMessageConverter);
        messageConverters.add(new StringMessageConverter());
        if (JACKSON_PRESENT) {
            messageConverters.add(new MappingJackson2MessageConverter());
        }
        if (FASTJSON_PRESENT) {
            try {
                messageConverters.add(
                (MessageConverter)ClassUtils.forName(
                    "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
                    ClassUtils.getDefaultClassLoader()).newInstance());
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException ignored) {
                //ignore this exception
            }
        }
        messageConverter = new CompositeMessageConverter(messageConverters);
    }

    public MessageConverter getMessageConverter() {
        return messageConverter;
    }
}


在监听消费消息的时候,在进入onMessage方法之前会走到DefaultRocketMQListenerContainer的内部类DefaultMessageListenerConcurrently实现了MessageListenerConcurrently接口,最终会调用DefaultRocketMQListenerContainer#handleMessage方法来处理消息


public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }


DefaultRocketMQListenerContainer#handleMessage方法中会调用onMessage方法,并且先调用DefaultRocketMQListenerContainer#doConvertMessage方法反序列化对象

 private void handleMessage(
        MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
        if (rocketMQListener != null) {
            rocketMQListener.onMessage(doConvertMessage(messageExt));
        } else if (rocketMQReplyListener != null) {
            Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
            Message<?> message = MessageBuilder.withPayload(replyContent).build();

            org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
            DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
            producer.setSendMsgTimeout(replyTimeout);
            producer.send(replyMessage, new SendCallback() {
                @Override public void onSuccess(SendResult sendResult) {
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                    } else {
                        log.debug("Consumer replies message success.");
                    }
                }

                @Override public void onException(Throwable e) {
                    log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
                }
            });
        }
    }

  @SuppressWarnings("unchecked")
    private Object doConvertMessage(MessageExt messageExt) {
        if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) {
            return messageExt;
        } else {
            String str = new String(messageExt.getBody(), Charset.forName(charset));
            if (Objects.equals(messageType, String.class)) {
                return str;
            } else {
                // If msgType not string, use objectMapper change it.
                try {
                    if (messageType instanceof Class) {
                        //if the messageType has not Generic Parameter
                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType);
                    } else {
                        //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
                        //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
                        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
                    }
                } catch (Exception e) {
                    log.info("convert failed. str:{}, msgType:{}", str, messageType);
                    throw new RuntimeException("cannot convert message to " + messageType, e);
                }
            }
        }
    }


doConvertMessage方法里面最终会调用到CompositeMessageConverter#fromMessage方法,fromMessage方法中会去变了前面初始化bean的时候构建的MessageConverter集合,JACKSON在集合中排在FASTJSON前面,使用JACKSON方式处理完成后就会直接返回rusult然后进入到onMessage方法


	@Override
	@Nullable
	public Object fromMessage(Message<?> message, Class<?> targetClass) {
		for (MessageConverter converter : getConverters()) {
			Object result = converter.fromMessage(message, targetClass);
			if (result != null) {
				return result;
			}
		}
		return null;
	}


至此解释了为什么@JSONField注解在反序列化的时候不起作用

解决方案


1. 使用JACKSON的@JsonProperty(value = "provider_id")来进行重命名
2. 直接使用String接收,手动使用fastjson提供的方法JSON.parseObject(String,Class)转换成对象
3. 去除项目中所有的JACKSON依赖,使用fastjson代替

本文作者:不是好驴
本文链接:https://www.baddonkey.cn/detail/33
版权声明:原创文章,允许转载,转载请注明出处

高谈阔论

留言列表