Inspections
This section contains inspection rules for Apache Kafka to implement
@KafkaListeners
This annotation is used to configure the Kafka listener. It allows you to define multiple @KafkaListener when repeateable annotations are not supported
Empty @KafkaListeners
This inspection checks if the @KafkaListeners annotation is empty, which means no @KafkaListener is defined. In such a case, the application will ignore:
Method Level
public class Example {
/**
* This is a listener for ConsumerRecords messages
*/
@KafkaListeners
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records) {}
}Class Level
@KafkaListeners
public class Example {
/**
* This is a listener for ConsumerRecords messages
*/
@KafkaHandler
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records) {}
}That means that application behavior will be affected, as no Kafka listeners will be registered and messages will not be processed
Error Level: Warning
@KafkaListener
This annotation is used to configure a Kafka listener. It allows you to define multiple @KafkaListener when repeateable annotations are supported
Empty @KafkaListener
This inspection checks if the @KafkaListener annotation is empty. In such a case, the application will throw an exception:
java.lang.IllegalStateException: topics, topicPattern, or topicPartitions must be providedMethod Level
public class Example {
/**
* This is a listener for ConsumerRecords messages
*/
@KafkaListener
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records) {}
}Class Level
@KafkaListener
public class Example {
/**
* This is a listener for ConsumerRecords messages
*/
@KafkaHandler
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records) {}
}That means that application won't start
Error Level: Error
@KafkaListener with non unique group ID
This inspection checks if the @KafkaListener annotation has a non-unique group ID
While it is possible to have multiple @KafkaListener on a same topic it's important to have a unique group ID for each listener
Method Level
public class Example {
/**
* This is a listener #1 for ConsumerRecords messages
*/
@KafkaListener(topics = "messages.1", groupId = "group_1")
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records) {}
/**
* This is a listener #2 for ConsumerRecords messages
*/
@KafkaListener(topics = "messages.1", groupId = "group_1")
public String listenAndReply(ConsumerRecords<String, SendPublicMessageRequest> records) {}
}Class Level
@KafkaListener(topics = "messages.1", groupId = "group_1")
@KafkaListener(topics = "messages.1", groupId = "group_1")
public class Example {
/**
* This is a listener #1 for ConsumerRecords messages
*/
@KafkaHandler
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records) {}
}Otherwise, application behavior will be different from expected
Error Level: Warning
@KafkaListener with ambiguous configuration
This inspection checks if the @KafkaListener annotation has ambiguous configuration
It's important to respect the framework configuration processing guidelines to avoid unexpected behavior
Spring
@KafkaListenerconfiguration processing guidelines:
topicsmutually exclusive withtopicPatternandtopicPartitionstopicPatternmutually exclusive withtopicsandtopicPartitionstopicPartitionsmutually exclusive withtopicsandtopicPattern
Method Level
public class Example {
/**
* This is a listener #1 for ConsumerRecords messages
*/
@KafkaListener(topics = "messages.1", groupId = "group_1", topicPattern = "messages.*")
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records) {}
}Class Level
@KafkaListener(topics = "messages.1", groupId = "group_1", topicPattern = "messages.*")
public class Example {
/**
* This is a listener #1 for ConsumerRecords messages
*/
@KafkaHandler
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records) {}
}Otherwise, application behavior will be different from expected
Error Level: Warning
@KafkaHandler
This annotation is used to define a method that will be invoked when a message is received. This inspection is applicable to both configuration variants:
- Class Level
@KafkaListeners - Class Level repeatable or one
@KafkaListener
Empty @KafkaHandler
This inspection checks if the @KafkaHandler annotation is empty.
@KafkaListener(topics = "messages.1")
public class Example {
/**
* This is a listener #1 for ConsumerRecords messages
*/
@KafkaHandler
public void listen() {}
}Error Level: Error
@KafkaHandler with an ambiguous payload type
It is important to understand that when a message arrives, the method selection depends on the payload type. The type is matched with a single non-annotated parameter, or one that is annotated with
@Payload. There must be no ambiguity - the system must be able to select exactly one method based on the payload type.
Multiple arguments without @Payload annotation
@KafkaListener(topics = "messages.1")
public class Example {
/**
* This is a listener #1 for ConsumerRecords messages
*/
@KafkaHandler
public void listen(String message, MessageHeaders headers) {}
}Error Level: Error
More than one argument with @Payload annotation
@KafkaListener(topics = "messages.1")
public class Example {
/**
* This is a listener #1 for ConsumerRecords messages
*/
@KafkaHandler
public void listen(@Payload String message1, @Payload String message2) {}
}Error Level: Error
@ConsumerRecord or @ConsumerRecords with multiple arguments
@KafkaListener(topics = "messages.1")
public class Example {
/**
* This is a listener #1 for ConsumerRecord messages
*/
@KafkaHandler
public void listen(ConsumerRecord<String, SendPublicMessageRequest> record, @Headers Map<String, Object> headers) {}
/**
* This is a listener #2 for ConsumerRecords messages
*/
@KafkaHandler
public void listen(ConsumerRecords<String, SendPublicMessageRequest> records, @Headers Map<String, Object> headers) {}
}Error Level: Error
Identical @KafkaHandler methods
@KafkaListener(topics = "messages.1")
public class Example {
/**
* This is a listener #1 for ConsumerRecord messages
*/
@KafkaHandler
public void listen(ConsumerRecord<String, SendPublicMessageRequest> record) {}
/**
* This is a listener #2 for ConsumerRecord messages
*/
@KafkaHandler
public void listen(ConsumerRecord<String, SendPublicMessageRequest> record) {}
}Error Level: Error
@SendTo
TODO: finish analysis of incorrect or missed @Beans required for @SendTo support
Only void @KafkaHandler
When all @KafkaHandler methods are void
@KafkaListener(topics = {"messages.blue"})
@KafkaListener(topics = {"messages.green"})
@SendTo({"messages.*.replies", "messages.*.replies-2"})
public class MultiMessagesListener {
@KafkaHandler
public void listen(Object message) {}
}Error Level: Warning