当今互联网应用中,跟着事务的开展,数据量越来越大,查询功率越来越高,关于时序数据的存储、查询和分析需求也越来越激烈,这时候 Redis 就成为了首选的计划之一。

Redis 供给了多种数据结构,如字符串、哈希表、列表、调集、有序调集等,每种数据结构都具备不同的特性,能够满足不同的事务需求。其间,有序调集的 score 能够存储时刻戳,十分适合用于存储时序数据,例如监控方针、日志、核算数据、报表等。下面举几个时序数据场景例子:

  1. 监控方针:

假定咱们有一个服务,名为 my_service,需求监控它的恳求呼应时刻。咱们能够运用 Redis 有序调集来存储数据,每个恳求的呼应时刻作为 value,恳求的时刻戳作为 score。示例如下:

> ZADD requests:my_service 1613115560 350
(integer) 1
> ZADD requests:my_service 1613115570 450
(integer) 1
> ZADD requests:my_service 1613115580 550
(integer) 1

这些指令向名为 requests:my_service 的有序调集中添加了 3 条数据,分别是 2021 年 2 月 12 日 10:19:20 的恳求呼应时刻为 350ms,10:19:30 的恳求呼应时刻为 450ms,10:19:40 的恳求呼应时刻为 550ms。

接下来,咱们来看一下怎么运用 Redis 指令查询这些监控方针的数据。下面的指令会回来 requests:my_service 有序调集内一切数据:

> ZRANGE requests:my_service 0 -1 WITHSCORES
1) "350"
2) "1613115560"
3) "450"
4) "1613115570"
5) "550"
6) "1613115580"

指令履行结果表明,数据依照 score 排序,其间 score 是时刻戳(单位为秒),value 是恳求呼应时刻(单位为毫秒)。同时,运用 ZRANGEBYSCORE 指令能够获取一段时刻范围内的监控数据,例如:

> ZRANGEBYSCORE requests:my_service 1613115570 1613115580 WITHSCORES
1) "450"
2) "1613115570"
3) "550"
4) "1613115580"

这条指令回来了 requests:my_service 有序调集中在时刻戳 1613115570 到 1613115580 之间的一切数据。

  1. 日志:

假定咱们要存储的日志是一条指定格局的字符串,包括时刻戳和日志内容。运用 Redis 列表存储日志数据,每次写入新日志时能够运用 Redis 列表的 rpush 指令将数据写入列表的尾部。示例如下:

> RPUSH logs:my_logs 2021-02-12 10:30:00 INFO message 1
(integer) 1
> RPUSH logs:my_logs 2021-02-12 10:30:01 ERROR message 2
(integer) 2
> RPUSH logs:my_logs 2021-02-12 10:30:02 WARN message 3
(integer) 3

这些指令向名为 logs:my_logs 的列表尾部添加 3 条数据,分别是 2021 年 2 月 12 日 10:30:00 的 INFO 等级音讯,10:30:01 的 ERROR 等级音讯和 10:30:02 的 WARN 等级音讯。

接下来,咱们来看一下怎么运用 Redis 指令查询这些日志数据。下面的指令会回来 logs:my_logs 列表内一切数据:

> LRANGE logs:my_logs 0 -1
1) "2021-02-12 10:30:00 INFO message 1"
2) "2021-02-12 10:30:01 ERROR message 2"
3) "2021-02-12 10:30:02 WARN message 3"

指令履行结果表明,数据依照插入次序排序,从列表头部开始遍历。运用 ZRANGEBYSCORE 指令能够获取一段时刻范围内的日志数据,例如:

> ZRANGEBYSCORE logs:my_logs 1613115570 1613115580
1) "2021-02-12 10:30:01 ERROR message 2"

这条指令回来了 logs:my_logs 列表中在时刻戳 1613115570 到 1613115580 之间的日志数据,但由于日志数据并没有详细的 time stamp 做 score,所以这个例子仅仅演示这个指令的用法,实际上应该运用有序调集去查询时刻区间内的日志数据。

  1. 核算数据:

假定咱们要存储的核算数据是一些详细事务相关的计数器,例如每分钟用户拜访量。咱们能够运用 Redis 有序调集来存储核算数据,key 是计数器称号,score 是时刻戳,value 是详细的计数值(例如拜访次数)。示例如下:

> ZADD visits 1613167800 100
(integer) 1
> ZADD visits 1613167860 120
(integer) 1
> ZADD visits 1613167920 150
(integer) 1

这些指令向名为 visits 的有序调集中添加了 3 条数据,分别是 2021 年 2 月 12 日 23:30:00 的拜访次数为 100,23:31:00 的拜访次数为 120,23:32:00 的拜访次数为 150。

接下来,咱们来看一下怎么运用 Redis 指令查询这些核算数据。下面的指令会回来 visits 有序调集内一切数据:

> ZRANGE visits 0 -1 WITHSCORES
1) "100"
2) "1613167800"
3) "120"
4) "1613167860"
5) "150"
6) "1613167920"

指令履行结果表明,数据依照 score 排序,其间 score 是时刻戳(单位为秒),value 是拜访次数。运用 ZRANGEBYSCORE 指令能够获取一段时刻范围内的核算数据,例如:

> ZRANGEBYSCORE visits 1613167860 1613167920 WITHSCORES
1) "120"
2) "1613167860"
3) "150"
4) "1613167920"

这条指令回来了 visits 有序调集中在时刻戳 1613167860 到 1613167920 之间的一切数据。

运用 Redis 有序调集中的另一个常见场景是核算 TopN,例如找出拜访次数最多的前 10 个计数器,能够运用指令 ZREVRANGE visits 0 9 WITHSCORES,它回来 visits 有序调集中前 10 个元素,依照 value 从大到小摆放,而且回来每个元素的 score。

