作者: 京东物流 刘丽侠 姚再毅 康睿 刘斌 李振

一、Redis的特性

1.1 Redis为什么快?

  • 依据内存操作,操作不需求跟磁盘交互,单次履行很快
  • 指令履行是单线程,由所以依据内存操作,单次履行的时刻快于线程切换时刻,一起通讯选用多路复用
  • Redis本身便是一个k-v结构,相似于hashMap,所以查询功能接近于O(1)
  • 一起redis自己底层数据结构支撑,比方跳表、SDS
  • lO多路复用,单个线程中经过记载盯梢每一个sock(I/O流)的状况来管理多个I/O流

1.2 Redis其他特性

  • 更丰富的数据类型,虽然都是k、v结构,value能够存储很多的数据类型
  • 完善的内存管理机制、确保数据一致性:耐久化机制、过期战略
  • 支撑多种编程言语
  • 高可用,集群、确保高可用

1.3 Redis高可用

  • 很完善的内存管理机制,过期、筛选、耐久化
  • 集群模式,主从、岗兵、cluster集群

二、Redis数据类型以及运用场景

Redis的数据类型有String、Hash、Set、List、Zset、bitMap(依据String类型)、 Hyperloglog(依据String类型)、Geo(地理方位)、Streams流。

2.1 String

2.1.1 根本指令

// 批量设置
> mset key1 value1 key2 value2
// 批量获取
> mget key1 key2
// 获取长度
> strlen key  
//  字符串追加内容
> append key xxx
// 获取指定区间的字符
> getrange key 0 5
// 整数值递加 (递加指定的值)
> incr intkey (incrby intkey 10)
// 当key 存在时将掩盖
> SETEX key seconds value
// 将 key 的值设为 value ,当且仅当 key 不存在。
> SETNX key value

2.1.2 运用场景

  1. 缓存相关场景 缓存、 token(跟过期属性完美符合)
  1. 线程安全的计数场景 (软限流、分布式ID等)

2.2 Hash

2.2.1 根本指令

// 将哈希表 key 中的域 field 的值设为 value 。
> HSET key field value
// 回来哈希表 key 中给定域 field 的值。
>  HGET key field
// 回来哈希表 key 中的一切域。
> HKEYS key
// 回来哈希表 key 中一切域的值。
>  HVALS key
// 为哈希表 key 中的域 field 的值加上增量 increment 。
> HINCRBY key field increment
// 查看哈希表 key 中,给定域 field 是否存在。
> HEXISTS key field

2.2.2 运用场景

  1. 存储目标类的数据(官网说的)比方一个目标下有多个字段
  1. 统计类的数据,能够对单个统计数据进行独自操作

2.3 List

存储有序的字符串列表,元素能够重复。列表的最大长度为 2^32 – 1 个元素(4294967295,每个列表超越 40 亿个元素)。

2.3.1 根本指令

// 将一个或多个值 value 刺进到列表 key 的表头
> LPUSH key value [value ...]
// 将一个或多个值 value 刺进到列表 key 的表尾(最右边)。
> RPUSH key value [value ...]
// 移除并回来列表 key 的头元素。
> LPOP key
// 移除并回来列表 key 的尾元素。
> RPOP key
// BLPOP 是列表的堵塞式(blocking)弹出原语。
> BLPOP key [key ...] timeout
// BRPOP 是列表的堵塞式(blocking)弹出原语。
> BRPOP key [key ...] timeout
// 获取点拨方位元素
> LINDEX key index

2.3.2 运用场景

  1. 用户消息时刻线

由于list是有序的,所以咱们能够先进先出,先进后出的列表都能够做

2.4 Set

2.4.1 根本指令

// 将一个或多个 member 元素加入到调集 key 当中,现已存在于调集的 member 元素将被疏忽。
> SADD key member [member ...]
// 回来调集 key 中的一切成员。
> SMEMBERS key
// 回来调集 key 的基数(调集中元素的数量)。
> SCARD key
// 假设指令履行时,只供给了 key 参数,那么回来调集中的一个随机元素。
> SRANDMEMBER key [count]
// 移除并回来调集中的一个随机元素。
> SPOP key
// 移除调集 key 中的一个或多个 member 元素,不存在的 member 元素会被疏忽。
> SREM key member [member ...]
// 判别 member 元素是否调集 key 的成员。
> SISMEMBER key member
// 获取前一个调集有而后边1个调集没有的
> sdiff huihuiset huihuiset1
// 获取交集
> sinter huihuiset huihuiset1
// 获取并集
> sunion huihuiset huihuiset1

2.4.2 运用场景

  1. 抽奖 spop跟srandmember随机弹出或许获取元素
  1. 点赞、签到等,sadd调集存储
  1. 交集并集 关注等场景

2.5 ZSet(SortedSet)

sorted set,有序的set,每个元素有个score。

score相一起,依照key的ASCII码排序。

2.5.1 根本指令

//将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
> ZADD key score member [[score member] [score member] ...]
// 回来有序集 key 中,指定区间内的成员。其间成员的方位按 score 值递加(从小到大)来排序
> ZRANGE key start stop [WITHSCORES]
// 回来有序集 key 中,指定区间内的成员。其间成员的方位按 score 值递减(从大到小)来排列。
> ZREVRANGE key start stop [WITHSCORES]
// 回来有序集 key 中,一切 score 值介于 min 和 max 之间(包含等于 min 或 max )的成员。有序集成员按 score 值递加(从小到大)次序排列。
>  ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
// 移除有序集 key 中的一个或多个成员,不存在的成员将被疏忽。
> ZREM key member [member ...]
// 回来有序集 key 的基数。
> ZCARD key
// 为有序集 key 的成员 member 的 score 值加上增量 increment 。
> ZINCRBY key increment member
// 回来有序集 key 中, score 值在 min 和 max 之间(默许包含 score 值等于 min 或 max )的成员的数量。
> ZCOUNT key min max
// 回来有序集 key 中成员 member 的排名。其间有序集成员按 score 值递加(从小到大)次序排列。
> ZRANK key member

