Redis入门

Redis 是一个使用 ANSI C 编写的开源、支持网络、基于内存、可选持久性的键值对存储数据库
从 2015 年 6 月开始,Redis 的开发由 Redis Labs 赞助,而2013年5月至2015年6月期间,其开发由 Pivotal 赞助。在2013年5月之前,其开发由 VMware 赞助。
根据月度排行网站 DB-Engines.com 的数据显示,Redis 是最流行的键值对存储数据库

数据模型

Redis 的外围由一个键、值映射的字典构成。与其他非关系型数据库主要不同在于:Redis 中值的类型不仅限于字符串,还支持如下抽象数据类型:

  • 字符串列表
  • 无序不重复的字符串集合
  • 有序不重复的字符串集合
  • 键、值都为字符串的哈希表

值的类型决定了值本身支持的操作。Redis 支持不同无序、有序的列表,无序、有序的集合间的交集、并集等高级服务器端原子操作

更多的基本信息可以查看 wiki:https://zh.wikipedia.org/wiki/Redis

关于缓存&NoSQL

我们所听说的 Redis 都基本是用于缓存,缓存也是非常必要的,除了 Redis 还有一种缓存技术是使用 Memcached ,它出现比 Redis 早,功能比较单一。
但从性能方面来说,也就是从缓存的命中率来说 Memcached 的性能更好,但是和 Redis 的差距并不大,但是 Redis 所提供的功能就更强大了。
重要区别:

  • Memcached 是多线程的
  • Redis 是单线程的

关于 NoSQL 就不多说了,比较出名的 NoSQL 产品还有 MongoDB ,它是基于 JSON 来存储的,这样就能做到不同的记录字段的长度不同


NoSQL 指的是 Not only SQL,也就是不仅仅是 SQL,读作 N、 O、 SQL,不要读 no SQL。
它一般用于高并发的读写,这样的情况传统的关系数据库搞不定,并且有高扩展(横向)、高可用的特性。
具体到 Redis 它虽然查询速度快但是结构性不强,并没有什么两全其美的东西。
应用:
缓存、任务队列、排行榜、数据过期处理、分布式集群的 Session 分离

版本安装

官方其实并没有提供 Windows 版的(厉害),微软基于官方发布的版本进行编译创建了 Windows 的分支,所以一般情况 Windows 版都是比较落后的。
win 下双击运行即可,无需安装,cli 是客户端,默认会进行连接,使用的是 6379 端口,为什么用这个是有一段故事的,想知道就自己 Google 下吧。
另外可以直接安装到服务,这样就不用每次还得打开那个黑窗口了,安装方法在下载包的文档里有现成的,删除服务命令是:sc delete XXName
启动 Redis 后会创建一个 dat 文件,大小和本机的内存一样,但是一般测试没必要搞这么大,在配置文件的 maxmemory 可以修改,例如:maxmemory 200mb
渣渣的我只能先玩 win 版的了….在我下载的 3.2 版本中需要手动进入命令行指定配置文件才能运行:
redis-server.exe "redis.windows.conf"
退出命令:redis-cli.exe shutdown

Linux

下载源码后首先编译 make,为什么编译安装都造,性能好,需要有相应的 gcc 库才可以哦
然后就可以进行安装了:
make PREFIX=/usr/local/redis install
下一步拷贝 conf 配置文件到安装目录就可以了,在配置文件中最好设置为 daemonize yes
这样启动就是后台启动了,启动命令:
redis-server ./redis.conf
可以使用 ps -ef | grep -i redis 来查看是否启动成功。
关闭推荐使用:
redis-cli shutdown ,强制关闭:kill -9 pid

基本命令

除了使用 cli 自动连接,也可以手动进行连接:redis-cli -h 127.0.0.1 -p 6379
然后可以用 ping 来测试下是否正常,正常情况它会回你一个 pong


Redis 默认有 16 个数据库,以数字命名,从 0 开始,并且是不支持修改的,数量可以在配置文件中设置(database)
使用 SELECT 命令来切换数据库,例如:SELECT 0
数据库直接并不是完全隔离的,也就是说当执行 FLUSHALL 命令时,会清空所有数据库的数据,如果只想清空当前数据库的数据,要执行 FLUSHDB
并且,官方建议不设置数据库的密码,安全应该由服务器来保证(并且也不支持设置单个数据库的密码)