需求实践:

这是一个实时监控体系,主要用于记录和核算服务发生的过错状况,以便在过错数量超越预设阈值时发出正告信息。

体系每秒钟生成随机过错数据,并将它们存储到 Redis 数据库中。每隔 10 秒钟,体系会从 Redis 数据库中聚合最近一分钟内的过错数据,并依照服务名和过错类型进行核算核算。假如某个服务的过错数量超越预设阈值,体系会输出一条正告信息提示用户。

整个体系的方针是协助用户及时了解每个服务的过错状况,以便及时采纳相应的措施,保障服务的稳定性和可靠性

代码示例:

模拟接口服务反常数据

package com.example.demo.redis;
import redis.clients.jedis.Jedis;
import java.util.*;
public class DataGenerator {
    // 界说服务列表
    private static final List<String> SERVICES = Arrays.asList("service1", "service2", "service3");
    // 界说过错列表
    public static final List<String> ERRORS = Arrays.asList("invalid_param", "timeout", "unknown_error");
    /**
     * 生成数据
     *
     * @param total 数据总数
     * @param jedis Redis 客户端连接
     */
    public static void generateData(int total, Jedis jedis) {
        Random rand = new Random(); // 初始化随机数生成器
        long currentTimestamp = System.currentTimeMillis() / 1000; // 获取当时时刻戳,准确到秒
        long startTimestamp = currentTimestamp - 60; // 核算开始时刻戳,为当时时刻戳减去 60 秒
        for (int i = 0; i < total; i++) { // 循环 total 次,生成 total 条数据
            String service = SERVICES.get(rand.nextInt(SERVICES.size())); // 随机挑选一个服务
            String error = ERRORS.get(rand.nextInt(ERRORS.size())); // 随机挑选一个过错
            long timestamp = startTimestamp + rand.nextInt(60); // 生成一个随机时刻戳,准确到秒,范围为开始时刻戳到当时时刻戳
            int count = 1;
            String item = String.format("%s:%s:%d:%d", service, error, timestamp, count);
            jedis.zadd("error_data", timestamp, item); // 将过错数据存储到 Redis 数据库中
        }
    }
}

聚合反常数据,到达阈值告警

package com.example.demo.redis;
import redis.clients.jedis.Jedis;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DataAggregator {
    private static final String REDIS_HOST = "localhost"; // Redis 主机名
    private static final int REDIS_PORT = 6379; // Redis 端口号
    private static final int THRESHOLD = 100; // 预设阈值,当过错数量超越该阈值时触发正告
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); // 创立一个只要一个线程的守时使命履行程序
        Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT); // 创立 Redis 客户端连接
        scheduler.scheduleAtFixedRate(() -> {
            // 并发状况下,线程会阻塞
            synchronized (jedis) {
                DataGenerator.generateData(20, jedis); // 生成随机过错数据,并将其存储到 Redis 数据库中
            }
        }, 0, 1, TimeUnit.SECONDS); // 守时使命间隔为 1 秒钟
        scheduler.scheduleAtFixedRate(() -> { // 守时使命逻辑
            synchronized (jedis) {
                long currentTimestamp = System.currentTimeMillis() / 1000; // 获取当时时刻戳,准确到秒
                long startTimestamp = currentTimestamp - 60; // 核算开始时刻戳,为当时时刻戳减去 60 秒
                Set<String> data = jedis.zrangeByScore("error_data", startTimestamp, currentTimestamp); // 运用 zrange 指令获取指守时刻范围内的数据
                Map<String, Map<String, Integer>> countMap = new HashMap<>(); // 用于记录聚合后的服务和过错数量信息
                for (String item : data) { // 遍历一切过错数据
                    String[] parts = item.split(":"); // 以冒号为分隔符,将过错数据分割为部分
                    String service = parts[0]; // 获取服务名
                    String error = parts[1]; // 获取过错类型
                    long timestamp = Long.parseLong(parts[2]); // 获取时刻戳
                    int count = Integer.parseInt(parts[3]); // 获取过错数量
                    if (timestamp < startTimestamp) { // 假如时刻戳早于开始时刻戳,则越过该数据
                        continue;
                    }
                    Map<String, Integer> serviceCountMap = countMap.computeIfAbsent(service, k -> new HashMap<>()); // 获取指定服务的过错数量信息
                    serviceCountMap.put(error, serviceCountMap.getOrDefault(error, 0) + count); // 更新指定服务和过错类型的过错数量信息
                }
                List<String> alerts = new ArrayList<>(); // 用于存储正告信息
                for (String service : countMap.keySet()) { // 遍历服务名列表
                    Map<String, Integer> serviceCountMap = countMap.get(service); // 获取服务和过错数量信息
                    int totalErrors = 0;
                    for (String error : serviceCountMap.keySet()) { // 遍历过错列表
                        int count = serviceCountMap.get(error); // 获取过错数量
                        totalErrors += count;
                    }
                    if (totalErrors > THRESHOLD) { // 假如过错数量超越预设阈值
                        alerts.add(service + " has too many errors: " + serviceCountMap.keySet() + ", count: " + totalErrors); // 将该服务名添加到正告信息列表中
                    }
                }
                if (!alerts.isEmpty()) { // 假如正告信息列表不为空
                    System.out.println(String.join("\n", alerts)); // 打印正告信息
                }
            }
        }, 0, 10, TimeUnit.SECONDS); // 守时使命间隔为 10 秒
        // 关闭 Redis 连接
        jedis.close();
    }
}

以上代码可正常运转,有疑问能够沟通~~

欢迎关注公众号:程序员的考虑与落地

公众号供给很多实践案例,Java入门者不容错失哦,能够沟通!!