2.5.2 运用场景

  1. 排行榜

三、Redis的业务

3.1 业务根本操作

// 1. multi指令敞开业务,exec指令提交业务
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
// 2. 组队的过程中能够经过discard来抛弃组队。
 127.0.0.1:6379> multi
OK
 127.0.0.1:6379(TX)> set k3 v3
QUEUED
 127.0.0.1:6379(TX)> set k4 v4
QUEUED
 127.0.0.1:6379(TX)> discard
OK

3.2 Redis业务特性

  1. 独自的阻隔操作

业务中的一切指令都会序列化、按次序地履行。

业务在履行过程中,不会被其他客户端发送来的指令恳求所打断。

  1. 没有阻隔级别的概念

队列中的指令没有提交之前,都不会被实际地履行,由于业务提交之前任何指令都不会被实际履行,也就不存在“业务内的查询要看到业务里的更新,在业务外查询不能看到”这个让人万分头疼的问题。

  1. 不确保原子性

Redis同一个业务中假设有一条指令履行失利,其后的指令仍然会被履行,没有回滚。

四、Redis 分布式锁

4.1 Lua 完结分布式锁

local lockSet = redis.call('exists', KEYS[1])
if lockSet == 0 
	then
		redis.call('set', KEYS[1], ARG[1])
		redis.call('expire', KEYS[1], ARG[2])
	end
return lockSet

五、Redis总体数据结构

5.1 Redis dict字典

5.1.1 RedisDb数据接口(server.h文件)

数据最外层的结构为RedisDb

typedef struct redisDb {
    dict *dict; /* The keyspace for this DB */ //数据库的键
    dict *expires; /* Timeout of keys with a timeout set */ //设置了超时时刻的键
    dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //客户端等候的keys
    dict *ready_keys; /* Blocked keys that received a PUSH */
    dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
    int id; /* Database ID */ //所在数 据库ID
    long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,仅用于统计
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

5.1.2 dict数据结构(dict.h文件)

咱们发现数据存储在dict中

typedef struct dict {
    dictType *type; //理解为面向目标思维,为支撑不同的数据类型对应dictType抽象办法,不同的数据类型能够不同完结
    void *privdata; //也可不同的数据类型相关,不同类型特定函数的可选参数。
    dictht ht[2]; //2个hash表,用来数据存储 2个dictht
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash符号 -1代表不再rehash
    unsigned long iterators; /* number of iterators currently running */
} dict;

5.1.3 dictht结构(dict.h文件)

typedef struct dictht {
    dictEntry **table; //dictEntry 数组
    unsigned long size; //数组巨细 默许为4 #define DICT_HT_INITIAL_SIZE 4
    unsigned long sizemask; //size-1 用来取模得到数据的下标值
    unsigned long used; //改hash表中已有的节点数据
} dictht;

5.1.4 dictEntry节点结构(dict.h文件)

typedef struct dictEntry {
    void *key; //key
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v; //value
	struct dictEntry *next; //指向下一个节点
} dictEntry;

5.1.5 RedisObject(service.h文件)

/* Objects encoding. Some kind of objects like Strings and
Hashes can be
* internally represented in multiple ways. The 'encoding'
field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old
list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string
encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list
of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree
of listpacks */
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of
obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution
in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object
never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object
allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
typedef struct redisObject {
unsigned type:4; //数据类型 string hash list
unsigned encoding:4; //底层的数据结构 跳表
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant
8 bits frequency
* and most significant 16 bits
access time). */
int refcount; //用于回收,引用计数法
void *ptr; //指向详细的数据结构的内存地址 比方 跳表、紧缩列表
} robj;

5.2 Redis数据结构图

一文搞懂Redis

5.3 Redis扩容机制

由于咱们的dictEntry数组默许巨细是4,假设不进行扩容,那么咱们一切的数据都会以链表的方式增加至数组下标 跟着数据量越来越大,之前只需求hash取模就能得到下标方位,现在得去循环我下标的链表,所以功能会越来越慢,当咱们的数据量抵达必定程度后,咱们就得去触发扩容操作。

5.3.1 什么时分扩容

5.3.1.1 扩容机制

  1. 当没有fork子进程在进行RDB或AOF耐久化时,ht[0]的used大于size时,触发扩容
  1. 假设有子进程在进行RDB或许AOF时,ht[0]的used大于等于size的5倍的时分,会触发扩容

5.3.1.2 源码验证

扩容,必定是在增加数据的时分才会扩容,所以咱们找一个增加数据的进口。

  1. 进口,增加或替换dictReplace (dict.c文件)
