Description
The current implementation (1.0.2.RELEASE) of KafkaListener doesn't handle messages with a null payload correctly. When such a message arrives, the following exception is thrown:
java.lang.IllegalArgumentException: Payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:115)
at org.springframework.messaging.support.MessageBuilder.createMessage(MessageBuilder.java:191)
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:78)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:105)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:97)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:778)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
This happens even if the payload parameter is annotated with @Payload(required = false).
Currently, the only way to handle such messages is to implement and register a custom message converter.
However, messages with a null payload are completely valid in Kafka. For example, on a topic with log compaction enabled, a message with a key and a null payload is treated as a delete from the log. KafkaTemplate also supports them correctly.
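To illustrate why a null payload carries meaning, here is a simplified model of what a compacted topic converges to. It is only a sketch: CompactionSketch, KeyedRecord and compact are hypothetical names, not Kafka classes, and the real broker also retains the delete marker for some time before dropping it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {

    // Hypothetical stand-in for a keyed Kafka record; not a Kafka client class.
    static final class KeyedRecord {
        final String key;
        final String value; // null marks a delete for this key

        KeyedRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the log in order, keeping the latest value per key.
    // A record with a null value removes the key entirely.
    static Map<String, String> compact(List<KeyedRecord> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (KeyedRecord record : log) {
            if (record.value == null) {
                latest.remove(record.key);
            } else {
                latest.put(record.key, record.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<KeyedRecord> log = new ArrayList<>();
        log.add(new KeyedRecord("user-1", "alice"));
        log.add(new KeyedRecord("user-2", "bob"));
        log.add(new KeyedRecord("user-1", null)); // delete user-1
        System.out.println(compact(log)); // prints {user-2=bob}
    }
}
```

A listener that cannot receive the third record has no way to learn that user-1 was deleted.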
So, either remove the notNull check on the payload, use different converters depending on the @Payload(required = true/false) flag, or at least provide a message converter implementation that accepts null payloads.
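One possible shape for the last option is a sentinel object that replaces the null value while the message is built and is turned back into null before the listener is invoked. The sketch below is self-contained and does not use Spring classes: NullPayloadSketch, SimpleMessage, NULL_PAYLOAD, toMessage and toListenerArgument are all hypothetical names, and SimpleMessage only imitates the not-null assertion seen in the stack trace.

```java
import java.util.Objects;

final class NullPayloadSketch {

    // Hypothetical sentinel that stands in for a null record value.
    static final Object NULL_PAYLOAD = new Object() {
        @Override
        public String toString() {
            return "NULL_PAYLOAD";
        }
    };

    // Minimal stand-in for a message type that rejects null payloads.
    static final class SimpleMessage {
        final Object payload;

        SimpleMessage(Object payload) {
            this.payload = Objects.requireNonNull(payload, "Payload must not be null");
        }
    }

    // Converter side: substitute the sentinel so message creation succeeds.
    static SimpleMessage toMessage(Object recordValue) {
        return new SimpleMessage(recordValue == null ? NULL_PAYLOAD : recordValue);
    }

    // Listener side: map the sentinel back to null, as a parameter
    // declared with required = false would expect.
    static Object toListenerArgument(SimpleMessage message) {
        return message.payload == NULL_PAYLOAD ? null : message.payload;
    }

    public static void main(String[] args) {
        System.out.println(toListenerArgument(toMessage(null)));    // prints null
        System.out.println(toListenerArgument(toMessage("hello"))); // prints hello
    }
}
```

The sentinel keeps the existing not-null check intact for every other caller, at the cost of one extra identity comparison when resolving the listener argument.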