⽬录
引⼊依赖创建配置类Kafka 消费者引⼊依赖
因为我的项⽬的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来⾃⾏选择对应的版本。图⽚更新可能不及时,详情可查看官⽅⽹站。
注:这⾥有个踩坑点,如果引⼊包版本不对,项⽬启动时会抛出org.springframework.core.log.LogAccessor 异常:
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
创建配置类
/**
* kafka 配置类 */
@Configuration @EnableKafka
public class KafkaConsumerConfig {
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class); @Value(\"${kafka.bootstrap.servers}\") private String kafkaBootstrapServers; @Value(\"${kafka.group.id}\") private String kafkaGroupId;
@Value(\"${kafka.topic}\") private String kafkaTopic;
public static final String CONFIG_PATH = \"/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf\"; public static final String LOCATION_PATH = \"/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks\";
@Bean
public KafkaListenerContainerFactory factory.setBatchListener(Boolean.TRUE); factory.getContainerProperties().setPollTimeout(30000); return factory; } public ConsumerFactory return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map //设置接⼊点,请通过控制台获取对应Topic的接⼊点。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); //设置SSL根证书的路径,请记得将XXX修改为⾃⼰的路径。 //与SASL路径类似,该⽂件也不能被打包到jar中。 System.setProperty(\"java.security.auth.login.config\ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH); //根证书存储的密码,保持不变。 props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, \"KafkaOnsClient\"); //接⼊协议,⽬前⽀持使⽤SASL_SSL协议接⼊。 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, \"SASL_SSL\"); //SASL鉴权⽅式,保持不变。 props.put(SaslConfigs.SASL_MECHANISM, \"PLAIN\"); // ⾃动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); //两次Poll之间的最⼤允许间隔。 //消费者超过该值没有返回⼼跳,服务端判断消费者处于⾮存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); //设置单次拉取的量,⾛公⽹访问时,该参数会有较⼤影响。 props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000); //每次Poll的最⼤数量。 //注意该值不要改得太⼤,如果Poll太多数据,⽽不能在下次Poll之前消费完,则会触发⼀次负载均衡,产⽣卡顿。 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); //消息的反序列化⽅式。 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringDeserializer\"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, \"org.apache.kafka.common.serialization.StringDeserializer\"); //当前消费实例所属的消费组,请在控制台申请之后填写。 //属于同⼀个组的消费实例,会负载消费消息。 props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); //Hostname校验改成空。 props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, \"\"); return props; } } 注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。 Kafka 消费者 /** * kafka 消息消费类 */ @Component public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); @KafkaListener(topics = {\"${kafka.topic}\ public void listen(List LOGGER.info(\"Kafka Consume partition:{}, offset:{}\ String value = record.value(); System.out.println(\"value = \" + value); // 处理业务逻辑 ... } } } 因为我在配置类中设置了批量监听,所以此处 listen ⽅法的⼊参是List:List 到此这篇关于Springboot集成Kafka进⾏批量消费及踩坑点的⽂章就介绍到这了,更多相关Springboot Kafka批量消费内容请搜索以前的⽂章或继续浏览下⾯的相关⽂章希望⼤家以后多多⽀持! 因篇幅问题不能全部显示,请点此查看更多更全内容