int dictReplace(dict *d, void *key, void *val) {
    dictEntry *entry, *existing, auxentry;
    /* Try to add the element. If the key
    * does not exists dictAdd will succeed. */
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);
        return 1;
    }
    /* Set the new value and free the old one. Note that
    it is important
    * to do that in this order, as the value may just be
    exactly the same
    * as the previous one. In this context, think to
    reference counting,
    * you want to increment (set), and then decrement
    (free), and not the
    * reverse. */
    auxentry = *existing;
    dictSetVal(d, existing, val);
    dictFreeVal(d, &auxentry);
    return 0;
}
  1. 进入dictAddRaw办法 (dict.c文件)
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing){
    long index;
    dictEntry *entry;
    dictht *ht;
    if (dictIsRehashing(d)) _dictRehashStep(d); //假设正在Rehash 进行渐进式hash 按步rehash
    /* Get the index of the new element, or -1 if
    * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1)
    	return NULL;
    /* Allocate the memory and store the new entry.
    * Insert the element in top, with the assumption that
    in a database
    * system it is more likely that recently added
    entries are accessed
    * more frequently. */
    //假设在hash 放在ht[1] 否则放在ht[0]
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    //选用头插法刺进hash桶
    entry->next = ht->table[index];
    ht->table[index] = entry;
    //已运用++
    ht->used++;
    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
  1. 得到数据下标的办法 _dictKeyIndex (dict.c文件)
static long _dictKeyIndex(dict *d, const void *key,uint64_t hash, dictEntry **existing){
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;
    /* Expand the hash table if needed */
    //扩容假设需求
    if (_dictExpandIfNeeded(d) == DICT_ERR)
    	return -1;
    //比较是否存在这个值
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain
        the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key,he->key)) {
                if (existing) *existing = he;
                	return -1;
                }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}
  1. _dictExpandIfNeeded 验证是否需求扩容 (dict.c文件)
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d){
    /* Incremental rehashing already in progress. Return.*/
    if (dictIsRehashing(d)) return DICT_OK; //正在rehash,回来
    /* If the hash table is empty expand it to the initialsize. */
    //假设ht[0]的长度为0,设置默许长度4 dictExpand为扩容的要害办法
    if (d->ht[0].size == 0) 
        	return dictExpand(d,DICT_HT_INITIAL_SIZE);
    //扩容条件 hash表中已运用的数量大于等于 hash表的巨细
    //而且dict_can_resize为1 或许 已运用的巨细大于hash表巨细的 5倍,大于等于1倍的时分,下面2个满意一个条件即可
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){
        //扩容成原本的2倍
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

5.3.2 怎样扩容

5.3.2.1 扩容过程

  1. 当满意我扩容条件,触发扩容时,判别是否在扩容,假设在扩容,或许 扩容的巨细跟我现在的ht[0].size相同,这次扩容不做
  1. new一个新的dictht,巨细为ht[0].used * 2(可是必须向上2的幂,比 如6 ,那么巨细为8) ,而且ht[1]=新创建的dictht
  1. 咱们有个更大的table了,可是需求把数据搬迁到ht[1].table ,所以将 dict的rehashidx(数据搬迁的偏移量)赋值为0 ,代表能够进行数据迁 移了,也便是能够rehash了
  1. 等候数据搬迁完结,数据不会立刻搬迁,而是选用渐进式rehash,渐渐的把数据搬迁到ht[1]
  1. 当数据搬迁完结,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0
  1. 把dict的rehashidex=-1

5.3.2.2 源码验证

  1. dictExpand办法在_dictExpandIfNeeded 办法中调用 (dict.c文件),为扩容的要害办法
int dictExpand(dict *d, unsigned long size){
    /* the size is invalid if it is smaller than the
    number of
    * elements already inside the hash table */
    //正在扩容,不允许第二次扩容,或许ht[0]的数据量大于扩容后的数量,没有意义,这次不扩容
    if (dictIsRehashing(d) || d->ht[0].used > size)
    	return DICT_ERR;
    dictht n; /* the new hash table */
    //得到最接近2的幂 跟hashMap思维相同
    unsigned long realsize = _dictNextPower(size);
    /* Rehashing to the same table size is not useful. */
    //假设跟ht[0]的巨细一致 直接回来错误
    if (realsize == d->ht[0].size) return DICT_ERR;
    /* Allocate the new hash table and initialize all
    pointers to NULL */
    //为新的dictht赋值
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;
    /* Is this the first initialization? If so it's not
    really a rehashing
    * we just set the first hash table so that it can
    accept keys. */
    //ht[0].table为空,代表是初始化
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }
    /* Prepare a second hash table for incremental rehashing */
    //将扩容后的dictht赋值给ht[1]
    d->ht[1] = n;
    //符号为0,代表能够开始rehash
    d->rehashidx = 0;
    return DICT_OK;
}

5.3.3 数据怎样搬迁(渐进式hash)

5.3.3.1 搬迁方式

假设一次性把数据悉数搬迁会很耗时刻,会让单条指令等候好久,会构成堵塞。

所以,redis选用渐进式Rehash,所谓渐进式,便是渐渐的,不会一次性把一切数据搬迁。

那什么时分会进行渐进式数据搬迁?

  1. 每次进行key的crud操作都会进行一个hash桶的数据搬迁
  1. 守时使命,进行部分数据搬迁

5.3.3.2 源码验证

CRUD触发都会触发_dictRehashStep(渐进式hash)
  1. 每次增修正的时分都会调用_dictRehashStep办法
if (dictIsRehashing(d)) _dictRehashStep(d); //这个代码在增修正查的办法中都会调用
  1. _dictRehashStep办法
static void _dictRehashStep(dict *d) {
	if (d->iterators == 0) dictRehash(d,1);
}
  1. dictRehash办法