下面就开始看看使用频率很高的命令 :

  • keys
    前面说过 Redis 是以键值对存储的,可以想象为一个大 Map,这个命令也就是查询键了!用法如:
    KEYS * 查询所有;
    也可以使用通配符,有四种,? 、 * 、 [] 以及转义符号,至于什么意思,学过正则的都知道哈;
  • exists
    判断 key 是否存在,存在返回 1 不存在返回 0 。
  • del
    可以删除一个或者多个 key,返回的是删除键的个数,键删除了相应的值自然也删除了,删除多个以空格分开.
  • type
    获得键值的数据类型,返回的值可能是 string、hash、list(列表)、set(集合)、zset(有序集合)

  • rename
    就是 key 的重命名了,特别的如果重命名新名称已存在会直接覆盖,所以 Redis 中还有大量的以 nx 结尾的命令,nx 结尾的命令都会进行一些判断,例如:renamenx a b 当 b 这个 key 已经存在时,此操作就不会生效,返回的是 0.

  • set/get/setex
    设置、获取 key;setex 是 set 和 expire 的简写,可以顺便设置生存时间,例如:setex a 100 a
    类似的还有 psetex 只不过时间单位成了毫秒。
  • getrange
    获取值的指定范围的内容,例如 getrange key 0 2 就是获取第 0 个到第 2 个,是个闭合区间,包含 0 和 2;
  • getset
    特点就是先 get 再 set,相当于在 set 的时候把旧的值拿出来了。
  • expire
    设置 key 的生存时间,默认单位为秒,到期后会自动销毁
  • ttl
    查看 key 剩余的生存时间,默认单位秒,如果返回 -1 表示无限制,返回 -2 表示不存在。
  • randomkey
    获取一个随机的 key
  • flash/flashall
    清空当前(全部)数据库
  • info
    查看 Redis 的一些运行信息
  • help
    帮助命令,就不多说了,教给你命令怎么用,有种用法是:help 空格 [tab]

字符串数据类型

字符串数据类型是 Redis 最基本的数据类型了,它能存储任何形式的字符串,包括二进制数据;允许存储的数据容量最大 512 MB
存取字符串用的就是 set/get 命令了,还有一个 MGET/MSET 这个命令可以批量读取/设置值(MSET k1 v1 k2 v2);特别的这个指令也支持 msetnx ,这有点像事务了,如果其中一个 key 已存在,那么整个批量操作都会失败。
INCR递增命令,并且会返回递增后的值,默认每次递增的是 1,如需特殊指定就是 INCRBY name 2 这样就会每次递增二,如果不存在就会先初始化为 0 然后再递增;
相应的 DECR 就是递减了,比如: DECR id 就是递减 id 这个 key,默认也是每次一,同样也可以指定递减多少,用法和上面一样;
APPEND往尾部追加内容,用在这里就是追加字符串内容,比如:APPEND name 233 ,返回是的追加后的字符串的长度。
STRLEN 获取字符串的长度,没啥可说的。
还有一个是 getset 先获取值再设置值

Hash

其结构可以比作 Java 里的 Map<String, String>
常用的几个命令有:
获取/存储值

1
2
3
4
5
6
hset key name1 value1
hget key name1

# 批量存储/获取
hmset key name1 value1 name2 val2
hmget key name1 name2

其他的一些指令就一起说了吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 获取全面的属性和值
hgetall key

# 删除多个值,删除 key 用 del 哦
hdel key name1 name2

# 递增递减都差不多,举一个栗子,前提是 age 这个值(val)要有,并且是数字类型
hincrby key age 5

# 判断属性是否存在
hexists key name

# 属性的个数
hlen key

# 获取所有的属性名
hkeys key

# 获取所有的值
hvals key

常用的一般就是这些吧

List

同样它也有两种形式,数组的和链表的;特点和数据结构中的也一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 两端添加,一个是左边一个是右边
lpush key a b c # c 就在最左边
rpush key a b c

# 两端弹出
rpop key
lpop key

# 查看list
lrange key start end
例如: lrange key 0 -1 # 左边数,第一个到最后一个

# 查看长度
llen key

# 插入到头部,如果 key 不存在就不插入,不会自动创建
lpushx key a
rpushx key a

# 删除,count 的负号表示方向
lrem key count val
lrem key 2 3 # 删除2个3
lrem key -2 1 # 从后面(负号)删除2个1
lrem key 0 2 # 删除所有的2

# 修改值
lset key 3 v # 在第四个位置修改为 v;从 0 开始数

