Redis学习
作为一个程序员,你没有办法不学Redis
redis是一个NoSql的(远程字典服务的,key_value的数据库)
redis 能干嘛
- 内存存储,持久化,内存中是断电就失去,所有说持久化很重要
- 效率高,可以用于高速缓存
- 发布订阅系统
- 地图信息统计
- 计时器,记数器
- 。。。。
特性
- 多样的数据类型(HashMap ,set ,String ,List ,Zset)
- 持久化
- 集群
- 事务
- 。。。。
如何去学习redis?
redis官网 : https://www.redis.net.cn/
安装Redis :
下载redis
docker pull redis创建实例并启动
mkdir -p /mydata/redis/conf touch redis.confdocker run -p 6379:6379 --name redis -v /mydata/redis/data:/data \-v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \-d redis redis-server /etc/redis/redis.confls 查看当前文件夹下的文件cat redis.conf测试redis docker exec -it redis redis-cli---由于以前的数据全存在内存中,从而我们重复读取get a的时候会导致失败
[root@localhost conf]# cat redis.conf
[root@localhost conf]# docker exec -it redis redis-cli
127.0.0.1:6379> set a b
OK
127.0.0.1:6379> get a
"b" //从而在一次上来的时候这块是null
127.0.0.1:6379> exit--让redis 持久化 ---> pwd
127.0.0.1:6379> exit
[root@localhost conf]# pwd
/mydata/redis/conf
[root@localhost conf]# ls
redis.conf
[root@localhost conf]# vi redis.conf
i
appendonly yes #开启持久化
esc :wqdocker restart redis //重启redis
--设置开机自启动docker 里边容器
sudo docker update <容器名> --restart=always
Redis的基础知识
select 选择数据库keys * 查看所有的keyexists key 查看key是否存在move key id 根据id和key值移除flushdb 清除当前数据库flushAll 清除全部数据库中所有数据set key value 设置Key和valueget key 获得key的值expire key 时间 设置过期时间单位秒ttl key 查看过期时间type key 查看当前key存到value的数据类型
为什么redis端口号为6379。(作者使用一个明星的名字)
redis是单线程的
由于redis很快,官方认为,redis是基于内存操作,CPU并不是Redis的性能瓶颈,Redis的瓶颈是根据机械的内存和网络带宽,既然可以使用单线程来实现,那么就使用单线程了。
Redis为啥使用单线程还是这么快?
- 误区,高性能的服务器不一点全是多线程的。
- 误区2,多线程并不一定都比单线程效率高。 redis是将数据存放在内存中的,故而使用单线程去操作就比较快,多线程会进行CPU上下文切换,耗时,对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都是在一个cpu上,在内存情况下,这个就是最佳方案。
Redis指令大全
Redis-Key
select 选择数据库keys * 查看所有的keyexists key 查看key是否存在move key id 根据id和key值移除flushdb 清除当前数据库flushAll 清除全部数据库中所有数据set key value 设置Key和valueget key 获得key的值expire key 时间 设置过期时间单位秒ttl key 查看过期时间type key 查看当前key存到value的数据类型incr key每次执行都进行加一操作desc key 每次执行都进行减一操作getRange key 0 3 截取字符串【0,3】getRange key 0 -1 截取全部的字符串和get key 是一样的setrange key 1 xx 替换指定位置开始的字符串settex (set with expire) key 过期时间 value 设置过期时间且值为valuesetnx (set if not expire) key 不存在的时候设置(在分布式中会经常使用)也就是说,不存在的时候会自动创建一个key,如果他存在的话,那么就会创建失败mset key1 value1 key2 value2 ... 可以创建多对key和valuemget key1 key2 key3 ...可以同时获取多个值msetnx k1 v1 k2 v2 ...是一个原子性操作,要么一起成功,要么一起失败。
Redis的数据类型(String,hash,list,set,zset)
Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。它支持字符串、哈希表、列表、集合、有序集合,位图,hyperloglogs等数据类型。内置复制、Lua脚本、LRU收回、事务以及不同级别磁盘持久化功能,同时通过Redis Sentinel提供高可用,通过Redis Cluster提供自动分区。
Redis 键(key) 命令
命令 | 描述 |
---|---|
Type 命令 | 返回 key 所储存的值的类型。 |
PEXPIREAT | 设置 key 的过期时间亿以毫秒计。 |
PEXPIREAT | 设置 key 过期时间的时间戳(unix timestamp) 以毫秒计 |
Rename | 修改 key 的名称 |
PERSIST | 移除 key 的过期时间,key 将持久保持。 |
Move | 将当前数据库的 key 移动到给定的数据库 db 当中。 |
RANDOMKEY | 从当前数据库中随机返回一个 key 。 |
Dump | 序列化给定 key ,并返回被序列化的值。 |
TTL | 以秒为单位,返回给定 key 的剩余生存时间(TTL, time to live)。 |
Expire | seconds 为给定 key 设置过期时间。 |
DEL | 该命令用于在 key 存在是删除 key。 |
Pttl | 以毫秒为单位返回 key 的剩余的过期时间。 |
Renamenx | 仅当 newkey 不存在时,将 key 改名为 newkey 。 |
EXISTS | 检查给定 key 是否存在。 |
Expireat | EXPIREAT 的作用和 EXPIRE 类似,都用于为 key 设置过期时间。 不同在于 EXPIREAT 命令接受的时间参数是 UNIX 时间戳(unix timestamp)。 |
Keys | 找所有符合给定模式( pattern)的 key 。 |
Redis 字符串(String) 命令
命令 | 描述 |
---|---|
Setnx | 只有在 key 不存在时设置 key 的值。 |
Getrange | 返回 key 中字符串值的子字符 |
Mset | 同时设置一个或多个 key-value 对。 |
Setex | 将值 value 关联到 key ,并将 key 的过期时间设为 seconds (以秒为单位)。 |
SET | 设置指定 key 的值 |
Get | 获取指定 key 的值。 |
Getbit | 对 key 所储存的字符串值,获取指定偏移量上的位(bit)。 |
Setbit | 对 key 所储存的字符串值,设置或清除指定偏移量上的位(bit)。 |
Decr | 将 key 中储存的数字值减一。 |
Decrby | key 所储存的值减去给定的减量值(decrement) 。 |
Strlen | 返回 key 所储存的字符串值的长度。 |
Msetnx | 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在。 |
Incrby | 将 key 所储存的值加上给定的增量值(increment) 。 |
Incrbyfloat | 将 key 所储存的值加上给定的浮点增量值(increment) 。 |
Setrange | 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始。 |
Psetex | 这个命令和 SETEX 命令相似,但它以毫秒为单位设置 key 的生存时间,而不是像 SETEX 命令那样,以秒为单位。 |
Append | 如果 key 已经存在并且是一个字符串, APPEND 命令将 value 追加到 key 原来的值的末尾。 |
Getset | 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。 |
Mget | 获取所有(一个或多个)给定 key 的值。 |
Incr | 将 key 中储存的数字值增一。 |
Redis 哈希(Hash) 命令
命令 | 描述 |
---|---|
Hmset | 同时将多个 field-value (域-值)对设置到哈希表 key 中。 |
Hmget | 获取所有给定字段的值 |
Hset | 将哈希表 key 中的字段 field 的值设为 value 。 |
Hgetall | 获取在哈希表中指定 key 的所有字段和值 |
Hget | 获取存储在哈希表中指定字段的值/td> |
Hexists | 查看哈希表 key 中,指定的字段是否存在。 |
Hincrby | 为哈希表 key 中的指定字段的整数值加上增量 increment 。 |
Hlen | 获取哈希表中字段的数量 |
Hdel | 删除一个或多个哈希表字段 |
Hvals | 获取哈希表中所有值 |
Hincrbyfloat | 为哈希表 key 中的指定字段的浮点数值加上增量 increment 。 |
Hkeys | 获取所有哈希表中的字段 |
Hsetnx | 只有在字段 field 不存在时,设置哈希表字段的值。 |
Redis 列表(List) 命令
命令 | 描述 |
---|---|
Lindex | 通过索引获取列表中的元素 |
Rpush | 在列表中添加一个或多个值 |
Lrange | 获取列表指定范围内的元素 |
Rpoplpush | 移除列表的最后一个元素,并将该元素添加到另一个列表并返回 |
Blpop | 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 |
Brpop | 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 |
Brpoplpush | 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 |
Lrem | 移除列表元素 |
Llen | 获取列表长度 |
Ltrim | 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都 |
Lpop | 出并获取列表的第一个元素 |
Lpushx | 将一个或多个值插入到已存在的列表头部 |
Linsert | 在列表的元素前或者后插入元素 |
Rpop | 移除并获取列表最后一个元素 |
Lset | 通过索引设置列表元素的值 |
Lpush | 将一个或多个值插入到列表头部 |
Rpushx | 为已存在的列表添加值 |
Redis 集合(Set) 命令
命令 | 描述 |
---|---|
Sunion | 返回所有给定集合的并集 |
Scard | 获取集合的成员数 |
Srandmember | 返回集合中一个或多个随机数 |
Smembers | 返回集合中的所有成员 |
Sinter | 返回给定所有集合的交集 |
Srem | 除集合中一个或多个成员 |
Smove | 将 member 元素从 source 集合移动到 destination 集合 |
Sadd | 向集合添加一个或多个成员 |
Sismember | 判断 member 元素是否是集合 key 的成员 |
Sdiffstore | 返回给定所有集合的差集并存储在 destination 中 |
Sdiff | 返回给定所有集合的差集 |
Sscan | 迭代集合中的元素 |
Sinterstore | 返回给定所有集合的交集并存储在 destination 中 |
Sunionstore | 所有给定集合的并集存储在 destination 集合中 |
Spop | 移除并返回集合中的一个随机元素 |
Redis 有序集合(sorted set) 命令
命令 | 描述 |
---|---|
Zrevrank | 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序 |
Zlexcount | 在有序集合中计算指定字典区间内成员数量 |
Zunionstore | 计算给定的一个或多个有序集的并集,并存储在新的 key 中 |
Zremrangebyrank | 移除有序集合中给定的排名区间的所有成员 |
Zcard | 获取有序集合的成员数 |
Zrem | 移除有序集合中的一个或多个成员 |
Zinterstore | 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中 |
Zrank | 返回有序集合中指定成员的索引 |
Zincrby | 有序集合中对指定成员的分数加上增量 increment |
Zrangebyscore | 通过分数返回有序集合指定区间内的成员 |
Zrangebylex | 通过字典区间返回有序集合的成员 |
Zscore | 返回有序集中,成员的分数值 |
Zremrangebyscore | 移除有序集合中给定的分数区间的所有成员 |
Zscan | 迭代有序集合中的元素(包括元素成员和元素分值) |
Zrevrangebyscore | 返回有序集中指定分数区间内的成员,分数从高到低排序 |
Zremrangebylex | 移除有序集合中给定的字典区间的所有成员 |
Zrevrange | 返回有序集中指定区间内的成员,通过索引,分数从高到底 |
Zrange | 通过索引区间返回有序集合成指定区间内的成员 |
Zcount | 计算在有序集合中指定区间分数的成员数 |
Zadd | 向有序集合添加一个或多个成员,或者更新已存在成员的分数 |
Redis 连接 命令
命令 | 描述 |
---|---|
Echo | 打印字符串 |
Select | 切换到指定的数据库 |
Ping | 查看服务是否运行 |
Quit | 关闭当前连接 |
Auth | 验证密码是否正确 |
Redis 服务器 命令
命令 | 描述 |
---|---|
Client Pause | 在指定时间内终止运行来自客户端的命令 |
Debug Object | 获取 key 的调试信息 |
Flushdb | 删除当前数据库的所有key |
Save | 异步保存数据到硬盘 |
Showlog | 管理 redis 的慢日志 |
Lastsave | 返回最近一次 Redis 成功将数据保存到磁盘上的时间,以 UNIX 时间戳格式表示 |
Config Get | 获取指定配置参数的值 |
Command | 获取 Redis 命令详情数组 |
Slaveof | 将当前服务器转变为指定服务器的从属服务器(slave server) |
Debug Segfault | 让 Redis 服务崩溃 |
Flushall | 删除所有数据库的所有key |
Dbsize | 返回当前数据库的 key 的数量 |
Bgrewriteaof | 异步执行一个 AOF(AppendOnly File) 文件重写操作 |
Cluster Slots | 获取集群节点的映射数组 |
Config Set | 修改 redis 配置参数,无需重启 |
Command Info | 获取指定 Redis 命令描述的数组 |
Shutdown | 异步保存数据到硬盘,并关闭服务器 |
Sync | 用于复制功能(replication)的内部命令 |
Client Kill | 关闭客户端连接 |
Role | 返回主从实例所属的角色 |
Monitor | 实时打印出 Redis 服务器接收到的命令,调试用 |
Command Getkeys | 获取给定命令的所有键 |
Client Getname | 获取连接的名称 |
Config Resetstat | 重置 INFO 命令中的某些统计数据 |
Command Count | 获取 Redis 命令总数 |
Time | 返回当前服务器时间 |
Info | 获取 Redis 服务器的各种信息和统计数值 |
Config rewrite | 对启动 Redis 服务器时所指定的 redis.conf 配置文件进行改写 |
Client List | 获取连接到服务器的客户端连接列表 |
Client Setname | 设置当前连接的名称 |
Bgsave | 在后台异步保存当前数据库的数据到磁盘 |
Redis 脚本 命令
命令 | 描述 |
---|---|
Script kill | 杀死当前正在运行的 Lua 脚本。 |
Script Load | 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。 |
Eval | 执行 Lua 脚本。 |
Evalsha | 执行 Lua 脚本。 |
Script Exists | 查看指定的脚本是否已经被保存在缓存当中。 |
Script Flush | 从脚本缓存中移除所有脚本。 |
Redis 事务 命令
命令 | 描述 |
---|---|
Exec | 执行所有事务块内的命令。 |
Watch | 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。 |
Discard | 取消事务,放弃执行事务块内的所有命令。 |
Unwatch | 取消 WATCH 命令对所有 key 的监视。 |
Multi | 标记一个事务块的开始。 |
Redis HyperLogLog 命令
命令 | 描述 |
---|---|
Pgmerge | 将多个 HyperLogLog 合并为一个 HyperLogLog |
Pfadd | 添加指定元素到 HyperLogLog 中。 |
Pfcount | 返回给定 HyperLogLog 的基数估算值。 |
Redis 发布订阅 命令
命令 | 描述 |
---|---|
Unsubscribe | 指退订给定的频道。 |
Subscribe | 订阅给定的一个或多个频道的信息。 |
Pubsub | 查看订阅与发布系统状态。 |
Punsubscribe | 退订所有给定模式的频道。 |
Publish | 将信息发送到指定的频道。 |
Psubscribe | 订阅一个或多个符合给定模式的频道。 |
Redis 地理位置(geo) 命令
命令 | 描述 |
---|---|
GEOHASH | 返回一个或多个位置元素的 Geohash 表示 |
GEOPOS | 从key里返回所有给定位置元素的位置(经度和纬度) |
GEODIST | 返回两个给定位置之间的距离 |
GEORADIUS | 以给定的经纬度为中心, 找出某一半径内的元素 |
GEOADD | 将指定的地理空间位置(纬度、经度、名称)添加到指定的key中 |
GEORADIUSBYMEMBER | 找出位于指定范围内的元素,中心点是由给定的位置元素决定 |
如何在Redis中使用悲观锁和乐观锁
乐观锁一般采取的方式:
获取版本号,也就是version
然后更新的时候比较版本号
Redis检测测试
127.0.0.1:6379> select 2 选择数据库
OK
127.0.0.1:6379[2]> set money 100
OK
127.0.0.1:6379[2]> set out 0
OK
127.0.0.1:6379[2]> watch money #监视money对象
OK
127.0.0.1:6379[2]> multi # 事务正常结束,数据没有发生变动,那么就可以正常执行成功
OK
127.0.0.1:6379[2](TX)> decrby money 20
QUEUED
127.0.0.1:6379[2](TX)> incrby out 20
QUEUED
乐观锁
流程
- 先watch key 监视
- 然后 multi 监测事务,监测事务是否发生变化,如果没有变化那就可以执行成功,如果发生变化,那么对不起,执行失败
- 如果发现事务执行失败,那么就先解锁 也就是unwatch
- 获取最新的值,再次监视,select version , watch key . decrby key 值 减少 . incrby key 值 增加
Jedis
🔽是java操作redis中的中间件
<!--导入jedis的包--><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.2.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.41</version></dependency>public class Test {public static void main(String[] args) {//创建一个new Jedis对象Jedis jedis = new Jedis("192.168.1.100",6379);//jedis所有的指令就是我们学的所有的指令System.out.println(jedis.ping());}
输出:
PONG 说明链接成功
Redis事务
/*
redis的事务处理*/
public class Redis事务 {public static void main(String[] args) {//开启链接Jedis jedis = new Jedis("192.168.1.100",6379);//清空当前数据库jedis.flushDB();//将数据转换为json格式,目前公司大部分使用的均是jsonJSONObject jsonObject = new JSONObject();jsonObject.put("u1","我喜欢");jsonObject.put("u2","你");//开启事务Transaction multi = jedis.multi();String result = jsonObject.toJSONString();// System.out.println(jedis.ping());//如果我需要监控,那么我就需要加一个乐观锁jedis.watch(result);//加一个锁try {multi.set("u1",result);multi.set("u2",result);int t = 1/0;multi.exec();//如果成功执行事务} catch (Exception e) {multi.discard(); //如果失败了,那么就事务回滚,或者事务放弃throw new RuntimeException(e);} finally {System.out.println(jedis.get("u1"));System.out.println(jedis.get("u2"));jedis.close();}//multi.set("u1","老胡");/*HashMap map = new HashMap();String ke = "k1";map.put(ke,"小黑");map.put(ke,"你好");System.out.println(map);System.out.println(map.size());{k1=你好}1
*///连接关闭}
}
SpringBoot整合Redis
springBoot操作数据: spring-data jpa JDBC MONGODB Redis
注意在springBoot 2.x之后,原来的redis都被替换为lettuce
Jedis : 采用的直连,多个线程操作的话是不安全的,如果想要避免不安全,使用jedis pool链接池。BIO
lettuce:采用netty,实例可以在多个线程下使用,不存在线程不安全的情况,可以减少线程数据,更加像NIO模式。
-
导入依赖
<!--导入jedis的包--><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.2.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.41</version></dependency>
-
配置文件
#配置redisspring:redis:host: 192.168.1.100port: 6379
- 测试
源码分析
@Configuration(proxyBeanMethods = false)@ConditionalOnClass({RedisOperations.class})@EnableConfigurationProperties({RedisProperties.class})@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})public class RedisAutoConfiguration {public RedisAutoConfiguration() {}@Bean@ConditionalOnMissingBean( //只有在没有的时候使用,故而,我们可以自定义一个RedisTemplatename = {"redisTemplate"})//默认的,redisTemplate ,没有过多的设置,redis对象都是需要序列化//两个泛型读书Object,Object的类型,我们使用需要强制转换为<String,Obeject>public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {RedisTemplate<Object, Object> template = new RedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;}@Bean@ConditionalOnMissingBean//由于String是Redis的最常使用的类型,所以但对提出来了一个beanpublic StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;}}
@SpringBootTestclass RedisSpringBootApplicationTests {@Autowiredprivate RedisTemplate redisTemplate ;@Testvoid contextLoads() {/*redisTemplate 操作不同的数据类型。api和我们的指令是一样的opsForValue 操作字符串,类似StringopsForList 操作list 相当于listopsForSetopsForHashopsForZSetopsForGeoopsForHyperLogLogLog除了基本的操作,我们常用方法都可以使用redisTemplate操作比如事务和基本的操作*//*获取redis的链接对象RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection();redisConnection.flushDb();redisConnection.flushAll();*/redisTemplate.opsForValue().set("k",123);redisTemplate.opsForValue().set("k1","罗小黑");System.out.println(redisTemplate.opsForValue().get("k"));System.out.println(redisTemplate.opsForValue().get("k1"));}}
XML 配置信息;
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xh</groupId><artifactId>Redis_SpringBoot</artifactId><version>0.0.1-SNAPSHOT</version><name>Redis_SpringBoot</name><description>Redis_SpringBoot</description><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.3.7.RELEASE</spring-boot.version></properties><dependencies><!-- 重点jackson,是我们将redis的格式转换为redis格式,方便我们最直观的查看 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.9.6</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>2.9.6</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.6</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis-reactive</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>com.taobao.arthas</groupId><artifactId>arthas-spring-boot-starter</artifactId><version>3.4.8</version><scope>runtime</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.7.RELEASE</version><configuration><mainClass>com.xh.redis_springboot.RedisSpringBootApplication</mainClass></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>
自定义RedisTemplate
/** @Title 自定义一个Template* @Description * @author 罗小黑* @param null* @return * @date 2022/10/12 19:34* @email */@Configurationpublic class Redis_self_RedisTemplate {@Bean@SuppressWarnings("all")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory)throws UnknownHostException {RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();template.setConnectionFactory(factory);// 序列化配置// json序列化配置Jackson2JsonRedisSerializer objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);// String序列化配置StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();template.setHashValueSerializer(objectJackson2JsonRedisSerializer);template.setValueSerializer(objectJackson2JsonRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);template.setKeySerializer(stringRedisSerializer);template.afterPropertiesSet();return template;}}
自定义RedisUtil工具类
package com.xh.redis_springboot.Util;import java.util.*;import java.util.concurrent.TimeUnit;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** @Title 我们在使用Redis的时候通常使用我们自己的工具类* @Description 这是一种思想,一种封装重用的思想,避免我们直接去使用redis自带的指令* @author 罗小黑* @param null* @return* @date 2022/10/12 20:19*/@Componentpublic class RedisUtil {@Resourceprivate RedisTemplate<String, Object> redisTemplate;//============================Common=============================/*** 关闭*/public void close() {//获取链接然后导入RedisConnection redisConnection = Objects.requireNonNull(redisTemplate.getConnectionFactory()).getConnection();redisConnection.close();}/*** 清空*/public void flushDB() {RedisConnection redisConnection = Objects.requireNonNull(redisTemplate.getConnectionFactory()).getConnection();redisConnection.flushDb();}/*** 指定缓存失效时间** @param key 键* @param time 时间(秒)* @return*/public void expire(String key, Long time) {try {if (time > 0) {redisTemplate.expire(key, time, TimeUnit.SECONDS);}} catch (Exception e) {e.printStackTrace();}}/*** 根据key 获取过期时间** @param key 键 不能为null* @return 时间(秒) 返回0代表为永久有效*/public Long getExpire(String key) {return redisTemplate.getExpire(key, TimeUnit.SECONDS);}/*** 判断key是否存在** @param key 键* @return true 存在 false不存在*/public Boolean hasKey(String key) {try {return redisTemplate.hasKey(key);} catch (Exception e) {e.printStackTrace();return false;}}/*** 删除缓存** @param key 可以传一个值 或多个*/public void del(String... key) {if (key != null && key.length > 0) {if (key.length == 1) {redisTemplate.delete(key[0]);} else {redisTemplate.delete(Arrays.asList(key));}}}//============================String=============================/*** 普通缓存获取** @param key 键* @return 值*/public Object get(String key) {return key == null ? null : redisTemplate.opsForValue().get(key);}/*** 普通缓存放入** @param key 键* @param value 值* @return true成功 false失败*/public Boolean set(String key, Object value) {try {redisTemplate.opsForValue().set(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 普通缓存放入并设置时间** @param key 键* @param value 值* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期* @return true成功 false 失败*/public Boolean set(String key, Object value, Long time) {try {if (time > 0) {redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);} else {set(key, value);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 递增** @param key 键* @param delta 要增加几(大于0)* @return*/public Long incr(String key, Long delta) {if (delta < 0) {throw new RuntimeException("递增因子必须大于0");}return redisTemplate.opsForValue().increment(key, delta);}/*** 递减** @param key 键* @param delta 要减少几(小于0)* @return*/public Long decr(String key, Long delta) {if (delta < 0) {throw new RuntimeException("递减因子必须大于0");}return redisTemplate.opsForValue().increment(key, -delta);}//================================Map=================================/*** HashGet** @param key 键 不能为null* @param item 项 不能为null* @return 值*/public Object hget(String key, String item) {return redisTemplate.opsForHash().get(key, item);}/*** 获取hashKey对应的所有键值** @param key 键* @return 对应的多个键值*/public Map<Object, Object> hmget(String key) {return redisTemplate.opsForHash().entries(key);}/*** HashSet** @param key 键* @param map 对应多个键值* @return true 成功 false 失败*/public Boolean hmset(String key, Map<String, Object> map) {try {redisTemplate.opsForHash().putAll(key, map);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** HashSet 并设置时间** @param key 键* @param map 对应多个键值* @param time 时间(秒)* @return true成功 false失败*/public Boolean hmset(String key, Map<String, Object> map, Long time) {try {redisTemplate.opsForHash().putAll(key, map);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 向一张hash表中放入数据,如果不存在将创建** @param key 键* @param item 项* @param value 值* @return true 成功 false失败*/public Boolean hset(String key, String item, Object value) {try {redisTemplate.opsForHash().put(key, item, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 向一张hash表中放入数据,如果不存在将创建** @param key 键* @param item 项* @param value 值* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间* @return true 成功 false失败*/public Boolean hset(String key, String item, Object value, Long time) {try {redisTemplate.opsForHash().put(key, item, value);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 删除hash表中的值** @param key 键 不能为null* @param item 项 可以使多个 不能为null*/public void hdel(String key, Object... item) {redisTemplate.opsForHash().delete(key, item);}/*** 判断hash表中是否有该项的值** @param key 键 不能为null* @param item 项 不能为null* @return true 存在 false不存在*/public Boolean hHasKey(String key, String item) {return redisTemplate.opsForHash().hasKey(key, item);}/*** hash递增 如果不存在,就会创建一个 并把新增后的值返回** @param key 键* @param item 项* @param by 要增加几(大于0)* @return*/public double hincr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, by);}/*** hash递减** @param key 键* @param item 项* @param by 要减少记(小于0)* @return*/public double hdecr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, -by);}//============================set=============================/*** 根据key获取Set中的所有值** @param key 键* @return*/public Set<Object> sGet(String key) {try {return redisTemplate.opsForSet().members(key);} catch (Exception e) {e.printStackTrace();return null;}}/*** 根据value从一个set中查询,是否存在** @param key 键* @param value 值* @return true 存在 false不存在*/public Boolean sHasKey(String key, Object value) {try {return redisTemplate.opsForSet().isMember(key, value);} catch (Exception e) {e.printStackTrace();return false;}}/*** 将数据放入set缓存** @param key 键* @param values 值 可以是多个* @return 成功个数*/public Long sSet(String key, Object... values) {try {return redisTemplate.opsForSet().add(key, values);} catch (Exception e) {e.printStackTrace();return 0L;}}/*** 将set数据放入缓存** @param key 键* @param time 时间(秒)* @param values 值 可以是多个* @return 成功个数*/public Long sSetAndTime(String key, Long time, Object... values) {try {Long count = redisTemplate.opsForSet().add(key, values);if (time > 0) {expire(key, time);}return count;} catch (Exception e) {e.printStackTrace();return 0L;}}/*** 获取set缓存的长度** @param key 键* @return*/public Long sGetSetSize(String key) {try {return redisTemplate.opsForSet().size(key);} catch (Exception e) {e.printStackTrace();return 0L;}}/*** 移除值为value的** @param key 键* @param values 值 可以是多个* @return 移除的个数*/public Long setRemove(String key, Object... values) {try {return redisTemplate.opsForSet().remove(key, values);} catch (Exception e) {e.printStackTrace();return 0L;}}//===============================list=================================/*** 获取list缓存的内容** @param key 键* @param start 开始* @param end 结束 0 到 -1代表所有值* @return*/public List<Object> lGet(String key, Long start, Long end) {try {return redisTemplate.opsForList().range(key, start, end);} catch (Exception e) {e.printStackTrace();return null;}}/*** 获取list缓存的长度** @param key 键* @return*/public Long lGetListSize(String key) {try {return redisTemplate.opsForList().size(key);} catch (Exception e) {e.printStackTrace();return 0L;}}/*** 通过索引 获取list中的值** @param key 键* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推* @return*/public Object lGetIndex(String key, Long index) {try {return redisTemplate.opsForList().index(key, index);} catch (Exception e) {e.printStackTrace();return null;}}/*** 将list放入缓存** @param key 键* @param value 值* @return*/public Boolean lSet(String key, Object value) {try {redisTemplate.opsForList().rightPush(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存** @param key 键* @param value 值* @param time 时间(秒)* @return*/public Boolean lSet(String key, Object value, Long time) {try {redisTemplate.opsForList().rightPush(key, value);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存** @param key 键* @param value 值* @return*/public Boolean lSet(String key, List<Object> value) {try {redisTemplate.opsForList().rightPushAll(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存** @param key 键* @param value 值* @param time 时间(秒)* @return*/public Boolean lSet(String key, List<Object> value, Long time) {try {redisTemplate.opsForList().rightPushAll(key, value);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 根据索引修改list中的某条数据** @param key 键* @param index 索引* @param value 值* @return*/public Boolean lUpdateIndex(String key, Long index, Object value) {try {redisTemplate.opsForList().set(key, index, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 移除N个值为value** @param key 键* @param count 移除多少个* @param value 值* @return 移除的个数*/public Long lRemove(String key, Long count, Object value) {try {return redisTemplate.opsForList().remove(key, count, value);} catch (Exception e) {e.printStackTrace();return 0L;}}}
所有的Redis操作,其实都很简单,主要的是理解redis的思想和其中每一种数据结构的用处和作用。
Redis.conf详细
下载redis docker pull redis创建实例并启动mkdir -p /mydata/redis/conf touch redis.confdocker run -p 6379:6379 --name redis -v /mydata/redis/data:/data \-v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \-d redis redis-server /etc/redis/redis.confls 查看当前文件夹下的文件cat redis.conf测试redis docker exec -it redis redis-cli---由于以前的数据全存在内存中,从而我们重复读取get a的时候会导致失败[root@localhost conf]# cat redis.conf[root@localhost conf]# docker exec -it redis redis-cli127.0.0.1:6379> set a bOK127.0.0.1:6379> get a"b" //从而在一次上来的时候这块是null127.0.0.1:6379> exit--让redis 持久化 ---> pwd127.0.0.1:6379> exit[root@localhost conf]# pwd/mydata/redis/conf[root@localhost conf]# lsredis.conf[root@localhost conf]# vi redis.conf iappendonly yesesc :wqdocker restart redis //重启redis--设置开机自启动docker 里边容器sudo docker update <容器名> --restart=always
启动的时候通过配置文件处理;
单位
[root@localhost conf]# pwd
/mydata/redis/conf
[root@localhost conf]# ls
redis.conf
[root@localhost conf]# vi redis.conf
- 配置文件unit单位对不敏感
- 包含
- 网络
bind 127.0.0.1 #绑定的IPprotected-mode yes #包含模式port 6379 #端口配置,不推荐修改
- 通用
daemonize yes #以守护进程的方式运行,默认是no,我们需要自己开启为yes!pidfile /var/run/redis_6379.pid #如果以后台程序运行,我们就需要给一个pid文件loglevel notice # 日志文件logfile "" #日志文件位置名databases 16 #数据库的数量,默认是16个数据库always-show-logo yes #是否显示logo
- 快照
持久化,在规定的时间内,执行了多少次操作,则会持久化到文件 .rdb.aof
redis是内存数据库,如果没有持久化,那么数据断电就会丢失
#如果在900秒内,如果有至少一个key 进行修改,我们就需要进行持久化操作
save 900 1
#如果在300秒内,有10个key进行修改,我们就需要进行持久化
save 300 10
#如果60秒内。至少10000key进行修改,我们进行持久化操作
save 60 10000
#之后我们可以自己自定义stop-writes-on-bgsave-error yes #持久化如果出错,是否还需要继续工作!rdbcompression yes #是否压缩rdb文件,需要占用一定的cpu的资源rdbchecksum yes #是否保持rdb文件的时候,进行错误的检查校验dir ./ #rdb文件保存路径#=============================================# 设置密码 config set requirepass "密码"#此时就需要权限了,也就是需要密码了config get requirepass #输入密码auth 密码#此时就可以查看config get requirepass
- 限制客户端
maxclients 10000 #设置能链接上的redis的最大客户端数量
maxmemory #redis 配置最大的内存容量
maxmemory-policy noeviction #内存到达上限之后的处理策略
volatile-lru,针对设置了过期时间的key,使用lru算法进行淘汰。
allkeys-lru,针对所有key使用lru算法进行淘汰。
volatile-random,从所有设置了过期时间的key中使用随机淘汰的方式进行淘汰。
allkeys-random,针对所有的key使用随机淘汰机制进行淘汰。
volatile-ttl,针对设置了过期时间的key,越早过期的越先被淘汰。
noeviction,不会淘汰任何数据,当使用的内存空间超过 maxmemory 值时,再有写请求来时返回错误。
- AOF : APPENDONLY 模式
appendonly no #默认是不开启模式的aof默认是使用rdb的,大部分情况下rdb够用了
appendfilename “appendonly.aof” #持久化的文件的名字 .rdb
#appendfsync always 每次修改都会 sync消耗性能
appendsync everysec #每秒执行一个sync 可能会丢失这个ls的数据
#appendfsync no 不执行sync这个是操作系统自己同步数据,速度最快
redis持久化
AOF:
将我们所有的命令记录下来,history,恢复的时候就把这个文件全部执行一次。
[root@localhost conf]# vi redis.conf i appendonly yes esc :wq
一般是默认不开启的,我们需要手动去开启。
我们一般需要将appendonly 改成yes就行,然后我们就开启了
更新后我们需要重启服务器,
我们可以直接使用vim
appendonly.aof查看我们配置后产生的文件,但是当我们将aof文件进行一定的更改就会导致redis启动不起来了,这个时候我们就需要修复这个aof文件redis给我们提供了一个工具 redis-check-aof --fix
然后按照指令操作,我们就可以完成aof的文件修复。
优点和缺点
优点:
- 每一次修改都同步,文件的完整会更好
- 每秒同步一次,可能会丢失一秒的数据
- 从不同步,效率是最高的
缺点:
- 相对于数据文件来说,AOF大于RDB,修复的速度也比RDB慢!
- AOF运行效率也会比RDB慢,故而我们需要Redis默认的RDB持久化
如果AOF文件大于64MB,也就是太大了,fork一个新进程将对我们的文件进行重写!
RDB:
dump.rdb
redis默认RDB
- save的规则满足的情况下,会自动触发rdb规则
- 执行flushAll命令,也会触发我们rdb规则
- 退出redis也会产生rdb文件
备份就产生一个dump.rdb文件
如何恢复rdb文件
- 只要将rdb文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump,rdb文件,然后恢复文件
- 查看我们需要存在的位置
一般上我们默认使用RDB
优点:
- 适合大规模的数据恢复
- 对数据的完整性要求不高
缺点
- 需要一定的时间间隔进程操作,如果redis意外宕机了,这个最后一次修改的数据就没有了
- fork进程的时候需要占用一定的内存空间
Redis发布订阅
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
注意我们需要开启两个客户端,一个做订阅的一个做推送的。
Redis 客户端可以订阅任意数量的频道。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
以下实例演示了发布订阅是如何工作的,需要开启两个 redis-cli 客户端。
在我们实例中我们创建了订阅频道名为 runoobChat:
SUBSCRIBE 名字
redis 127.0.0.1:6379> SUBSCRIBE runoobChat #订阅一个频道,名字runoobChatReading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "runoobChat"
3) (integer) 1
#等待读取推送的消息
现在,我们先重新开启个 redis 客户端,然后在同一个频道 runoobChat 发布两次消息,订阅者就能接收到消息。
PUBLISH 名字 信息
redis 127.0.0.1:6379> PUBLISH runoobChat "Redis PUBLISH test"
(integer) 1
redis 127.0.0.1:6379> PUBLISH runoobChat "Learn redis by runoob.com"
(integer) 1
# 订阅者的客户端会显示如下消息1) "message" #消息
2) "runoobChat" #那个频道的消息
3) "Redis PUBLISH test" # 消息的具体内容1) "message"
2) "runoobChat"
3) "Learn redis by runoob.com"
gif 演示如下:
-
开启本地 Redis 服务,开启两个 redis-cli 客户端。
-
在第一个 redis-cli 客户端输入 SUBSCRIBE runoobChat,意思是订阅 runoobChat 频道。
-
在第二个 redis-cli 客户端输入 PUBLISH runoobChat “Redis PUBLISH test” 往 runoobChat 频道发送消息,这个时候在第一个 redis-cli 客户端就会看到由第二个 redis-cli 客户端发送的测试消息。
1 PSUBSCRIBE pattern 订阅一个或多个符合给定模式的频道。
2 PUBSUB subcommand [argument 查看订阅与发布系统状态。
3 PUBLISH channel message 将信息发送到指定的频道。
4 PUNSUBSCRIBE 退订所有给定模式的频道。
5 SUBSCRIBE channe 订阅给定的一个或多个频道的信息。
6 UNSUBSCRIBE 指退订给定的频道。
redis主从复制(事实上这部分属于redis集群分布)
Redis Replication是一种 master-slave 模式的复制机制,这种机制使得 slave 节点可以成为与 master 节点完全相同的副本,可以采用一主多从或者级联结构 。
主从复制的配置要点:
(1)配从库不配主,从库配置:slaveof 主库IP 主库端口
(2)查看redis的配置信息:info replication
Redis为什么需要主从复制?
使用Redis主从复制的原因主要是单台Redis节点存在以下的局限性:
(1)Redis虽然读写的速度都很快,单节点的Redis能够支撑QPS大概在5w左右,如果上千万的用户访问,Redis就承载不了,成为了高并发的瓶颈。
(2)单节点的Redis不能保证高可用,当Redis因为某些原因意外宕机时,会导致缓存不可用
(3)CPU的利用率上,单台Redis实例只能利用单个核心,这单个核心在面临海量数据的存取和管理工作时压力会非常大。
主从复制的好处:
> (1)数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
>
> (2)故障恢复:如果master宕掉了,使用哨兵模式,可以提升一个 slave 作为新的 master,进而实现故障转移,实现高可用
>
> (3)负载均衡:可以轻易地实现横向扩展,实现读写分离,一个 master 用于写,多个 slave 用于分摊读的压力,从而实现高并发;
主从复制的缺点:
由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave服务器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重
环境配置:
只配置从库,不配置主库
info replication
查看相关配置库
vim redis79.conf
进入配置文件进行更改改端口port,改后台服务名字pid,改日志,改后台运行,改dbfilename dump.rdb
redis-server kconfig/redis.conf
我们启动相应配置后的redis从库
ps -ef|grep redis
查看后台启动信息
一主二从
刚刚配置后其每个数据库都是主库,只有经过配置后才能保证redis的某个库变成从库。
默认情况之下。每台Redis服务器都是主表
redis-cli -p 6379 #启动redis的客户端
#注意这个是在你要指定作为从机的客户端窗口里边执行的,他选择老大,而不是老大选择他
slaveof 127.0.0.1 6379 #配置主表 简称任命老大
我们通命令配置的,只能是暂时的。
我们可以通Redis.conf完成配置达到永久配置的方式,具体可以百度解决。
replicaof <masterip> ip地址 <masterport>端口号
配置在从机的配置文件内,当服务重新启动,这个表就作为从机了
注意:
当我们配置好主机和从机后,主机可以进行set添加数据,而添加的数据可以直接在从表中查到,但是从表不能添加数据,因为他是从表。
没有配置哨兵的时候:当主机断开链接的时候,从机依旧连接到主机,但是没有写操作,只有读操作,当这个时候主机回来了,那么从机还是保持链接,主机添加数据从机照样读取主机的数据。
如果是使用命令行,那么重新启动后就又会变回主机,但这个时候,只要变回从机,那么就可以立马拿到主机的数据
复制原理
slave启动启动成功链接到master后会发送一个sync同步命令
Master接受到命令后,启动后台的存盘进程,同时收集所有接收到用于修改数据集命令,在后台进程执行完毕后,master将传送整个数据文件到slave,并完成一次完全同步。
全量复制:而slave服务在接到数据库文件数据后,将其存盘并加载到内存中
增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步。
但是只要是重新链接Master,一次完全同步(全部复制)将会自动执行。
层层连接
A->B->C
宕机手动配置主机
如果我们的主机挂掉了,那么我们如何将从机配置成为主机?
我们使用的是slaveof no one让从表自己变成主机,那么其他节点就可以手动连接到最新的这个主机,在哨兵没有创建的时候,一般需要我们手动去配置。如果这个时候老大修复了,那么我们就需要重写连接。
哨兵自动配置
Redis的主从复制模式下, 一旦主节点由于故障不能提供服务, 需要人工将从节点晋升为主节点, 同时还要通知应用方更新主节点地址,
对于很多应用场景这种故障处理的方式是无法接受的。 可喜的是Redis从2.8开始正式提供了Redis Sentinel(哨兵)
架构来解决这个问题。
总结:
Redis主从复制的缺点:没有办法对master进行动态选举,需要使用Sentinel机制完成动态选举
- 哨兵模式介绍
- Sentinel(哨兵)进程是用于监控redis集群中Master主服务器工作的状态在Master主服务器发生故障的时候,可以实现Master和Slave服务器的切换,保证系统的高可用(HA)其已经被集成在redis2.6+的版本中,Redis的哨兵模式到了2.8版本之后就稳定了下来。
- 哨兵进程的作用
- 监控(Monitoring):
- 哨兵(sentinel) 会不断地检查你的Master和Slave是否运作正常。
- 提醒(Notification):
- 当被监控的某个Redis节点出现问题时, 哨兵(sentinel) 可以通过 API 向管理员或者其他应用程序发送通知。
- 监控(Monitoring):
- 自动故障迁移(Automatic failover):
- 当一个Master不能正常工作时,哨兵(sentinel) 会开始一次自动故障迁移操作。
- 它会将失效Master的其中一个Slave升级为新的Master, 并让失效Master的其他Slave改为复制新的Master;
- 当客户端试图连接失效的Master时,集群也会向客户端返回新Master的地址,使得集群可以使用现在的Master替换失效Master。
- Master和Slave服务器切换后,Master的redis.conf、Slave的redis.conf和sentinel.conf的配置文件的内容都会发生相应的改变,即,Master主服务器的redis.conf配置文件中会多一行slaveof的配置,sentinel.conf的监控目标会随之调换。
- 哨兵进程的工作方式
-
每个Sentinel(哨兵)进程以每秒钟一次的频率向整个集群中的Master主服务器,Slave从服务器以及其他Sentinel(哨兵)进程发送一个 PING 命令。
-
如果一个实例(instance)距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值,则这个实例会被 Sentinel(哨兵)进程标记为主观下线(SDOWN)。
-
如果一个Master主服务器被标记为主观下线(SDOWN),则正在监视这个Master主服务器的所有
-
Sentinel(哨兵)进程要以每秒一次的频率确认Master主服务器的确进入了主观下线状态。
-
当有足够数量的 Sentinel(哨兵)进程(大于等于配置文件指定的值)在指定的时间范围内确认Master主服务器进入了主观下线状态(SDOWN), 则Master主服务器会被标记为客观下线(ODOWN)。
-
在一般情况下, 每个Sentinel(哨兵)进程会以每 10 秒一次的频率向集群中的所有Master主服务器、Slave从服务器发送 INFO 命令。
-
当Master主服务器被 Sentinel(哨兵)进程标记为客观下线(ODOWN)时,Sentinel(哨兵)进程向下线的 Master主服务器的所有 Slave从服务器发送 INFO 命令的频率会从 10 秒一次改为每秒一次。
-
若没有足够数量的 Sentinel(哨兵)进程同意 Master主服务器下线, Master主服务器的客观下线状态就会被移除。若 Master主服务器重新向 Sentinel(哨兵)进程发送 PING 命令返回有效回复,Master主服务器的主观下线状态就会被移除。
#配置哨兵
vim sentinel.conf#进入配置文件 按i进入编辑模式
sentinel monitor 名字 ip地址 端口号 1 # 1代表主机挂了之后是谁成为主机
-
1代表slave投票,让谁成为主机票数最多的就是主机
我们关闭主机后,那么哨兵自动将会替我们从表中选择主机。同时也会在日志里边显示slave0 ,slave1等都是从机,master是主机。这个时候如果主机回来了,那么主机就只能当从机了,农民翻身做主人了,你想要成为主人,不太可能了,除非关闭其他从机,然后保留你自己,在关闭主机,这样方可成为主机。
优点与缺点
优点:
- 哨兵集群,基于主从复制模式,所有的主从配置优点他全有
- 主从可以切换,故障可以转移,系统的可用性就会更好
- 哨兵模式就是主从模式的升级,手动到自动,更加健壮
缺点:
- Redis不好在线扩容,集权容量一旦达到上限,在线扩容就会很麻烦
- 实现哨兵模式的配置其实是很麻烦的,里边有很多的选择。
哨兵模式的配置
# Example sentinel.conf# 哨兵sentinel实例运行的端口 默认26379
port 26379# 哨兵sentinel的工作目录
dir /tmp# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 配置多少个sentinel哨兵统一认为master主节点失联 那么这时客观上认为主节点失联了# sentinel monitor <master-name> <ip> <redis-port> <quorum>sentinel monitor mymaster 127.0.0.1 6379 2# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,这个数字越小,完成failover所需的时间就越长,但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000# SCRIPTS EXECUTION#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
#通知脚本# sentinel notification-script <master-name> <script-path>sentinel notification-script mymaster /var/redis/notify.sh# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。#一般都是运维来配置:
# sentinel client-reconfig-script <master-name> <script-path>sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
Redis缓存穿透和雪崩
缓存穿透:
用户想查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向着持久层数据库查询,发现也没有,故而本次查询失败,当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库,这样就会使持久层造成很大压力,这个时候就出现了缓存穿透。
或者
是指查询一个数据库一定不存在的数据。正常的使用缓存流程大致是,数据查询先进行缓存查询,如果key不存在或者key已经过期,再对数据库进行查询,并把查询到的对象,放进缓存。如果数据库查询对象为空,则不放进缓存.
缓存穿透解决思路:
- 如果查询数据库也为空,直接设置一个默认值存放到缓存,这样第二次到缓冲中获取就有值了,而不会继续访问数据库,这种办法最简单粗暴。
- 根据缓存数据Key的规则。例如我们公司是做机顶盒的,缓存数据以Mac为Key,Mac是有规则,如果不符合规则就过滤掉,这样可以过滤一部分查询。在做缓存规划的时候,Key有一定规则的话,可以采取这种办法。这种办法只能缓解一部分的压力,过滤和系统无关的查询,但是无法根治。
- 采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的BitSet中,不存在的数据将会被拦截掉,从而避免了对底层存储系统的查询压力。关于布隆过滤器,详情查看:基于BitSet的布隆过滤器(Bloom Filter)
缓存击穿,
是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
缓存雪崩
就是指缓存由于某些原因(比如宕机、cache服务挂了或者不响应)整体crash掉了,导致大量请求到达后端数据库,从而导致数据库崩溃,整个系统崩溃,发生灾难。
下面的就是一个雪崩的简单过程:
1、redis集群彻底崩溃
2、缓存服务大量对redis的请求hang住,占用资源
3、缓存服务大量的请求打到源头服务去查询mysql,直接打死mysql
4、源头服务因为mysql被打死也崩溃,对源服务的请求也hang住,占用资源
5、缓存服务大量的资源全部耗费在访问redis和源服务无果,最后自己被拖死,无法提供服务
6、nginx无法访问缓存服务,redis和源服务,只能基于本地缓存提供服务,但是缓存过期后,没有数据提供
7、网站崩渍
缓存雪崩的解决方案
1,采用加锁计数,或者使用合理的队列数量来避免缓存失效时对数据库造成太大的压力。这种办法虽然能缓解数据库的压力,但是同时又降低了系统的吞吐量。
2,分析用户行为,尽量让失效时间点均匀分布。避免缓存雪崩的出现。
3,如果是因为某台缓存服务器宕机,可以考虑做主备,比如:redis主备,但是双缓存涉及到更新事务的问题,update可能读到脏数据,需要好好解决。
什么是分布式 ?
分布式系统一定是由多个节点组成的系统。
其中,节点指的是计算机服务器,而且这些节点一般不是孤立的,而是互通的。
这些连通的节点上部署了我们的节点,并且相互的操作会有协同。
分布式系统对于用户而言,他们面对的就是一个服务器,提供用户需要的服务而已,
而实际上这些服务是通过背后的众多服务器组成的一个分布式系统,因此分布式系统看起来像是一个超级计算机一样。
分布式与集群的区别 ?
集群
集群是指在几个服务器上部署相同的应用程序来分担客户端的请求。
它是同一个系统部署在不同的服务器上,比如一个登陆系统部署在不同的服务器上。
好比 多个人一起做同样的事。
集群主要的使用场景是为了分担请求的压力。
但是,当压力进一步增大的时候,可能在需要存储的部分,比如mysql无法面对大量的“写压力”。
因为在mysql做成集群之后,主要的写压力还是在master的机器上,其他slave机器无法分担写压力,这时,就引出了“分布式”。
分布式
分布式是指多个系统协同合作完成一个特定任务的系统。
它是不同的系统部署在不同的服务器上,服务器之间相互调用。
好比 多个人一起做不同的事。
分布式是解决中心化管理的问题,把所有的任务叠加到一个节点处理,太慢了。
所以把一个大问题拆分为多个小问题,并分别解决,最终协同合作。
分布式的主要工作是分解任务,把职能拆解。
分布式的主要应用场景是单台机器已经无法满足这种性能的要求,必须要融合多个节点,并且节点之间的相关部分是有交互的。
相当于在写mysql的时候,每个节点存储部分数据(分库分表),这就是分布式存储的由来。
存储一些非结构化数据:静态文件、图片、pdf、小视频 … 这些也是分布式文件系统的由来。