int dictRehash(dict *d, int n) {
	int empty_visits = n*10; /* Max number of empty
	buckets to visit. */ //要拜访的最大的空桶数 避免此次耗时过长
	if (!dictIsRehashing(d)) return 0; //假设没有在rehash回来0
	//循环,最多1次,而且ht[0]的数据没有搬迁完 进入循环
	while(n-- && d->ht[0].used != 0) {
		dictEntry *de, *nextde;
        /* Note that rehashidx can't overflow as we are
        sure there are more
        * elements because ht[0].used != 0 */
		assert(d->ht[0].size > (unsigned long)d->rehashidx);
		//rehashidx rehash的索引,默许0 假设hash桶为空 进入自旋 而且判别是否到了最大的拜访空桶数
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
		de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下标为rehashidx桶
        /* Move all the keys in this bucket from the old
        to the new hash HT */
        while(de) { //循环hash桶的链表 而且搬运到ht[1]
            uint64_t h;
            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d-
            >ht[1].sizemask;
            de->next = d->ht[1].table[h]; //头插
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        //搬运完后 将ht[0]相对的hash桶设置为null
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
	}
    /* Check if we already rehashed the whole table... */
    //ht[0].used=0 代表rehash悉数完结
    if (d->ht[0].used == 0) {
        //清空ht[0]table
        zfree(d->ht[0].table);
        //将ht[0] 从头指向现已完结rehash的ht[1]
        d->ht[0] = d->ht[1];
        //将ht[1]一切数据从头设置
        _dictReset(&d->ht[1]);
        //设置-1,代表rehash完结
        d->rehashidx = -1;
        return 0;
    }
    /* More to rehash... */
    return 1;
}
//将ht[1]的属性复位
static void _dictReset(dictht *ht)
{
    ht->table = NULL;
    ht->size = 0;
    ht->sizemask = 0;
    ht->used = 0;
}
守时使命触发

再讲守时使命之前,咱们要知道Redis中有个时刻工作驱动,在 server.c文件下有个serverCron 办法。

serverCron 每隔多久会履行一次,履行频率依据redis.conf中的hz装备,serverCorn中有个办法databasesCron()

  1. 守时办法serverCron
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //.......
    /* We need to do a few operations on clients
    asynchronously. */
    clientsCron();
    /* Handle background operations on Redis databases. */
    databasesCron(); //处理Redis数据库上的后台操作。
    //.......
}
  1. databasesCron办法
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for
    slaves
    * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
    	if (iAmMaster()) {
    		activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
    	} else {
    		expireSlaveKeys();
    	}
    }
    /* Defrag keys gradually. */
    activeDefragCycle();
    /* Perform hash tables rehashing if needed, but only
    if there are no
    * other processes saving the DB on disk. Otherwise
    rehashing is bad
    * as will cause a lot of copy-on-write of memory
    pages. */
    if (!hasActiveChildProcess()) {
        /* We use global counters so if we stop the
        computation at a given
        * DB we'll be able to start from the successive
        in the next
        * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
    	int dbs_per_call = CRON_DBS_PER_CALL;
    	int j;
   		/* Don't test more DBs than we have. */
    	if (dbs_per_call > server.dbnum) 
            dbs_per_call = server.dbnum;
        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }
        /* Rehash */ //Rehash逻辑
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                //进入incrementallyRehash
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop
                        here, we'll do
                        * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash,
                        we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
} 
  1. 进入incrementallyRehash办法(server.c)
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; 
        /* already used our millisecond for this
        loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this
        loop... */
    }
    return 0;
}
  1. 进入dictRehashMilliseconds(dict.c)
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;
    //进入rehash办法 dictRehash 去完结渐进时HASH
    while(dictRehash(d,100)) {
        rehashes += 100;
        //判别是否超时
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

5.4 String类型数据结构

Redis中String的底层没有用c的char来完结,而是选用了SDS(simple Dynamic String)的数据结构来完结。而且供给了5种不同的类型

5.4.1 SDS数据结构界说(sds.h文件)

typedef char *sds;
/* Note: sdshdr5 is never used, we just access the flags
    byte directly.
    * However is here to document the layout of type 5 SDS
strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of
    	string length */
    char buf[];
}
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len;  /* used */ //已运用的长度
    uint8_t alloc; /* excluding the header and null terminator */ //分配的总容量 不包含头和空终止符
    unsigned char flags; /* 3 lsb of type, 5 unused bits */ //低三位类型 高5bit未运用
    char buf[]; //数据buf 存储字符串数组
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null
    terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits
    */
    char buf[];
};

5.4.2 SDS数据结构的优势

  1. char字符串不记载本身长度,所以获取一个字符串长度的复杂度是O(n),可是SDS记载分配的长度alloc,已运用的长度len,获取长度为 O(1)
  1. 削减修正字符串带来的内存重分配次数
  1. 空间预分配,SDS长度假设小于1MB,预分配跟长度相同的,大于1M,每次跟len的巨细多1M
  1. 惰性空间开释,截取的时分不立刻开释空间,供下次运用!一起供给相应的开释SDS未运用空间的API
  1. 二进制安全

5.5 Hash类型数据结构

Redis的字典相当于Java言语中的HashMap,他是无序字典。内部结构上同样是数组 + 链表二维结构。榜首维hash的数组方位磕碰时,就会将磕碰的元素运用链表串起来。

5.5.1 Hash数据结构图

一文搞懂Redis

5.5.2 Hash的紧缩列表

紧缩列表会依据存入的数据的不同类型以及不同巨细,分配不同巨细的空间。所以是为了节约内存而选用的。

由所以一块完好的内存空间,当里边的元素发生变更时,会发生连锁更新,严重影响咱们的拜访功能。所以,只适用于数据量比较小的场景。

所以,Redis会有相关装备,Hashes只需小数据量的时分才会用到ziplist,当hash目标一起满意以下两个条件的时分,运用ziplist编码:

  1. 哈希目标保存的键值对数量<512个
  1. 一切的键值对的键和值的字符串长度都<64byte(一个英文字母一个字节)
hash-max-ziplist-value 64 // ziplist中最大能寄存的值长度
hash-max-ziplist-entries 512 // ziplist中最多能寄存的entry节点数量

5.6 List类型数据结构

5.6.1 quickList快速列表

兼顾了ziplist的节约内存,而且必定程度上处理了连锁更新的问题,咱们的 quicklistNode节点里边是个ziplist,每个节点是分开的。那么就算发生了连锁更新,也只会发生在一个quicklistNode节点。

quicklist.h文件

typedef struct{
    struct quicklistNode *prev; //前指针
    struct quicklistNode *next; //后指针
    unsigned char *zl; //数据指针 指向ziplist结果
    unsigned int sz; //ziplist巨细 /* ziplist
    size in bytes */
    unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素
    unsigned int encoding : 2; /* RAW==1 or LZF==2 */ //
    是否紧缩, 1没有紧缩 2 lzf紧缩
    unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //预留容器字段
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for  future usage */ //预留字段
} quicklistNode;