# 插入
linsert key before b ll # 在 b 的前面插入 ll
linsert key after b ll

# 其他
rpoplpush key1 key2 # 把 key1 的右边弹出,添加到 key2 的左边

push 返回的是长度,pop 返回的是弹出的元素,rpoplpush 这种命令非常适合用于 MQ

Set

相当于无序的 List,并且是唯一的
常用的命令有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 添加删除
sadd key a b c
srem key c

# 查看值
smembers key

# 是否存在?存在返回 1,不存在返回 0
sismember key a

# 相差比较,和顺序有关,可以理解为 key1 - key2
sdiff key1 key2

# 求交集、并集
sinter k1 k2
sunion k1 k2

# 查看数量
scard key

# 获取一个随机值
srandmember key

# 移除一个随机元素,并返回(比如可做订单号缓存池)
spop key

# 其他
sdiffstore k1 k2 k3 # k2 和 k3 的交集存到 k1
sunionstore k1 k2 k3 # k2 和 k3 的并集存到 k1

在做并集交集的处理时非常有优势,因为服务器端的聚合效率更高。
内部还是使用哈希表来实现,时间复杂度是 O(1)

Sorted-set

它存的都是字符串的内容,并且有一个分数与之关联,可用来排序,分数是可重复的,值不可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 增加、删除数据(返回插入元素的个数,已有的值分数会被覆盖)
zadd key 10 n1 20 n2
zrem key n1 n2

# 获取分数
zscore key name

# 获取成员的数量
zcard key

# 范围查找
zrange key 0 -1
zrange key 0 -1 withscores # 显示分数
zrevrange key 0 -1 # 从大到小进行排序,默认是从小到大

# 按照范围删除(插入顺序)
zremrangebyrank key 0 4

# 按照分数的范围删除
zremrangebyscore 80 100

# 按照分数排序
zrangebyscore key 0 100
zrangebyscore key 0 100 withscores limit 0 2 # 显示部分数据

# 操作分数
zincrby key 3 name # 给 name 的分数 +3

# 计算区间,80-90 之间有几个
zcount key 80 90

可以用来做热点话题和游戏排名之类的;
同上,使用哈希表实现,通过分数来保证顺序(默认从小到大),时间复杂度也是 O(1)

生存时间

前面说过,Redis 基本是用来做缓存的,并且它是基于内存的,所以当然有必要设置生存时间了;
设置生存时间(PEXPIRE 可以设置毫秒):
EXPIRE Key seconds
然后可以使用 TTL 来查询,返回的是剩余的生存时间,单位是秒;如果是没有限制返回的是 -1;数据已删除是 -2
清除生存时间(重新设置值也会清除生存时间):
PERSIST key

Java客户端

然后下面就说说在 Java 中的使用,有很多的 Java 客户端支持 Redis,当然这都不是官方的,其中用的比较多的是 jedis;它的使用也非常的简单,导入相应的依赖:

1
2
3
4
5
6
7
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>

然后最简单是使用:

1
2
3
Jedis jedis = new Jedis("localhost");
jedis.set("foo", "bar");
String value = jedis.get("foo");

它有一个好处是方法和命令的名字是一致的,这是用起来最爽的,后面还会涉及到连接池和集群,具体可以参考 GitHub 上的示例代码
如果使用的是集群(下面会讲)就使用下面的代码操作:

1
2
3
4
Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
jedisClusterNodes.add(new HostAndPort("127.0.0.1",6379));
JedisCluster jc = new JedisCluster(jedisClusterNodes);
jc.set("foo","bar");

这里的 jc 不需要手动关闭,内部已经自动关闭了

持久化

Redis 提供了两种持久化的方式,一种是 RDB (默认开启),另一种是 AOF,它们既可以单独使用也可以混合使用。
RDB 方式是通过快照完成的,当符合一定条件时Redis会自动将内存中的所有数据进行快照并且存储到硬盘上。进行快照的条件在配置文件中指定,有2个参数构成:时间和改动的键的个数,当在指定时间内被更改的键的个数大于指定数值时就会进行快照。
默认在配置文件中已经有一些配置了,就是 save 开头的;除了自动也可以手动保存,使用 SAVE 或者 BGSAVE 命令,区别就是一个在主进程(会阻塞)一个会 fork 一个子线程进行(需要的物理内存是 Redis 设置的内存的一倍)


