Springboot 集成kafka

news/2024/5/6 22:09:47/文章来源:https://blog.csdn.net/axibazZ/article/details/126902871

一、创建项目并导入pom依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

二、修改application.yml配置

1. producer 生产端的配置

spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: 192.168.168.160:9092

2. consumer 消费端的配置,需要给consumer配置一个group-id

spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: 192.168.168.160:9092#https://kafka.apache.org/documentation/#consumerconfigsconsumer:group-id: auto-dev #消费者组

三、生产者生产消息,消费者消费消息

1. 简单消费

producer生产者中使用自动注入的方式创建KafkaTemplate 对象

@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;@Test
void sendMessage(){// 第一个参数为topic,第二个为消息体kafkaTemplate.send("ifun","hello");
}

consumer消费消息,使用@KafkaListener注解监听topic为ifun中的消息,可以监听多个topic

@Component
@Slf4j
public class ConsumerListener {// 消费监听@KafkaListener(topics = {"ifun"})public void onMessage(ConsumerRecord<String, String> record){// 消费的哪个topic、partition的消息,打印出消息内容log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());}
}

2. 带回调的生产者,两种方式

@Test
void sendCallBackMessageOne(){kafkaTemplate.send("ifun","hello callback one").addCallback(success -> {// 消息发送到的topicString topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);}, failure -> {log.info("send fail:message:{} ", failure.getMessage());});
}@Test
void sendCallBackMessageTwo(){kafkaTemplate.send("ifun", "hello callback two").addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {log.info("send fail:message:{} ", ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {String topic = result.getRecordMetadata().topic();int partition = result.getRecordMetadata().partition();long offset = result.getRecordMetadata().offset();log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);}});
}

回调补充,全局回调,需要继承ProducerListener,并重写onSuccess和onError方法

@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener {@Overridepublic void onSuccess(ProducerRecord producerRecord,RecordMetadata recordMetadata) {String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);}@Overridepublic void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {log.info("send fail : {}", exception.getMessage());}
}

3. 配置自定义分区策略

application.yml中需要指定分区策略的class

spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: 192.168.168.160:9092#https://kafka.apache.org/documentation/#producerconfigsproducer:properties:partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner

分区类的实现

@Component
@Slf4j
public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区规则(这里假设全部发到0号分区)log.info("自定义分区策略 topic:{} key:{} value:{}",topic,key,value.toString());return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

4. kafka事务提交

如果在发送消息的时候需要创建事务,可以使用KafkaTemplate的executeInTransaction方法来声明事务。

application.yml增加transaction配置

java.lang.IllegalStateException: Producer factory does not support transactionsorg.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record第一个异常是你没有配置transactions
第二个异常是因为你配置的acks不为all
第三个是正常的send方法,但是抛异常了,需要加@Transactional 注解spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: 192.168.168.160:9092#https://kafka.apache.org/documentation/#producerconfigsproducer:properties:partitioner.class: com.ifun.kafka.producer.config.CustomPartitioneracks: alltransaction-id-prefix: "IFUN_TX"

发送消息代码

@Test
@Transactional
void sendWithException(){kafkaTemplate.send("ifun","不带事务提交!");kafkaTemplate.executeInTransaction(oper->{oper.send("ifun","带事务的提交");throw new RuntimeException("fail 1");});throw new RuntimeException("fail 2");
}

带事务的提交消息发送失败

 不带事务的消息被成功消费

 5. 消费者配置更详细的配置

@KafkaListener注解说明:

  1. id:唯一标识。如果没有配置,取application.yml中的 consumer.groupId
  2. idIsGroup :默认true,true的话代表该consumer分组group!
  3. groupId:消费者分组。如果不填,取id (idIsGroup=true)作为分组。否则取application.yml中的 consumer.groupId
  4. topic 与 topicPartitions 不能共用。
  5. topic:类似于subscripe订阅模式。
  6. topicPartitions类似于assign手动分配模式。