5.6.2 list数据结构图

一文搞懂Redis

5.7 Set类型数据结构

Redis用intset或hashtable存储set。满意下面条件,就用inset存储。

  1. 假设不是整数类型,就用dictht hash表(数组+链表)
  1. 假设元素个数超越512个,也会用hashtable存储。跟一个装备有关:
set-max-intset-entries 512

不满意上述条件的,都运用hash表存储,value存null。

5.8 ZSet数据结构

5.8.1 ZipList

默许运用ziplist编码。

在ziplist的内部,依照score排序递加来存储。刺进的时分要移动之后的数据。

假设元素数量大于等于128个,或许任一member长度大于等于64字节运用 skiplist+dict存储。

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

假设不满意条件,选用跳表。

5.8.2 跳表(skiplist)

结构界说(servier.h)

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
 sds ele; //sds数据
 double score; //score
 struct zskiplistNode *backward; //撤退指针
 //层级数组
 struct zskiplistLevel {
     struct zskiplistNode *forward; //前进指针
     unsigned long span; //跨度
 } level[];
} zskiplistNode;
//跳表列表
typedef struct zskiplist {
 struct zskiplistNode *header, *tail; //头尾节点
 unsigned long length; //节点数量
 int level; //最大的节点层级
} zskiplist;

zslInsert 增加节点办法 (t_zset.c)

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds
ele) {
 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 unsigned int rank[ZSKIPLIST_MAXLEVEL];
 int i, level;
 serverAssert(!isnan(score));
 x = zsl->header;
 for (i = zsl->level-1; i >= 0; i--) {
     /* store rank that is crossed to reach the insert
     position */
     rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
     while (x->level[i].forward &&
         (x->level[i].forward->score < score ||
         咕泡出品 必属精品精品属精品 咕泡出品 必属精品 咕泡出品 必属精品 咕泡咕泡
         (x->level[i].forward->score == score
         &&
         sdscmp(x->level[i].forward->ele,ele) <
         0)))
         {
         rank[i] += x->level[i].span;
         x = x->level[i].forward;
     }
     update[i] = x;
 }
 /* we assume the element is not already inside, since
 we allow duplicated
    * scores, reinserting the same element should never
    happen since the
    * caller of zslInsert() should test in the hash table
    if the element is
    * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    //不同层级建立链表联络
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        /* update span covered by update[i] as x is
        inserted here */
        x->level[i].span = update[i]->level[i].span -
        (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) +
        1;
    }
    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
    	update[i]->level[i].span++;
    }
    x->backward = (update[0] == zsl->header) ? NULL :
    update[0];
    if (x->level[0].forward)
    	x->level[0].forward->backward = x;
    else
        zsl->tail = x;
        zsl->length++;
    return x;
}

ZSKIPLIST_MAXLEVEL默许32 界说在server.h

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64
elements */

六、Redis过期战略

6.1 惰性过期

所谓惰性过期,便是在每次拜访操作key的时分,判别这个key是不是过期了,过期了就删去。

6.1.1 源码验证

  1. expireIfNeeded办法(db.c文件)
int expireIfNeeded(redisDb *db, robj *key) {
	if (!keyIsExpired(db,key)) return 0;
    /* If we are running in the context of a slave,
    instead of
    * evicting the expired key from the database, we
    return ASAP:
    * the slave key expiration is controlled by the
    master that will
    * send us synthesized DEL operations for expired
    keys.
    *
    * Still we try to return the right information to the
    caller,
    * that is, 0 if we think the key should be still
    valid, 1 if
    * we think the key is expired at this time. */
    // 假设装备有masterhost,说明是从节点,那么不操作删去
    if (server.masterhost != NULL) return 1;
    /* Delete the key */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
    "expired",key,db->id);
    //是否是异步删去 避免单个Key的数据量很大 堵塞主线程 是4.0之后增加的新功能,默许封闭
    int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key);
    if (retval) signalModifiedKey(NULL,db,key);
    return retval;
}

每次调用到相关指令时,才会履行expireIfNeeded判别是否过期,平时不会去判别是否过期。

该战略能够最大化的节约CPU资源。

可是却对内存十分不友爱。由于假设没有再次拜访,就或许一向堆积再内存中。占用内存

所以就需求另一种战略来合作运用,便是定期过期

6.2 定期过期

那么多久去清除一次,咱们在讲Rehash的时分,有个办法是serverCron,履行频率依据redis.conf中的hz装备。

这办法除了做Rehash以外,还会做很多其他的工作,比方:

  1. 整理数据库中的过期键值对
  1. 更新服务器的各类统计信息,比方时刻、内存占用、数据库占用情况等
  1. 封闭和整理连接失效的客户端
  1. 尝试进行耐久化操作

6.2.1 完结流程

  1. 守时serverCron办法去履行整理,履行频率依据redis.cong中的hz装备的值(默许是10,也便是1s履行10次,100ms履行一次)
  1. 履行整理的时分,不是去扫描一切的key,而是去扫描一切设置了过期时刻的key redisDB.expires
  1. 假设每次去把一切的key都拿出来,那么假设过期的key很多,就会很慢,所以也不是一次性拿出一切的key
  1. 依据hash桶的维度去扫描key,扫到20(可装备)个key为止。假设榜首个桶是15个key没有满意20,持续扫描第二个桶,第二个桶20个key,由所以以hash桶的维度扫描的,所以第二个扫到了就会全扫,一共扫描35个key
  1. 找到扫描的key里边过期的key,并进行删去
  1. 假设取了400个空桶(最多拿400个桶),或许扫描的删去比例跟扫描的总数超越10%,持续履行4、5步
  1. 也不能无限的循环,循环16次后回去检测时刻,超越指守时刻会跳出

