SpringKafka复用@KafkaListener注解

SpringKafka复用@KafkaListener注解

在使用@KafkaListener的时候,有时候一个Spring工程需要复用@KafkaListener注解。SpringKafka提供了一种主从的方式在同一个Spring工程下复用@KafkaListener的实现。

Config

主类的 Bean 类名字 consumerConfigs、 consumerFactory、kafkaListenerContainerFactory不能做改动。

Primary KafkaFactory Config

    @Bean
    public KafkaListenerContainerFactory<?> batchFactoryForCmd() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true);
        return factory;
    }


    @Bean
    @Primary //重要!!!指定该ContainerFactory为主要的容器工厂,kafka消费者默认关联该容器
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, getGroup());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getServers());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

Primary必须有一个,其他的可以有多个实现。

由于Bean类都在同一个的命名空间下,且都是单例的,各个 Bean 的方法名要不同,不然 Spring 的 Bean 类工厂无法初始化。比如 Primary 的 ConsumerFactory名称是 consumerFactory。另外一个实现的名称就要不同,写作 consumerFactoryForConn 用以区别。

Other KafkaFactory Config

    @Bean
    public KafkaListenerContainerFactory<?> batchFactoryForConn() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryForConn());
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true);
        return factory;
    }


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactoryForConn() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryForConn());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactoryForConn() {
        return new DefaultKafkaConsumerFactory<>(ConsumerConfigsForConn());
    }

    @Bean
    public Map<String, Object> ConsumerConfigsForConn() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG,getGroup());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getServers());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

@KafkaListener

配置不同的 containerFactory 就可以实现对不同的 topics 同时进行消费。containerFactory 就是之前 KafkaListenerContainerFactory Bean类 的方法名。id 配置不同的名称来区别。

    @KafkaListener(id = "cmdKafka", topics = "${kafka.cmd.topic}", containerFactory = "batchFactoryForCmd")
    public void listen(List<String> results,Acknowledgment ack) {
        commandProcessTask.publishCommandMessage(results);
        ack.acknowledge(); //手动提交commit
    }
    
    
    @KafkaListener(id = "connKafka", topics = "${kafka.conn.topic}", containerFactory = "batchFactoryForConn")
    public void listen(List<String> results,Acknowledgment ack) {
        dataProcessTask.publishDataMessage(results);
        ack.acknowledge(); //手动提交commit
    }

BeanFacotry

BeanFactory和FactoryBean的区别

BeanFactory是接口 , 一般使用的高级容器 ApplicationContext 接口继承了这个接口(其实是继承了多个接口,其中ListableBeanFactory, HierarchicalBeanFactory 都继承了BeanFactory),所有Bean 由它来管理。

FactoryBean在IOC容器的基础上给Bean的实现加上了一个简单工厂模式和装饰模式,让我们可以获取Bean,是通过反射机制实现的,很多Bean类都实现了FactoryBean

这样使用不同的id就可以获取到各自的Bean实例。

@Autowired
Private ApplicationContext context;
KafkaListenerContainerFactory<?> batchFactoryForCmd = (KafkaListenerContainerFactory<?>) context.getBean("batchFactoryForCmd");
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory =(KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>) context.getBean("kafkaListenerContainerFactory");
ConsumerFactory<Integer, String> consumerFactory = (ConsumerFactory<Integer, String>) context.getBean("consumerFactory");
Map<String, Object> consumerConfigs = (Map<String, Object>)context.getBean("consumerConfigs");


KafkaListenerContainerFactory<?> batchFactoryForConn = (KafkaListenerContainerFactory<?>) context.getBean("batchFactoryForConn");
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryForConn =(KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>) context.getBean("kafkaListenerContainerFactoryForConn");
ConsumerFactory<Integer, String> consumerFactoryForConn = (ConsumerFactory<Integer, String>)context.getBean("consumerFactoryForConn");
Map<String, Object> ConsumerConfigsForConn = (Map<String, Object>)context.getBean("ConsumerConfigsForConn");