Redis 的 AOF 持久化策略是将发送到Redis服务端的每一条命令都记录下来,并且保存到硬盘中的AOF文件,AOF文件的位置和RDB文件的位置相同,都是通过dir参数设置,默认的文件名是appendonly.aof,可以通过appendfilename参数修改。
可以使用 BGREWRITEAOF 命令来重写 AOF 文件

参数介绍
auto-aof-rewrite-percentage 100
当前的AOF文件大小超过上一次重写时的AOF文件大小的百分之多少时会再次进行重写,如果之前没有重写过,则以启动时的AOF文件大小为依据。
auto-aof-rewrite-min-size 64mb
限制了允许重写的最小AOF文件大小,通常在AOF文件很小的时候即使其中有些冗余的命令也是可以忽略的。

文件写入默认情况下会先写入到系统的缓存中,系统每30秒同步一次,才是真正的写入到硬盘,如果在这30秒服务器宕机那数据也会丢失的,Redis可以通过配置来修改同步策略:
appendfsync always 每次都同步 (最安全但是最慢)
appendfsync everysec 每秒同步 (默认的同步策略)
appendfsync no 不主动同步,由操作系统来决定 (最快但是不安全)

主从复制(读写分离)

和数据库的读写分离是类似的,主要是解决读取压力过大的问题,以及….避免宕机
相比来说,比 MySQL 数据库要简单的多,如果是在一台机器做测试,除了修改端口 pidfile 也要改,不能相同

设置主从:

  • redis.conf 配置文件中设置 slaveof
  • 客户端内执行:slaveof <masterip> <masterport>
    重启后将会失效

然后可以使用 NFO replication 查看关系,默认的从库只能读取不能写入,这样更合理些,除了设置一主多从还可以设置主从从的架构,就是 A 的从是 B ,B 的从是 C,这样就减轻了主库的压力
同步的原理:

  1. 当从库和主库建立MS关系后,会向主数据库发送SYNC命令;
  2. 主库接收到SYNC命令后会开始在后台保存快照(RDB持久化过程),并将期间接收到的写命令缓存起来;
  3. 当快照完成后,主Redis会将快照文件和所有缓存的写命令发送给从Redis;
  4. 从Redis接收到后,会载入快照文件并且执行收到的缓存的命令;
  5. 之后,主Redis每当接收到写命令时就会将命令发送从Redis,从而保证数据的一致;

如果硬盘的 IO 不好会拖累性能,从 2.8.18 的版本开始,Redis 支持无盘复制,就是直接通过网络传输,不过这个功能目前好像不是很稳定,开启无磁盘复制:repl-diskless-sync yes

哨兵

当从库宕机后,只需要重启就可以了,会自动进行同步(增量同步,主库会记录上一次同步的偏移量)
当主库宕机就比较麻烦了,解决方案是:

  1. 在从数据库中执行 SLAVEOF NO ONE 命令,断开主从关系并且提升为主库继续服务
  2. 将主库重新启动后,执行 SLAVEOF 命令,将其设置为其他库的从库,这时数据就能更新回来

然后手动敲不能保证正确(避免人肉运维),所以 Redis 就提供了哨兵的功能,就是对所有的 Redis 进行监控,可以设置多个哨兵,它们也会互相监控,看看对方是不是挂了,哨兵肯定是独立的线程
具体的配置就不贴了,太多了,总之就是检查到主库宕机后会自动执行上面的方案

关于集群

现在一般有两种,一种是官方的,一种是分片式的,当然是官方的好了,但是由于在 3.0+ 的版本官方才支持,所以在以前都是玩分片式的集群

分片式集群

原理其实就是计算 key 的哈希值来进行存储(到相应的 Redis 数据库),这样就会有一个问题:无法动态的增加、减少服务节点,因为毕竟节点的数量涉及到哈希的计算,其实在读取的时候也会涉及到哈希的计算,要不然它怎么知道去那一台找

官方提供的集群

需要 3.0 + 的版本哦,并且需要前面的主从复制知识
如果是在一台机器上测试,只需要拷贝不同的配置文件,然后启动的时候到相应的目录指定即可
设置集群主要是在配置文件开启:

1
2
3
4
# 开启集群
cluster-enabled yes
# 指定集群的配置文件
cluster-config-file "nodes-xxxx.conf"

