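RocketMQ Pitfalls
When RocketMQ is integrated with Spring Boot and messages are consumed through @RocketMQMessageListener, fastjson's @JSONField annotation on the receiving entity does not take effect, so underscore-style JSON keys are not mapped to camelCase fields.
Root Cause Analysis
The starter's auto-configuration class, RocketMQAutoConfiguration, imports MessageConverterConfiguration: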
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration implements ApplicationContextAware {
...
}
MessageConverterConfiguration registers a bean of type RocketMQMessageConverter:
@Configuration
@ConditionalOnMissingBean(RocketMQMessageConverter.class)
class MessageConverterConfiguration {
    @Bean
    public RocketMQMessageConverter createRocketMQMessageConverter() {
        return new RocketMQMessageConverter();
    }
}
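When the RocketMQMessageConverter class is initialized, its static block runs and sets two static boolean constants, JACKSON_PRESENT and FASTJSON_PRESENT. Each one is true as long as the corresponding classes are on the classpath:
Jackson classes checked: com.fasterxml.jackson.databind.ObjectMapper and com.fasterxml.jackson.core.JsonGenerator
fastjson classes checked: com.alibaba.fastjson.JSON and com.alibaba.fastjson.support.config.FastJsonConfig
The constructor then builds a List of MessageConverter and adds the JSON converters to it. A List preserves insertion order, and the Jackson converter is added before the fastjson one: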
public class RocketMQMessageConverter {
    private static final boolean JACKSON_PRESENT;
    private static final boolean FASTJSON_PRESENT;
    static {
        ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
        JACKSON_PRESENT = ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader)
            && ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader);
        FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader)
            && ClassUtils.isPresent("com.alibaba.fastjson.support.config.FastJsonConfig", classLoader);
    }
    private final CompositeMessageConverter messageConverter;
    public RocketMQMessageConverter() {
        List<MessageConverter> messageConverters = new ArrayList<>();
        ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
        byteArrayMessageConverter.setContentTypeResolver(null);
        messageConverters.add(byteArrayMessageConverter);
        messageConverters.add(new StringMessageConverter());
        if (JACKSON_PRESENT) {
            messageConverters.add(new MappingJackson2MessageConverter());
        }
        if (FASTJSON_PRESENT) {
            try {
                messageConverters.add(
                    (MessageConverter) ClassUtils.forName(
                        "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
                        ClassUtils.getDefaultClassLoader()).newInstance());
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException ignored) {
                //ignore this exception
            }
        }
        messageConverter = new CompositeMessageConverter(messageConverters);
    }
    public MessageConverter getMessageConverter() {
        return messageConverter;
    }
}
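The detection and ordering logic above can be approximated with plain JDK code. This is only a minimal sketch: isPresent is a rough stand-in for Spring's ClassUtils.isPresent, and the class name ConverterOrderSketch, the method registrationOrder, and the string labels are hypothetical names used for illustration, not part of rocketmq-spring.

```java
import java.util.ArrayList;
import java.util.List;

public class ConverterOrderSketch {

    // Rough stand-in for ClassUtils.isPresent: true if the class can be loaded.
    public static boolean isPresent(String className, ClassLoader classLoader) {
        try {
            Class.forName(className, false, classLoader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Mirrors the registration order in the constructor above:
    // byte[] and String converters first, then Jackson, then fastjson.
    public static List<String> registrationOrder(boolean jacksonPresent, boolean fastjsonPresent) {
        List<String> order = new ArrayList<>();
        order.add("byte[]");
        order.add("String");
        if (jacksonPresent) {
            order.add("jackson");
        }
        if (fastjsonPresent) {
            order.add("fastjson");
        }
        return order;
    }

    public static void main(String[] args) {
        ClassLoader cl = ConverterOrderSketch.class.getClassLoader();
        boolean jackson = isPresent("com.fasterxml.jackson.databind.ObjectMapper", cl)
            && isPresent("com.fasterxml.jackson.core.JsonGenerator", cl);
        boolean fastjson = isPresent("com.alibaba.fastjson.JSON", cl)
            && isPresent("com.alibaba.fastjson.support.config.FastJsonConfig", cl);
        // The printed order depends on which libraries are on the classpath.
        System.out.println(registrationOrder(jackson, fastjson));
    }
}
```

Running this inside the affected project is a quick way to confirm that both libraries are detected, in which case Jackson is registered first.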
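When a message is consumed, it passes through DefaultMessageListenerConcurrently before reaching onMessage. This is an inner class of DefaultRocketMQListenerContainer that implements the MessageListenerConcurrently interface, and it ends up calling DefaultRocketMQListenerContainer#handleMessage to process the message: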
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                handleMessage(messageExt);
                long costTime = System.currentTimeMillis() - now;
                log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
DefaultRocketMQListenerContainer#handleMessage calls onMessage, but first it calls DefaultRocketMQListenerContainer#doConvertMessage to deserialize the message body into the target object:
private void handleMessage(
    MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
    if (rocketMQListener != null) {
        rocketMQListener.onMessage(doConvertMessage(messageExt));
    } else if (rocketMQReplyListener != null) {
        Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
        Message<?> message = MessageBuilder.withPayload(replyContent).build();
        org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
        DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
        producer.setSendMsgTimeout(replyTimeout);
        producer.send(replyMessage, new SendCallback() {
            @Override public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                } else {
                    log.debug("Consumer replies message success.");
                }
            }
            @Override public void onException(Throwable e) {
                log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
            }
        });
    }
}
@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
    if (Objects.equals(messageType, MessageExt.class) || Objects.equals(messageType, org.apache.rocketmq.common.message.Message.class)) {
        return messageExt;
    } else {
        String str = new String(messageExt.getBody(), Charset.forName(charset));
        if (Objects.equals(messageType, String.class)) {
            return str;
        } else {
            // If msgType not string, use objectMapper change it.
            try {
                if (messageType instanceof Class) {
                    //if the messageType has not Generic Parameter
                    return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) messageType);
                } else {
                    //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
                    //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
                    return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
                }
            } catch (Exception e) {
                log.info("convert failed. str:{}, msgType:{}", str, messageType);
                throw new RuntimeException("cannot convert message to " + messageType, e);
            }
        }
    }
}
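doConvertMessage ultimately calls CompositeMessageConverter#fromMessage, which iterates over the MessageConverter list built when the bean was initialized. Jackson sits ahead of fastjson in that list, so as soon as the Jackson converter produces a non-null result, that result is returned and passed to onMessage. The fastjson converter never runs: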
@Override
@Nullable
public Object fromMessage(Message<?> message, Class<?> targetClass) {
    for (MessageConverter converter : getConverters()) {
        Object result = converter.fromMessage(message, targetClass);
        if (result != null) {
            return result;
        }
    }
    return null;
}
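The "first non-null result wins" behavior of this loop can be reproduced in a small JDK-only simulation. Everything here is hypothetical and only models the situation: the Provider class, the converter labels, and the lambdas are illustrative stand-ins, and the "jackson-like" converter assumes that an unrecognized key (provider_id) is ignored and leaves the field null.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class FirstConverterWinsSketch {

    // Hypothetical target type; providerId should come from the JSON key "provider_id".
    public static class Provider {
        public String providerId;
    }

    // Labels of the converters that were actually invoked, in call order.
    public static final List<String> invoked = new ArrayList<>();

    // Same loop shape as CompositeMessageConverter#fromMessage: first non-null result wins.
    static Object fromMessage(Map<String, Function<Map<String, String>, Object>> converters,
                              Map<String, String> payload) {
        for (Map.Entry<String, Function<Map<String, String>, Object>> entry : converters.entrySet()) {
            invoked.add(entry.getKey());
            Object result = entry.getValue().apply(payload);
            if (result != null) {
                return result;
            }
        }
        return null;
    }

    public static Provider convert(Map<String, String> payload) {
        invoked.clear();
        // LinkedHashMap keeps insertion order, like the converter list.
        Map<String, Function<Map<String, String>, Object>> converters = new LinkedHashMap<>();
        // Cannot produce a Provider, so it returns null and the loop continues.
        converters.put("string", p -> null);
        // Only knows the property name "providerId"; the key "provider_id" is ignored.
        converters.put("jackson-like", p -> {
            Provider provider = new Provider();
            provider.providerId = p.get("providerId");
            return provider;
        });
        // Would honor the rename, but is never reached.
        converters.put("fastjson-like", p -> {
            Provider provider = new Provider();
            provider.providerId = p.get("provider_id");
            return provider;
        });
        return (Provider) fromMessage(converters, payload);
    }

    public static void main(String[] args) {
        Provider provider = convert(Map.of("provider_id", "42"));
        // prints: [string, jackson-like] -> providerId=null
        System.out.println(invoked + " -> providerId=" + provider.providerId);
    }
}
```

The object returned by the second converter is non-null even though its field was never populated, which is why the loop stops there.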
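This explains why @JSONField has no effect during deserialization: the Jackson converter performs the conversion, and Jackson does not read fastjson annotations.
Solutions
1. Use Jackson's @JsonProperty(value = "provider_id") to rename the field.
2. Receive the message as a String and convert it manually with fastjson's JSON.parseObject(String, Class).
3. Remove all Jackson dependencies from the project so that fastjson is used instead.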
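Options 1 and 2 might look roughly like the sketch below. It assumes rocketmq-spring-boot-starter, Jackson, and fastjson 1.x are on the classpath. The class names ProviderMessage and ProviderMessageListener, the topic demo-topic, and the consumer group demo-consumer-group are made up for illustration.

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// Hypothetical message entity.
class ProviderMessage {

    // Option 1: this is the annotation the Jackson converter reads.
    @JsonProperty(value = "provider_id")
    // Only read when fastjson does the parsing, as in option 2.
    @JSONField(name = "provider_id")
    private String providerId;

    public String getProviderId() {
        return providerId;
    }

    public void setProviderId(String providerId) {
        this.providerId = providerId;
    }
}

// Option 2: declare the listener's message type as String, so doConvertMessage
// returns the raw body, then parse it explicitly with fastjson.
@Component
@RocketMQMessageListener(topic = "demo-topic", consumerGroup = "demo-consumer-group")
class ProviderMessageListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String body) {
        ProviderMessage message = JSON.parseObject(body, ProviderMessage.class);
        // Business handling of the message would go here.
    }
}
```

With option 1 alone, the listener could keep ProviderMessage as its message type. Both options are shown in one listing only for brevity.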