6.2.2 源码验证

  1. 进口serverCrom
// serverCron办法调用databasesCron办法(server.c)
/* Handle background operations on Redis databases. */
databasesCron();
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for
    slaves
    * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
        	activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        //过期删去办法
        } else {
        	expireSlaveKeys();
        }
    }
}

七、Redis筛选战略

由于Redis内存是有巨细的,而且我或许里边的数据都没有过期,当快满的时分,我又没有过期的数据进行筛选,那么这个时分内存也会满。内存满了,Redis也会不能放入新的数据。所以,咱们不得已需求一些战略来处理这个问题来确保可用性。

7.1 筛选战略

7.1.1 noeviction

New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database.

默许,不筛选 能读不能写

7.1.2 allkeys-lru

Keeps most recently used keys; removes least recently used (LRU) keys

依据伪LRU算法,在一切的key中去筛选

7.1.3 allkeys-lfu

Keeps frequently used keys; removes least frequently used (LFU) keys.

依据伪LFU算法,在一切的key中去筛选

7.1.4 volatile-lru

Removes least recently used keys with the expire field set to true.

依据伪LRU算法,在设置了过期时刻的key中去筛选

7.1.5 volatile-lfu

Removes least frequently used keys with the expire field set to true.

依据伪LFU算法 在设置了过期时刻的key中去筛选

7.1.6 allkeys-random

Randomly removes keys to make space for the new data added.

依据随机算法,在一切的key中去筛选

7.1.7 volatile-random

Randomly removes keys with expire field set to true.

依据随机算法 在设置了过期时刻的key中去筛选

7.1.8 volatile-ttl

Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value.

依据过期时刻来,筛选即将过期的

7.2 筛选流程

  1. 首要,咱们会有个筛选池,默许巨细是16,而且里边的数据是结尾筛选制
  1. 每次指令操作的时分,自旋会判别当时内存是否满意指令所需求的内存
  1. 假设当时不满意,会从筛选池的尾部拿一个最合适筛选的数据
  1. 会取样(装备 maxmemory-samples),从Redis中获取随机获取到取样的数据,处理一次性读取一切的数据慢的问题
  1. 在取样的数据中,依据筛选算法找到最合适筛选的数据
  1. 将最合适的那个数据跟筛选池中的数据进行比较,是否比筛选池的数据更合适筛选,假设更合适,放入筛选池
  1. 依照合适的程度进行排序,最合适筛选的放入尾部
  1. 将需求筛选的数据从Redis删去,而且从筛选池移除

7.3 LRU算法

Lru,Least Recently Used 翻译过来是最久未运用,依据时刻轴来走,仍好久没用的数据。只需最近有用过,我就默许是有用的。

那么它的一个衡量标准是啥?时刻!依据运用时刻,从近到远,越远的越简单筛选。

7.3.1 完结原理

需求:得到目标隔多久没拜访,隔的时刻越久,越简单被筛选

  1. 首要,LRU是依据这个目标的拜访操作时刻来进行筛选的,那么咱们需求知道这个目标最终操作的拜访时刻
  1. 知道了目标的最终操作拜访时刻后,咱们只需求跟当时系统时刻来进行比照,就能算出目标多久没拜访了

7.3.2 源码验证

redis中,目标都会被redisObject目标包装,其间有个字段叫lru。

redisObject目标 (server.h文件)

typedef struct redisObject {
 unsigned type:4;
 unsigned encoding:4;
 unsigned lru:LRU_BITS; /* LRU time (relative to global
 lru_clock) or
 * LFU data (least significant 8 bits frequency
 * and most significant 16 bits access time). */
 int refcount;
 void *ptr;
} robj;

看LRU的字段的说明,那么咱们大概能猜出来,redis去完结lru筛选算法必定跟咱们这个目标的lru相关

而且这个字段的巨细为24bit,那么这个字段记载的是目标操作拜访时分的秒单位时刻的后24bit

相当于:

long timeMillis=System.currentTimeMillis();
System.out.println(timeMillis/1000); //获取当时秒
System.out.println(timeMillis/1000 & ((1<<24)-1)); //获取秒的后24位

咱们知道了这个目标的最终操作拜访的时刻。假设咱们要得到这个目标多久没拜访了,咱们是不是就很简单,用我当时的时刻-这个目标的拜访时刻就能够了!可是这个拜访时刻是秒单位时刻的后24bit 所以,也是用当时的时刻的秒单位的后24bit去减。

estimateObjectIdleTime办法(evict.c)

unsigned long long estimateObjectIdleTime(robj *o) {
 //获取秒单位时刻的最终24位
 unsigned long long lruclock = LRU_CLOCK();
 //由于只需24位,一切最大的值为2的24次方-1
 //超越最大值从0开始,所以需求判别lruclock(当时系统时刻)跟缓存目标的lru字段的巨细
 if (lruclock >= o->lru) {
     //假设lruclock>=robj.lru,回来lruclock-o->lru,再转换单位
     return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
 } else {
     //否则选用lruclock + (LRU_CLOCK_MAX - o->lru),得到对
     象的值越小,回来的值越大,越大越简单被筛选
     return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
     LRU_CLOCK_RESOLUTION;
 }
}

7.3.3 LRU总结

用lruclock与 redisObject.lru进行比较,由于lruclock只获取了当时秒单位时刻的后24位,所以必定有个轮询。

所以,咱们会用lruclock跟 redisObject.lru进行比较,假设lruclock>redisObject.lru,那么咱们用lruclock- redisObject.lru,否则lruclock+(24bit的最大值-redisObject.lru),得到lru越小,那么回来的数据越大,相差越大的越优先会被筛选!

www.processon.com/view/link/5…

7.4 LFU算法