@KafkaListener(id = "ifun-001",groupId = "ifun-01", topicPartitions={@TopicPartition(topic = "ifun1",partitions = {"0"}),@TopicPartition(topic = "ifun2",partitions = {"0"},partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))}
)
public void onTopicsMessage(ConsumerRecord<String, String> record){log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

解释:这里定义了消费者id为ifun-001,消费者组id为ifun-01,同时监听两个topic,ifun1和ifun2,其中监听ifun1的0号分区,ifun2的0号和1号分区,其中1号分区开始的offset为8,也就是说如果next-offset大于8就会消费,小于8不会消费。

6. 消费者批量消费

需要在application.yml中开启批量消费,sping.kafka.listener.type: batch 监听类型为batch,spring.kafka.consumer.max-poll-records 批量消费每次最多消费多少条消息,接收消息的时候需要使用List来接收。

spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: 192.168.168.160:9092#https://kafka.apache.org/documentation/#consumerconfigslistener:#batch singletype: batchconsumer:group-id: auto-dev #消费者组max-poll-records: 3
@KafkaListener(topics = {"ifun"})
public void onBatchMessage(List<ConsumerRecord<String, String>> records){log.info("批量消费");for (ConsumerRecord<String, String> record : records) {log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());}
}

 7. 消费端手动ack

设置spring.kafka.consumer.enable-auto-commit 为false 的时候 spring.kafka.listener.ack-mode 才会生效,设置为手动的manual表示手动

spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: 192.168.168.160:9092#https://kafka.apache.org/documentation/#consumerconfigslistener:#batch singletype: batch# 手动确认模式 RECORD,  BATCH,  TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE;ack-mode: manualconsumer:#消费者组 idgroup-id: auto-dev max-poll-records: 3#是否自动提交偏移量offsetenable-auto-commit: false 

消费代码

@KafkaListener(topics = {"ifun"})
public void onBatchMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){try{log.info("批量消费");for (ConsumerRecord<String, String> record : records) {log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());}}finally {ack.acknowledge();}
}

如果没有ack,那么会出现如下情况:

  1. 如果在消费kafka的数据过程中,一直没有提交offset,那么在此程序运行的过程中它不会重复消费。但是如果重启之后,就会重复消费之前没有提交offset的数据。
  2. 如果在消费的过程中有几条或者一批数据数据没有提交offset,后面其他的消息消费后正常提交offset,那么服务端会更新为消费后最新的offset,不会重新消费,就算重启程序也不会重新消费。
  3. 消费者如果没有提交offset,程序不会阻塞或者重复消费,除非在消费到这个你不想提交offset的消息时你尝试重新初始化一个客户端消费者,即可再次消费这个未提交offset的数据。因为客户端也记录了当前消费者的offset信息,所以程序会在每次消费了数据之后,自己记录offset,而手动提交到服务端的offset与这个并没有关系,所以程序会继续往下消费。在你重新初始化客户端消费者之后,会从服务端得到最新的offset信息记录到本地。所以说如果当前的消费的消息没有提交offset,此时在你重新初始化消费者之后,可得到这条未提交消息的offset,从此位置开始消费。
     

9. 消费异常捕获

配置ConsumerAwareListenerErrorHandler 处理类,在listener上设置errorHandler属性为ConsumerAwareListenerErrorHandler的BeanName

@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {return (message, exception, consumer) -> {System.out.println("消费异常:"+message.getPayload());return null;};
}
@KafkaListener(topics = {"ifun"}, errorHandler = "consumerAwareErrorHandler")
public void onBatchMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){try{log.info("批量消费");for (ConsumerRecord<String, String> record : records) {log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());throw new RuntimeException("消费异常");}}finally {ack.acknowledge();}
}

显示结果如下:

10. 配置消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

需要为监听器工厂配置一个RecordFilterStrategy,返回true的时候消息将被抛弃,返回false会正常抵达监听器。

然后在监听器上设置containerFactory属性为配置的过滤器工厂类

