springboot集成kafka消费数据
文章目录
- springboot集成kafka消费数据
- 1.引入pom依赖
- 2.添加配置文件
- 2.1.添加KafkaConsumerConfig.java
- 2.2.添加KafkaIotCustomProperties.java
- 2.3.添加application.yml配置
- 3.消费者代码
1.引入pom依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.2</version>
</dependency>
2.添加配置文件
2.1.添加KafkaConsumerConfig.java
@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {@AutowiredKafkaIotCustomProperties kafkaIotCustomProperties;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.setBatchListener(true);ContainerProperties containerProperties = factory.getContainerProperties();containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<String, String> consumerFactory() {Map<String, Object> consumerConfigs = consumerConfigs();log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));return new DefaultKafkaConsumerFactory<>(consumerConfigs);}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
kafkaIotCustomProperties.getMaxPollRecords());propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());return propsMap;}}
2.2.添加KafkaIotCustomProperties.java
/**
 * Custom Kafka consumer settings bound from configuration keys under the
 * {@code fxyh.realdata.kafka} prefix. Accessors are generated by Lombok's {@code @Data}.
 *
 * <p>Numeric settings are held as strings, exactly as they are bound.
 */
@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {

    /** Topics the listener subscribes to. */
    private List<String> topics;

    /** Consumer group id. */
    private String groupId;

    /** Session timeout, used for {@code session.timeout.ms}. */
    private String sessionTimeOut;

    /** Kafka bootstrap server list. */
    private String bootstrapServers;

    /** Offset reset policy, used for {@code auto.offset.reset}. */
    private String autoOffsetReset;

    /** Whether the consumer auto-commits offsets. */
    private boolean enableAutoCommit;

    /** Auto-commit interval, used for {@code auto.commit.interval.ms}. */
    private String autoCommitInterval;

    /** Minimum fetch size; presumably intended for {@code fetch.min.bytes} - confirm where it is consumed. */
    private String fetchMinSize;

    /** Maximum fetch wait; presumably intended for {@code fetch.max.wait.ms} - confirm where it is consumed. */
    private String fetchMaxWait;

    /** Maximum records per poll, used for {@code max.poll.records}. */
    private String maxPollRecords;

    /** Maximum interval between polls, used for {@code max.poll.interval.ms}. */
    private String maxPollInterval;

    /** Heartbeat interval, used for {@code heartbeat.interval.ms}. */
    private String heartbeatInterval;

    /** Fully qualified class name of the key deserializer. */
    private String keyDeserializer;

    /** Fully qualified class name of the value deserializer. */
    private String valueDeserializer;
}
2.3.添加application.yml配置
fxyh:
  realdata:
    kafka:
      bootstrapServers: 192.168.80.251:9092
      topics: ["test1","test2"]
      groupId: shengtingrealdatagroup
      sessionTimeOut: 30000
      enableAutoCommit: false
      autoCommitInterval: 1000
      fetchMinSize: 1
      fetchMaxWait: 500
      maxPollRecords: 50
      maxPollInterval: 300000
      heartbeatInterval: 10000
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
3.消费者代码
/**
 * Batch Kafka listener that logs every received record and then acknowledges the batch.
 *
 * <p>Topics, group id and the offset-reset policy are resolved through SpEL from the
 * bean named {@code kafkaIotCustomProperties}.
 */
@Slf4j
@Component
public class DeviceDataConsumer {

    @Autowired
    private KafkaIotCustomProperties kafkaIotCustomProperties;

    /**
     * Handles one polled batch of records.
     *
     * <p>The acknowledgment is issued once, after every record in the batch has been
     * handled. In a batch listener the acknowledgment covers the whole batch, so calling
     * it inside the loop would commit offsets for records that have not been processed
     * yet. If handling a record throws, the batch is left unacknowledged.
     *
     * @param records the records delivered for this poll
     * @param ack     manual acknowledgment handle for the batch
     */
    @KafkaListener(
            topics = {"#{@kafkaIotCustomProperties.topics}"},
            groupId = "#{@kafkaIotCustomProperties.groupId}",
            containerFactory = "kafkaListenerContainerFactory",
            // Entries must use properties-file syntax (key=value); a bare value such as
            // "latest" would be parsed as a property *name* with an empty value.
            properties = {"auto.offset.reset=#{@kafkaIotCustomProperties.autoOffsetReset}"})
    public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic_test 消费了: Topic:{},groupId:{},Message:{}",
                    record.topic(), kafkaIotCustomProperties.getGroupId(), record.value());
        }
        ack.acknowledge();
    }
}