LFU,英文 Least Frequently Used,翻译成中文便是最不常用的优先筛选。

不常用,它的衡量标准便是次数,次数越少的越简单被筛选。

这个完结起来应该也很简单,目标被操作拜访的时分,去记载次数,每次操作拜访一次,就+1;筛选的时分,直接去比较这个次数,次数越少的越简单筛选!

7.4.1 LFU的时效性问题

何为时效性问题?便是只会去考虑数量,可是不会去考虑时刻。

ps:上一年的某个新闻很火,点击量3000W,本年有个新闻刚出来点击量100次,原本咱们是想保留本年的新的新闻的,可是假设依据LFU来做的话,咱们发现筛选的是本年的新闻,明显是不合理的

导致的问题:新的数据进不去,旧的数据出不来。

那么怎么处理呢,且往下看

7.4.2 源码分析

咱们还是来看redisObject(server.h)

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global
    lru_clock) or
    * LFU data (least significant 8 bits frequency
    * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;

咱们看它的lru,它假设在LFU算法的时分!它前面16位代表的是时刻,后8位代表的是一个数值,frequency是频率,应该便是代表这个目标的拜访次数,咱们先给它叫做counter。

前16bits时刻有啥用?

跟时刻相关,我猜测应该是跟处理时效性相关的,那么怎样处理的?从生活中找例子

大家应该充过一些会员,比方我这个年岁的,小时分喜欢充腾讯的黄钻、绿钻、蓝钻等等。可是有一个点,假设哪天我没充钱了的话,或许没有续VIP的时分,我这个钻石等级会跟着时刻的丢掉而降低。比方我原本是黄钻V6,可是一年不充钱的话,或许就变成了V4。

那么回到Redis,大胆的去猜测,那么这个时刻是不是去记载这个目标有多久没拜访了,假设多久没拜访,我就去削减对应的次数。

LFUDecrAndReturn办法(evict.c)

unsigned long LFUDecrAndReturn(robj *o) {
    //lru字段右移8位,得到前面16位的时刻
    unsigned long ldt = o->lru >> 8;
    //lru字段与255进行&运算(255代表8位的最大值),
    //得到8位counter值
    unsigned long counter = o->lru & 255;
    //假设装备了lfu_decay_time,用LFUTimeElapsed(ldt) 除以装备的值
    //LFUTimeElapsed(ldt)源码见下
    //总的没拜访的分钟时刻/装备值,得到每分钟没拜访衰减多少
    unsigned long num_periods = server.lfu_decay_time ?LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
    if (num_periods)
    //不能削减为负数,非负数用couter值减去衰减值
    counter = (num_periods > counter) ? 0 : counter - num_periods;
    return counter;
}

衰减因子由装备

lfu-decay-time 1 //多少分钟没操作拜访就去衰减一次

后8bits的次数,最大值是255,够不够?

必定不够,一个目标的拜访操作次数必定不止255次。

可是咱们能够让数据抵达255的很难。那么怎样做的?这个比较简单,咱们先看源码,然后再总结

LFULogIncr办法 (evict.c 文件)

uint8_t LFULogIncr(uint8_t counter) {
    //假设现已到最大值255,回来255 ,8位的最大值
    if (counter == 255) return 255;
    //得到随机数(0-1)
    double r = (double)rand()/RAND_MAX;
    //LFU_INIT_VAL表明基数值(在server.h装备)
    double baseval = counter - LFU_INIT_VAL;
    //假设达不到基数值,表明快不行了,baseval =0
    if (baseval < 0) baseval = 0;
    //假设快不行了,必定给他加counter
    //不然,依照几率是否加counter,一起跟baseval与lfu_log_factor相关
    //都是在分子,所以2个值越大,加counter几率越小
    double p = 1.0/(baseval*server.lfu_log_factor+1);
    if (r < p) counter++;
    return counter;
}

八、Redis 耐久化

8.1 RBD

8.1.1 何时触发RBD

主动触发

  1. 装备触发
save 900 1 900s查看一次,至少有1key被修正就触发
save 300 10 300s查看一次,至少有10key被修正就触发
save 60 10000 60s查看一次,至少有10000key被修正
  1. shutdown正常封闭

任何组件在正常封闭的时分,都会去完结应该完结的事。比方Mysql 中的Redolog耐久化,正常封闭的时分也会去耐久化。

  1. flushall指令触发

数据清空指令会触发RDB操作,而且是触发一个空的RDB文件,所以, 假设在没有敞开其他的耐久化的时分,flushall是能够删库跑路的,在生产环境慎用。

8.1.2 RDB的优劣

优势

  1. 是个十分紧凑型的文件,十分合适备份与灾祸恢复
  1. 最大极限的提高了功能,会fork一个子进程,父进程永远不会产于磁盘 IO或许相似操作
  1. 更快的重启

缺乏

  1. 数据安全性不是很高,由所以依据装备的时刻来备份,假设每5分钟备份 一次,也会有5分钟数据的丢掉
  1. 经常fork子进程,所以比较耗CPU,对CPU不是很友爱

8.2 AOF

由于RDB的数据可靠性十分低,所以Redis又供给了别的一种耐久化方案: Append Only File 简称:AOF

AOF默许是封闭的,你能够在装备文件中进行敞开:

appendonly no //默许封闭,能够进行敞开
# The name of the append only file (default:"appendonly.aof")
appendfilename "appendonly.aof" //AOF文件名

追加文件,即每次更改的指令都会附加到我的AOF文件中。

8.2.1 同步机制

AOF会记载每个写操作,那么问题来了。我难道每次操作指令都要和磁盘交互?

当然不行,所以redis支撑几种战略,由用户来决议要不要每次都和磁盘交互

# appendfsync always 表明每次写入都履行fsync(改写)函数 功能会十分十分慢 可是十分安全
appendfsync everysec 每秒履行一次fsync函数 或许丢掉1s的数据
# appendfsync no 由操作系统确保数据同步到磁盘,速度最快 你的数据只需求交给操作系统就行

默许1s一次,最多有1s丢掉

8.2.2 重写机制

由于AOF是追加的方式,所以文件会越来越大,越大的话,数据加载越慢。 所以咱们需求对AOF文件进行重写。

何为重写

比方 咱们的incr指令,假设咱们incr了100次,现在数据是100,可是咱们的aof文件中会有100条incr指令,可是咱们发现这个100条指令用处不大, 假设咱们能把最新的内存里的数据保存下来的话。所以,重写便是做了这么一件工作,把当时内存的数据重写下来,然后把之前的追加的文件删去。

重写流程

  • 在Redis7之前
  1. Redis fork一个子进程,在一个暂时文件中写入新的AOF(当时内存的数据生成的新的AOF)
  1. 那么在写入新的AOF的时分,主进程还会有指令进入,那么主进程会在内存缓存区中累计新的指令 (可是一起也会写在旧的AOF文件中,就算 重写失利,也不会导致AOF损坏或许数据丢掉)
  1. 假设子进程重写完结,父进程会收到完结信号,而且把内存缓存中的指令追加到新的AOF文件中
  1. 替换旧的AOF文件 ,而且将新的指令附加到重写好的AOF文件中
  • 在Redis7之后,AOF文件不再是一个,所以会有暂时清单的概念
  1. 子进程开始在一个暂时文件中写入新的根底AOF
  1. 父级翻开一个新的增量 AOF 文件以持续写入更新。假设重写失利,旧的根底和增量文件(假设有的话)加上这个新翻开的增量文件就代表了完好的更新数据集,所以咱们是安全的
  1. 当子进程完结根底文件的重写后,父进程收到一个信号,并运用新翻开 的增量文件和子进程生成的根底文件来构建暂时清单,并将其耐久化
  1. 现在 Redis 对清单文件进行原子交流,以便此 AOF 重写的结果生效。 Redis 还会整理旧的根底文件和任何未运用的增量文件

可是重写是把当时内存的数据,写入一个新的AOF文件,假设当时数据比较大,然后以指令的方式写入的话,会很慢很慢

所以在4.0之后,在重写的时分是选用RDB的方式去生成新的AOF文件,然 后后续追加的指令,还是以指令追加的方式追加的文件结尾

aof-use-rdb-preamble yes //是否敞开RDB与AOF混合模式

什么时分重写

装备文件redis.conf

# 重写触发机制
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb 就算抵达了榜首个百分比的巨细,也必须大于 64M
在 aof 文件小于64mb的时分不进行重写,当抵达64mb的时分,就重写一
次。重写后的 aof 文件或许是40mb。上面装备了auto-aof-rewritepercentag为100,即 aof 文件到了80mb的时分,进行重写。

8.2.3 AOF的优势与缺乏

优势

  1. 安全性高,就算是默许的耐久化同步机制,也最多只会丢掉1s数据
  1. AOF由于某些原因,比方磁盘满了等导致追加失利,也能经过redischeck-aof 工具来修复
  1. 格局都是追加的日志,所以可读性更高

缺乏

  1. 数据集一般比RDB大
  1. 耐久化跟数据加载比RDB更慢
  1. 在7.0之前,重写的时分,由于重写的时分,新的指令会缓存在内存区,所以会导致很多的内存运用
  1. 而且重写期间,会跟磁盘进行2次IO,一个是写入老的AOF文件,一个是写入新的AOF文件

九、Redis常见问题总结

9.1 Redis数据丢掉场景

9.1.1 耐久化丢掉

选用RDB或许不耐久化, 会有数据丢掉,由所以手动或许装备以快照的方式来进行备份。

处理:启用AOF,以指令追加的方式进行备份,可是默许也会有1s丢掉, 这是在功能与数据安全性中寻求的一个最合适的方案,假设为了确保数据 一致性,能够将装备更改为always,可是功能很慢,一般不用。

9.1.2 主从切换

由于Redis的数据是主异步同步给从的,提高了功能,可是由所以异步同步到从。所以存在数据丢掉的或许

  1. master写入数据k1 ,由所以异步同步到slave,当master没有同步给slave的时分,master挂了
  1. slave会成为新的master,而且没有同步k1
  1. master重启,会成为新master的slave,同步数据会清空自己的数据,从新的master加载
  1. k1丢掉

9.2 Redis缓存雪崩、穿透、击穿问题分析

9.2.1 缓存雪崩

缓存雪崩便是Redis的很多热点数据一起过期(失效),由于设置了相同的过期时刻,刚好这个时分Redis恳求的并发量又很大,就会导致一切的恳求落到数据库。

  1. 确保Redis的高可用,避免由于Redis不可用导致悉数打到DB
  1. 加互斥锁或许运用队列,针对同一个key只允许一个线程到数据库查询
  1. 缓存守时预先更新,避免一起失效
  1. 经过加随机数,使key在不同的时刻过期

9.2.2 缓存穿透

缓存穿透是指缓存和数据库中都没有的数据,可是用户一向恳求不存在的数据!这时的用户很或许便是攻击者,歹意搞你们公司的,攻击会导致数据库压力过大。

处理方案:布隆过滤器

布隆过滤器的思维:已然是由于你Redis跟DB都没有,然后用户歹意一向拜访不存在的key,然后悉数打到Redis跟DB。那么咱们能不能独自把DB的数据独自存到别的一个当地,可是这个当地只存一个简单的符号,符号这个key存不存在,假设存在,就去拜访Redis跟DB,否则直接回来。而且这个内存最好很小。

9.2.3 缓存击穿

单个key过期的时分有很多并发,运用互斥锁,回写redis,而且选用双重查看锁来提高功能!削减对DB的拜访。