然后就是使用一个官方提供的 Ruby 脚本,运行下就好了。
分布式的原理为通过插槽的分配确定存储位置,特点有:

  • 所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽.
  • 节点的fail是通过集群中超过半数的节点检测失效时才生效
  • 客户端与redis节点直连,不需要中间proxy层.客户端不需要连接集群所有节点,连接集群中任何一个可用节点即可
  • redis-cluster把所有的物理节点映射到[0-16383] slot(插槽)上, cluster 负责维护 node <–>slot <–> value

一般情况插槽会平均分配到各个 Redis,存储数据的时候根据 key 来计算插槽值(当 key 有闭合的大括号时,大括号中的数据为有效值),然后做相应的存储,这样就需要在使用客户端的时候加一个 -c 的命令,设置为自动跟踪重定向,也就是当插槽值不在当前数据库时自动切换,所以直连一个就可以了

当一半以上的服务器 PING 不通某一个服务器(当一个服务器 PING 不通就将其标记为疑似下线),这个服务器就会被标记为下线,同时插槽出现空档,整个集群被标记为不可用。
解决方案可以和前面的主从联系起来,将每个节点设置为主从架构,这样就能保证高可用和负载均衡。
集群中的节点只能使用 0 号数据库,切换数据库(SELECT)会报错

拓展

下面都是在 Java 中经常使用的一些代码了,使用的客户端为 jedis

与Spring整合

在 Spring 的配置文件中定义下面几个 bean,就不需要进行实例化了,之间注入使用即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd" >

<!-- 连接池配置 -->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="${redis.MaxTotal}" />
</bean>

<bean id="shardedJedisPool" class="redis.clients.jedis.ShardedJedisPool">
<constructor-arg index="0" ref="jedisPoolConfig" />
<constructor-arg index="1" >
<list>
<bean class="redis.clients.jedis.JedisShardInfo">
<constructor-arg index="0" value="${redis.node1.host}" />
<constructor-arg index="1" value="${redis.node1.port} "/>
</bean>
</list>
</constructor-arg>
</bean>
</beans>
<!--
redis.MaxTotal=50
redis.node1.host=127.0.0.1
redis.node1.port=6379
-->

然后创建一个 Service 来统一操作 Redis:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Service
public class RedisService {

@Autowired(required=false) // 设置为不是必须的,这样即使没有配相关的 bean 也不会抛异常
private ShardedJedisPool shardedJedisPool;

// 对重复的代码进行封装
private <T> T execute(Function<T, ShardedJedis> fun) {
ShardedJedis shardedJedis = null;
try {
// 从连接池中获取到jedis分片对象
shardedJedis = shardedJedisPool.getResource();
// 从redis中获取数据后进行回调,这时输入已经确定
return fun.callback(shardedJedis);
} finally {
if (null != shardedJedis) {
// 关闭,检测连接是否有效,有效则放回到连接池中,无效则重置状态
shardedJedis.close();
}
}
}

// 保存字符串到数据库(set操作)
public String set(String key, String value) {
// 这里就确定了返回值
return this.execute(new Function<String, ShardedJedis>() {
@Override
public String callback(ShardedJedis e) {
return e.set(key, value);
}
});
}

// 保存字符串到数据库(set操作),并且设置生存时间
public String set(String key, String value, int seconds) {
return this.execute(new Function<String, ShardedJedis>() {
@Override
public String callback(ShardedJedis e) {
String result = e.set(key, value);
e.expire(key, seconds);
return result;
}
});
}

// 从数据库获取数据
public String get(String key) {
return this.execute(new Function<String, ShardedJedis>() {
@Override
public String callback(ShardedJedis e) {
return e.get(key);
}
});
}

// 删除一条(缓存)数据
public Long del(String key) {
return this.execute(new Function<Long, ShardedJedis>() {
@Override
public Long callback(ShardedJedis e) {
return e.del(key);
}
});
}

// 设置生存时间, 单位秒
public Long expire(String key, int seconds) {
return this.execute(new Function<Long, ShardedJedis>() {
@Override
public Long callback(ShardedJedis e) {
return e.expire(key, seconds);
}
});
}
}

其中为了封装重复的代码达到复用的目的,使用了 js 的回调思想(什么设计模式来….),涉及的接口很简单:

1
2
3
4
public interface Function<T, E> {
// 简单理解为 T 为输出;E 为输入
public T callback(E e);
}

其实还是很好理解的。
嗯,到这里就差不多了吧,应该很全了,如果不全等以后用到了再来补充吧

评论框加载失败,无法访问 Disqus

你可能需要魔法上网~~