@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃factory.setAckDiscarded(true);// 消息过滤策略factory.setRecordFilterStrategy(consumerRecord -> {if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {return false;}//返回true消息则被过滤return true;});return factory;
}
@KafkaListener(topics = {"ifun"},containerFactory = "filterContainerFactory")
public void onMessage(ConsumerRecord<String, String> record){log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

过滤结果

11. 消息转发

Topic A 收到消息后将消息转发给Topic B,使用@SendTo注解即可

@KafkaListener(topics = {"ifun"})
@SendTo("ifun1")
public String onMessage(ConsumerRecord<String, String> record){log.info("topic {} 收到需要转发的消息:{}",record.topic(), record.value());return record.value()+" 【forward message】";
}@KafkaListener(topics = {"ifun1"})
public void onIFun1Message(ConsumerRecord<String, String> record){log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
}

 结果如下,可以看到消息先记过第一个topic,然后转发给了第二个topic

 12. 设置json序列化方式生产和消费消息

消费端配置如下

spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: 192.168.168.160:9092#https://kafka.apache.org/documentation/#consumerconfigsconsumer:#消费者组 idgroup-id: auto-devkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:spring:json:trusted:# 配置json反序列化信任的包packages: '*'

生产端配置如下

spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: 192.168.168.160:9092#https://kafka.apache.org/documentation/#producerconfigsproducer:#key的编解码方法key-serializer: org.apache.kafka.common.serialization.StringSerializer#value的编解码方法value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

编写消息实体类

@Data
public class UserInfo implements Serializable {private Long id;private String name;private Integer age;
}

发送消息

@Test
void sendMessage(){UserInfo userInfo = new UserInfo();userInfo.setAge(21);userInfo.setId(1L);userInfo.setName("Jack");kafkaTemplate.send("ifun",userInfo);
}

消费消息

@KafkaListener(topics = {"ifun"})
public void onMessage(UserInfo userInfo){log.info("消息:{}",userInfo);
}

消费结果如下:

 注意:发送的类要和消费的类的全类名一致才行,不能是类名一样,字段一样,但是包名不一样,这样会抛异常。

四、kafka其他配置

spring:#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中kafka:bootstrap-servers: xx.xx.xx.xx:9092#https://kafka.apache.org/documentation/#producerconfigs#生产者配置producer:bootstrap-servers: xx.xx.xx.xx:9092#生产者发送消息失败重试次数retries: 1# 同一批次内存大小(默认16K)batch-size: 16384 #生产者内存缓存区大小(300M = 300*1024*1024)buffer-memory: 314572800#acks=0:无论成功还是失败,只发送一次。无需确认#acks=1:即只需要确认leader收到消息#acks=all或-1:ISR + Leader都确定收到acks: 1#key的编解码方法key-serializer: org.apache.kafka.common.serialization.StringSerializer #value的编解码方法value-serializer: org.apache.kafka.common.serialization.StringSerializer #开启事务,但是要求ack为all,否则无法保证幂等性#transaction-id-prefix: "IFUN_TX"#额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化properties:#自定义拦截器,注意,这里是classes(先于分区器)interceptor.classes: cn.com.controller.TimeInterceptor#自定义分区器#partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner#即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去linger.ms: 1000#最大请求大小,200M = 200*1024*1024max.request.size: 209715200#Producer.send()方法的最大阻塞时间(115秒)max.block.ms: 115000#该配置控制客户端等待请求响应的最长时间。#如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。 #此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。request.timeout.ms: 115000#等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX#如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creationdelivery.timeout.ms: 120000#https://kafka.apache.org/documentation/#consumerconfigs#消费者配置consumer:bootstrap-servers: xx.xx.xx.xx:9092#消费者组idgroup-id: default-group#消费方式: earliest:从头开始消费 latest:从最新的开始消费,默认latestauto-offset-reset: earliest #是否自动提交偏移量offsetenable-auto-commit: false#前提是 enable-auto-commit=true。自动提交的频率auto-commit-interval: 1s #key 解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer#value 解码方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer#最大消费记录数max-poll-records: 2properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡max.poll.interval.ms: 300000#配置控制客户端等待请求响应的最长时间。 #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#服务器返回的最大数据量,不能超过admin的message.max.bytes单条数据最大大小max.partition.fetch.bytes: 1048576#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true# 如果设置的json解码器,需要配置所信任的包名spring:json:trusted:packages: '*'#监听器配置listener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交ack-mode: manual_immediate#如果至少有一个topic不存在,true启动失败。false忽略missing-topics-fatal: true #单条消费 single 批量消费batch #批量消费需要配合 consumer.max-poll-recordstype: batch#配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲concurrency: 2 template:default-topic: "default-topic"

五、总结

kafka的简单使用就到此结束了,和rabbitmq还是有挺大的区别的。大家快去试试吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_9374.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis介绍和安装

Redis介绍 Redis是一个开源的、基于Key-Value(键-值&#xff09;存储的NoSQL数据库。Redis因其丰富的数据结构、极快的速度、齐全的功能而为人所知&#xff0c;它是目前内存数据库方面的事实标准&#xff0c;是目前使用广泛的开源缓存中间件。 Redis特点 结构丰富&#xff0…

CS231a课程笔记:Lecture2 Camera Models

关于齐次坐标&#xff1a;(15条消息) 为什么要引入齐次坐标&#xff0c;齐次坐标的意义&#xff08;一&#xff09;_追求卓越583的博客-CSDN博客_齐次坐标的意义(15条消息) 为什么要引入齐次坐标&#xff0c;齐次坐标的意义&#xff08;二&#xff09;_追求卓越583的博客-CSDN博…

DNS 解析流程

一、背景 最近&#xff0c;在S3协议项目中调研通过DNS域名解析处理流量负载均衡问题。原来对dns也有一些粗浅的了解&#xff0c;知道通过DNS可以将域名转换为IP地址&#xff0c;也可以做负载均衡。但是DNS的解析流程以及缓存等机制&#xff0c;只是一知半解。正好&#xff0c;…

windows安装nginx并设置开机自启动

在macOS和linux中使用nginx我早已经轻车熟路。突然切到windows的环境中&#xff0c;我反而不会用了。 之前写了《windows使用nginx探索笔记》内容比较冗长&#xff0c;所以本文尽量精简一下。 环境 操作系统&#xff1a;windows 2008R2 Datacenter 已经安装的软件&#xff1…

C语言中malloc(),free(),calloc(),realloc()

申请内存malloc()在申请内存时不会对内存进行初始化赋值 在申请内存后&#xff0c;没有对内存进行初始化的话&#xff0c;这段内存中就存储着系统随机值。 int n 5; int* p (int*)malloc(n * sizeof(int));malloc(size):size就是你想开辟的内存的字节大小。我们通常想要用这段…

SpringCloud基础6——分布式事务,Seata

用于复习快速回顾。 目录 1.分布式事务问题 1.1.本地事务&#xff0c;ACID原则 1.2.分布式事务 1.3.演示分布式事务问题 2.理论基础 2.1.CAP定理 2.1.1.一致性&#xff0c;数据同步 2.1.2.可用性&#xff0c;节点正常访问 2.1.3.分区容错 2.1.4.矛盾 2.2.BASE理论 …

vulnhub-xxe lab: 1

ifconfig nmap 192.168.61.0/24 找到192.168.61.145 目录扫描&#xff08;御剑&#xff09; 192.168.61.145/xxe 192.168.61.145/admin.php 无法访问&#xff0c;但是robots.txt里面写的应该不会是无效网站&#xff0c;所以可能是被拒绝访问了 抓xxe的包 可以发现是用xml写的…

[ web基础篇 ] Burp Suite 爆破 Basic 认证密码

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

层次选择器

层次选择器 后代选择器简介后代选择器可以选择作为某元素后代的元素(包括儿子,孙子,重孙子) 两个元素之间的层次间隔可以是无限的示例<!DOCTYPE html> <html lang="en"><head><meta charset="UTF-8"><title>Title</t…

怎么把握住股票每天的最佳交易时机?

每个股民都希望自己能够在每天的股价最高点卖出&#xff0c;然后在最低点再买回来&#xff1b;但是怎么去判断最好的交易时机呢&#xff0c;很多人会想很多方法去识别判断最佳交易点&#xff0c;今天给大家分享一种方法&#xff1b;我一直在思考股票交易的底层逻辑是啥&#xf…

如何在基础镜像中安装指定python版本

背景 由于规范要求要使用指定的镜像版本,但是由于该镜像中的python与我使用的版本有差异,怕引起一些不必要的兼容问题,所以我需要自己按基础镜像基础上安装对应版本的python。 Dockerfile 直接上最终dockerfile,为什么这样写,后面说到。 FROM centos:7 # 指定工作目录 WOR…

【2022中国高校计算机大赛 微信大数据挑战赛】Top 1-6 方案总结

前段时间参加了 2022中国高校计算机大赛 微信大数据挑战赛&#xff0c;比赛链接&#xff1a;https://algo.weixin.qq.com/。 由于时间原因精力有限&#xff0c;我们队伍的方案做的比较简陋&#xff1a; 【初赛&#xff1a;rank-18&#xff0c;复赛&#xff1a;rank-40&#xff…

网课查题接口 搜题公众号对接题库教程 (附赠题库接口)

网课查题接口 搜题公众号对接题库教程 &#xff08;附赠题库接口&#xff09; 本平台优点&#xff1a; 多题库查题、独立后台、响应速度快、全网平台可查、功能最全&#xff01; 1.想要给自己的公众号获得查题接口&#xff0c;只需要两步&#xff01; 2.题库&#xff1a; 查…

bm19bm7

为什么不定义如果两点相等呢 等于的话峰值统一取右 以右来比较 波峰就行 不一定是最大的 在这里插入代码片 import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可** * param nums…

微信小程序转为App并上架应用市场

先说说背景吧&#xff0c;笔者开发了一款微信工具类小程序&#xff0c;刚开始&#xff0c;小程序的日访问量和用户数都还可以&#xff0c;但后面慢慢的发现&#xff0c;受限于微信小程序平台规则&#xff0c;很难对用户进行更深入的运营&#xff0c;用户流失问题也将逐渐凸显出…

‘std::thread‘ has not been declared

出现这个问题的原因就是 目前MinGW GCC64还不支持std::thread 这是我的gcc版本 PS D:\MyCode> gcc --version gcc.exe (x86_64-win32-seh-rev0, Built by MinGW-W64 project) 8.1.0 Copyright © 2018 Free Software Foundation, Inc. This is free software; see the s…

七、RequestResponse

Request&Response 第一章 Request 1. 目标 了解Request的概念了解Request的组成部分掌握Request获取请求行的信息掌握Request获取请求头的信息掌握Request获取请求参数掌握解决请求参数乱码掌握Request域对象掌握请求转发 2. 内容 2.1 Request概述 2.1.1 Request的概…

Part16:Pandas的分层索引MultiIndex怎么用?【详解】

Pandas的分层索引Multilndex 1为什么要学习分层索引Multilndex? 1、分层索引:在一个轴向上拥有多个索引层级&#xff0c;可以表达更高维度数据的形式; 2、方便的进行数据筛选&#xff0c;如果有序则性能更好; 3、groupby等操作的结果&#xff0c;如果是多KEY&#xff0c;结…

元宇宙产业委常务副主任委员甘华鸣:关于术语“元宇宙”以及相关问题

【央链知播-编者按&#xff1a;元宇宙产业委常务副主任委员甘华鸣就全国科学技术名词审定委员会元宇宙及核心术语概念研讨会提出的一个观点&#xff0c;发表自己的看法&#xff0c;写了《关于术语“元宇宙”以及相关问题》一文&#xff0c;现转发供元宇宙产业和学术界思考】 以…

老鼠出迷宫

老鼠出迷宫 现有一个图形如下&#xff1a; 要求老鼠在左边第一个位置&#xff0c;走到绿色标的出口橙色为边界不能走。 表盘可以看做是一个[8][7]大小的二维数组&#xff0c;可以用1表示边界&#xff0c;0表示可以走 int [][] arrMap new int[8][7];得到一个数组&#xff1…