假如你会恣意一门言语的stream流,没道理不会大数据开发。

俗话说男追女隔座山,女追男隔层纱。 假如说零根底学大数据,感觉前面是一座山,那么只需你会java或许恣意一门言语的stream流,那大数据就只隔了一层纱。
本文以java stream流核算为例,讲解一些根底的spark操作。另一个流行的大数据结构flink同理。


准备工作

测试数据,以下列别离表明名字,年纪,部分,职位。

张三,20,研发部,普通职工
李四,31,研发部,普通职工
李丽,36,财务部,普通职工
张伟,38,研发部,司理
杜航,25,人事部,普通职工
周歌,28,研发部,普通职工

创立一个Employee类。

    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    static
    class Employee implements Serializable {
        private String name;
        private Integer age;
        private String department;
        private String level;
    }
}

版别: jdk:1.8 spark:3.2.0 scala:2.12.15
上面的scala版别仅仅spark结构本身需要依靠到scala。
由于scala确实是比较小众的言语,本文还是运用java演示spark代码。

1.map类

1.1 java stream map

map表明1对1操作。将上游数据的一行数据进行恣意操作,终究得到操作后的一条数据。
这种思维,在java和spark,flink都是共同的。

咱们先用java stream演示读取文件,再运用map操作将每行数据映射为Employee目标。

List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8");
        List<Employee> employeeList = list.stream().map(word -> {
            List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
            Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            return employee;
        }).collect(Collectors.toList());
        employeeList.forEach(System.out::println);

转化后的数据:

JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通职工)
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通职工)
JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通职工)
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=司理)
JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通职工)
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通职工)

1.2 spark map

首要得到一个SparkSession目标,读取文件,得到一个DataSet弹性数据集目标。

SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
Dataset<Row> reader = session.read().text("F:/test.txt");
reader.show();

这儿的show()便是打印输出当时数据集,它是一个action类的算子。
得到成果:

+-----------------------+
|                  value|
+-----------------------+
|张三,20,研发部,普通职工|
|李四,31,研发部,普通职工|
|李丽,36,财务部,普通职工|
|    张伟,38,研发部,司理|
|杜航,25,人事部,普通职工|
|周歌,28,研发部,普通职工|
+-----------------------+

现在咱们拿到了根底数据,咱们运用map1对1操作,将一行行数据转化为Employee目标。
咱们这儿不运用lamda表达式,让大家看得愈加清晰。
这儿完结了MapFunction接口里的call办法,每次拿到一行数据,咱们这儿进行切分,再转化为目标。

  1. 需要特别指出的一点是,与后端WEB运用有一个统一反常处理不同的是,大数据运用,特别是流式核算,要确保7*24在线,需要对每个算子进行反常捕获。
    由于你不知道上游数据清洗到底怎么样,很或许拿到一条脏数据,处理的时分抛出反常,假如没有捕获处理,那么整个运用就会挂掉。

  2. spark的算子分为Transformation和Action两种类型。Transformation会开成一个DAG图,具有lazy推迟性,它只会从一个dataset(rdd/df)转化成另一个dataset(rdd/df),只有当遇到action类的算子才会真实履行。 咱们今天会演示的算子都是Transformation类的算子。
    典型的Action算子包括show,collect,save之类的。比如在本地进行show检查成果,或许完结运转后save到数据库,或许HDFS。

  3. spark履行时分为driver和executor。但不是本文的要点,不会展开讲。 只需要注意driver端会将代码分发到各个分布式系统的节点executor上,它本身不会参与核算。一般来说,算子外部,如以下示例代码的a处会在driver端履行,b处算子内部会不同服务器上的executor端履行。 所以在算子外部定义的变量,在算子内部运用的时分要特别注意!! 不要想当然地认为都是一个main办法里写的代码,就一定会在同一个JVM里。
    这儿涉及到序列化的问题,一起它们分处不同的JVM,运用”==”比较的时分也或许会出问题!!
    这是一个后端WEB开发转向大数据开发时,这个思维一定要改变过来。
    简言之,后端WEB服务的分布式是咱们自己完结的,大数据的分布式是结构天生帮咱们完结的

1.2.1 MapFunction

// a 算子外部,driver端
Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {
            @Override
            public Employee call(Row row) throws Exception {
                // b 算子内部,executor端
                Employee employee = null;
                try {
                    // gson.fromJson(); 这儿运用gson涉及到序列化问题
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                } catch (Exception exception) {
                    // 日志记载
                    // 流式核算中要做到7*24小时不间断,恣意一条上流脏数据都或许导致失利,然后导致使命退出,所以这儿要做好反常的抓取
                    exception.printStackTrace();
                }
                return employee;
            }
        }, Encoders.bean(Employee.class));
        employeeDataset.show();

输出

+---+----------+--------+----+
|age|department|   level|name|
+---+----------+--------+----+
| 20|    研发部|普通职工|张三|
| 31|    研发部|普通职工|李四|
| 36|    财务部|普通职工|李丽|
| 38|    研发部|    司理|张伟|
| 25|    人事部|普通职工|杜航|
| 28|    研发部|普通职工|周歌|

1.2.2 MapPartitionsFunction

spark中 map和mapPartitions有啥差异?
map是1条1条处理数据
mapPartitions是一个分区一个分区处理数据


后者一定比前者效率高吗?
不一定,看具体情况。

这儿运用前面 map 相同的逻辑处理。能够看到在call办法里得到的是一个Iterator迭代器,是一批数据。
得到一批数据,然后再1对1映射为目标,再以Iterator的方式回来这批数据。

Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {
            @Override
            public Iterator<Employee> call(Iterator<Row> iterator) throws Exception {
                List<Employee> employeeList = new ArrayList<>();
                while (iterator.hasNext()){
                    Row row = iterator.next();
                    try {
                        List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                        Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                        employeeList.add(employee);
                    } catch (Exception exception) {
                        // 日志记载
                        // 流式核算中要做到7*24小时不间断,恣意一条上流脏数据都或许导致失利,然后导致使命退出,所以这儿要做好反常的抓取
                        exception.printStackTrace();
                    }
                }
                return employeeList.iterator();
            }
        }, Encoders.bean(Employee.class));
        employeeDataset2.show();

输出成果跟map相同,这儿就不贴出来了。

2.flatMap类

map和flatMap有什么差异?
map是1对1,flatMap是一对多。 当然在java stream中,flatMap叫法叫做扁平化。

这种思维,在java和spark,flink都是共同的。

2.1 java stream flatMap

以下代码将1条原始数据映射到2个目标上并回来。

List<Employee> employeeList2 = list.stream().flatMap(word -> {
            List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
            List<Employee> lists = new ArrayList<>();
            Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            lists.add(employee);
            Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            lists.add(employee2);
            return lists.stream();
        }).collect(Collectors.toList());
        employeeList2.forEach(System.out::println);

输出

JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通职工)
JavaStreamDemo.Employee(name=张三_2, age=20, department=研发部, level=普通职工)
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通职工)
JavaStreamDemo.Employee(name=李四_2, age=31, department=研发部, level=普通职工)
JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通职工)
JavaStreamDemo.Employee(name=李丽_2, age=36, department=财务部, level=普通职工)
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=司理)
JavaStreamDemo.Employee(name=张伟_2, age=38, department=研发部, level=司理)
JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通职工)
JavaStreamDemo.Employee(name=杜航_2, age=25, department=人事部, level=普通职工)
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通职工)
JavaStreamDemo.Employee(name=周歌_2, age=28, department=研发部, level=普通职工)

2.2 spark flatMap

这儿完结FlatMapFunction的call办法,一次拿到1条数据,然后回来值是Iterator,所以能够回来多条。

Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {
            @Override
            public Iterator<Employee> call(Row row) throws Exception {
                List<Employee> employeeList = new ArrayList<>();
                try {
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee);
                    Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee2);
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
                return employeeList.iterator();
            }
        }, Encoders.bean(Employee.class));
        employeeDatasetFlatmap.show();

输出

+---+----------+--------+------+
|age|department|   level|  name|
+---+----------+--------+------+
| 20|    研发部|普通职工|  张三|
| 20|    研发部|普通职工|张三_2|
| 31|    研发部|普通职工|  李四|
| 31|    研发部|普通职工|李四_2|
| 36|    财务部|普通职工|  李丽|
| 36|    财务部|普通职工|李丽_2|
| 38|    研发部|    司理|  张伟|
| 38|    研发部|    司理|张伟_2|
| 25|    人事部|普通职工|  杜航|
| 25|    人事部|普通职工|杜航_2|
| 28|    研发部|普通职工|  周歌|
| 28|    研发部|普通职工|周歌_2|
+---+----------+--------+------+

3 groupby类

与SQL相似,java stream流和spark相同,groupby对数据集进行分组并在此根底上能够进行聚合函数操作。也能够分组直接得到一组子数据集。

3.1 java stream groupBy

按部分分组核算部分人数:

Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
        System.out.println(map);

输出

{财务部=1, 人事部=1, 研发部=4}

3.2 spark groupBy

将映射为目标的数据集按部分分组,在此根底上核算部分职工数和平均年纪。

RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");
// 核算每个部分有多少职工
datasetGroupBy.count().show(); 
/**
 * 每个部分的平均年纪
 */
datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show();

输出别离为

+----------+-----+
|department|count|
+----------+-----+
|    财务部|    1|
|    人事部|    1|
|    研发部|    4|
+----------+-----+
+----------+------+
|department|avgAge|
+----------+------+
|    财务部|  36.0|
|    人事部|  25.0|
|    研发部| 29.25|
+----------+------+

3.3 spark groupByKey

spark的groupBygroupByKey的差异,前者在此根底上运用聚合函数得到一个聚合值,后者仅仅进行分组,不进行任何核算。
相似于java stream的:

Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
System.out.println(map2);

输出

{财务部=[JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通职工)],
人事部=[JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通职工)], 
研发部=[JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通职工), JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通职工), JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=司理), JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通职工)]}

运用spark groupByKey。
先得到一个key-value的一对多的一个集合数据集。 这儿的call()办法回来的是key,即分组的key。

KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {
            @Override
            public String call(Employee employee) throws Exception {
                // 回来分组的key,这儿表明依据部分进行分组
                return employee.getDepartment();
            }
        }, Encoders.STRING());

再在keyValueGroupedDataset 的根底上进行mapGroups,在call()办法里就能够拿到每个key的一切原始数据。

keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {
            @Override
            public Object call(Object key, Iterator iterator) throws Exception {
                System.out.println("key = " + key);
                while (iterator.hasNext()){
                    System.out.println(iterator.next());
                }
                return iterator; 
            }
        }, Encoders.bean(Iterator.class))
                .show(); // 这儿的show()没有意义,仅仅触发核算罢了

输出

key = 人事部
SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通职工)
key = 研发部
SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通职工)
SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通职工)
SparkDemo.Employee(name=张伟, age=38, department=研发部, level=司理)
SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通职工)
key = 财务部
SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通职工)

4 reduce类

reduce的字面意思是:削减;减小;下降;缩小。
又叫归约。

它将数据集进行循环,让当时目标前一目标两两进行核算,每次核算得到的成果作为下一次核算的前一目标,并终究得到一个目标。
假设有5个数据【1,2,3,4,5】,运用reduce进行求和核算,别离是

揭开神秘面纱,会stream流就会大数据

比如上面的测试数据集,我要核算各部分年纪总数。运用聚合函数得到的是一个int类型的数字。

4.1 java stream reduce

int age = employeeList.stream().mapToInt(e -> e.age).sum();
System.out.println(age);//178

运用reduce也可进行上面的核算

int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b);
System.out.println(age1);// 178

但是我将年纪求和,一起得到一个完好的目标呢?

JavaStreamDemo.Employee(name=周歌, age=178, department=研发部, level=普通职工)

能够运用reduce将数据集两两循环,将年纪相加,一起回来最终一个遍历的目标。
下面代码的pre 代表前一个目标,current 代表当时目标。

 /**
 * pre 代表前一个目标
 * current 代表当时目标
 */
Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> {
     // 当第一次循环时前一个目标为null
    if (pre.getAge() == null) {
        current.setAge(current.getAge());
    } else {
        current.setAge(pre.getAge() + current.getAge());
    }
    return current;
});
System.out.println(reduceEmployee);

4.2 spark reduce

spark reduce的基本思维跟java stream是相同的。
直接看代码:

 Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {
            @Override
            public Employee call(Employee t1, Employee t2) throws Exception {
                // 不同的版别看是否需要判别t1 == null
                t2.setAge(t1.getAge() + t2.getAge());
                return t2;
            }
        });
        System.out.println(datasetReduce);

输出

SparkDemo.Employee(name=周歌, age=178, department=研发部, level=普通职工)

其它常见操作类

Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();
System.out.println(employee);
// SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通职工)

一起能够将dataset注册成table,运用更为强壮的SQL来进行各种强壮的运算。
现在SQL是flink的一等公民,spark也不遑多让。
这儿举一个十分简略的比如。

employeeDataset.registerTempTable("table");
session.sql("select * from table where age > 30 order by age desc limit 3").show();

输出

+---+----------+--------+----+
|age|department|   level|name|
+---+----------+--------+----+
| 38|    研发部|    司理|张伟|
| 36|    财务部|普通职工|李丽|
| 31|    研发部|普通职工|李四|
+---+----------+--------+----+

employeeDataset.registerTempTable("table");
session.sql("select 
            concat_ws(',',collect_set(name)) as names, // group_concat
            avg(age) as age,
            department from table 
            where age > 30  
            group by department 
            order by age desc 
            limit 3").show();

输出

+---------+----+----------+
|    names| age|department|
+---------+----+----------+
|     李丽|36.0|    财务部|
|张伟,李四|34.5|    研发部|
+---------+----+----------+

小结

本文依据java stream的相似性,介绍了spark里面一些常见的算子操作。
本文仅仅做一个十分简略的入门介绍。
假如感兴趣的话, 后端的同学能够尝试着操作一下,十分简略,本地不需要建立环境,只需引进spark 的 maven依靠即可。
我把本文的一切代码悉数贴在最终面。

java stream 源码:

点击检查代码
import lombok.*;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class JavaStreamDemo {
    public static void main(String[] args) throws IOException {
        /**
         * 张三,20,研发部,普通职工
         * 李四,31,研发部,普通职工
         * 李丽,36,财务部,普通职工
         * 张伟,38,研发部,司理
         * 杜航,25,人事部,普通职工
         * 周歌,28,研发部,普通职工
         */
        List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8");
        List<Employee> employeeList = list.stream().map(word -> {
            List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
            Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            return employee;
        }).collect(Collectors.toList());
        // employeeList.forEach(System.out::println);
        List<Employee> employeeList2 = list.stream().flatMap(word -> {
            List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
            List<Employee> lists = new ArrayList<>();
            Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            lists.add(employee);
            Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            lists.add(employee2);
            return lists.stream();
        }).collect(Collectors.toList());
        // employeeList2.forEach(System.out::println);
        Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
        System.out.println(map);
        Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
        System.out.println(map2);
        int age = employeeList.stream().mapToInt(e -> e.age).sum();
        System.out.println(age);// 178
        int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b);
        System.out.println(age1);// 178
        /**
         * pre 代表前一个目标
         * current 代表当时目标
         */
        Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> {
            if (pre.getAge() == null) {
                current.setAge(current.getAge());
            } else {
                current.setAge(pre.getAge() + current.getAge());
            }
            return current;
        });
        System.out.println(reduceEmployee);
    }
    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    static
    class Employee implements Serializable {
        private String name;
        private Integer age;
        private String department;
        private String level;
    }
}

spark的源码:

点击检查代码
import com.google.gson.Gson;
import lombok.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
public class SparkDemo {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
        Dataset<Row> reader = session.read().text("F:/test.txt");
        // reader.show();
        /**
         * +-----------------------+
         * |                  value|
         * +-----------------------+
         * |张三,20,研发部,普通职工|
         * |李四,31,研发部,普通职工|
         * |李丽,36,财务部,普通职工|
         * |张伟,38,研发部,司理|
         * |杜航,25,人事部,普通职工|
         * |周歌,28,研发部,普通职工|
         * +-----------------------+
         */
        // 本地演示罢了,实际分布式环境,这儿的gson涉及到序列化问题
        // 算子以外的代码都在driver端运转
        // 任何算子以内的代码都在executor端运转,即会在不同的服务器节点上履行
        Gson gson = new Gson();
        // a 算子外部,driver端
        Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {
            @Override
            public Employee call(Row row) throws Exception {
                // b 算子内部,executor端
                Employee employee = null;
                try {
                    // gson.fromJson(); 这儿运用gson涉及到序列化问题
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                } catch (Exception exception) {
                    // 日志记载
                    // 流式核算中要做到7*24小时不间断,恣意一条上流脏数据都或许导致失利,然后导致使命退出,所以这儿要做好反常的抓取
                    exception.printStackTrace();
                }
                return employee;
            }
        }, Encoders.bean(Employee.class));
        // employeeDataset.show();
        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 20|    研发部|普通职工|张三|
         * | 31|    研发部|普通职工|李四|
         * | 36|    财务部|普通职工|李丽|
         * | 38|    研发部|    司理|张伟|
         * | 25|    人事部|普通职工|杜航|
         * | 28|    研发部|普通职工|周歌|
         */
        Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {
            @Override
            public Iterator<Employee> call(Iterator<Row> iterator) throws Exception {
                List<Employee> employeeList = new ArrayList<>();
                while (iterator.hasNext()){
                    Row row = iterator.next();
                    try {
                        List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                        Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                        employeeList.add(employee);
                    } catch (Exception exception) {
                        // 日志记载
                        // 流式核算中要做到7*24小时不间断,恣意一条上流脏数据都或许导致失利,然后导致使命退出,所以这儿要做好反常的抓取
                        exception.printStackTrace();
                    }
                }
                return employeeList.iterator();
            }
        }, Encoders.bean(Employee.class));
        // employeeDataset2.show();
        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 20|    研发部|普通职工|张三|
         * | 31|    研发部|普通职工|李四|
         * | 36|    财务部|普通职工|李丽|
         * | 38|    研发部|    司理|张伟|
         * | 25|    人事部|普通职工|杜航|
         * | 28|    研发部|普通职工|周歌|
         * +---+----------+--------+----+
         */
        Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {
            @Override
            public Iterator<Employee> call(Row row) throws Exception {
                List<Employee> employeeList = new ArrayList<>();
                try {
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee);
                    Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee2);
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
                return employeeList.iterator();
            }
        }, Encoders.bean(Employee.class));
//        employeeDatasetFlatmap.show();
        /**
         * +---+----------+--------+------+
         * |age|department|   level|  name|
         * +---+----------+--------+------+
         * | 20|    研发部|普通职工|  张三|
         * | 20|    研发部|普通职工|张三_2|
         * | 31|    研发部|普通职工|  李四|
         * | 31|    研发部|普通职工|李四_2|
         * | 36|    财务部|普通职工|  李丽|
         * | 36|    财务部|普通职工|李丽_2|
         * | 38|    研发部|    司理|  张伟|
         * | 38|    研发部|    司理|张伟_2|
         * | 25|    人事部|普通职工|  杜航|
         * | 25|    人事部|普通职工|杜航_2|
         * | 28|    研发部|普通职工|  周歌|
         * | 28|    研发部|普通职工|周歌_2|
         * +---+----------+--------+------+
         */
        RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");
        // 核算每个部分有多少职工
        // datasetGroupBy.count().show();
        /**
         * +----------+-----+
         * |department|count|
         * +----------+-----+
         * |    财务部|    1|
         * |    人事部|    1|
         * |    研发部|    4|
         * +----------+-----+
         */
        /**
         * 每个部分的平均年纪
         */
        // datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show();
        /**
         * +----------+--------+
         * |department|avg(age)|
         * +----------+--------+
         * |    财务部|    36.0|
         * |    人事部|    25.0|
         * |    研发部|   29.25|
         * +----------+--------+
         */
        KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {
            @Override
            public String call(Employee employee) throws Exception {
                // 回来分组的key,这儿表明依据部分进行分组
                return employee.getDepartment();
            }
        }, Encoders.STRING());
        keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {
            @Override
            public Object call(Object key, Iterator iterator) throws Exception {
                System.out.println("key = " + key);
                while (iterator.hasNext()){
                    System.out.println(iterator.next());
                }
                return iterator;
                /**
                 * key = 人事部
                 * SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通职工)
                 * key = 研发部
                 * SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通职工)
                 * SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通职工)
                 * SparkDemo.Employee(name=张伟, age=38, department=研发部, level=司理)
                 * SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通职工)
                 * key = 财务部
                 * SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通职工)
                 */
            }
        }, Encoders.bean(Iterator.class))
                .show(); // 这儿的show()没有意义,仅仅触发核算罢了
        Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {
            @Override
            public Employee call(Employee t1, Employee t2) throws Exception {
                // 不同的版别看是否需要判别t1 == null
                t2.setAge(t1.getAge() + t2.getAge());
                return t2;
            }
        });
        System.out.println(datasetReduce);
        Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();
        System.out.println(employee);
        // SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通职工)
        employeeDataset.registerTempTable("table");
        session.sql("select * from table where age > 30 order by age desc limit 3").show();
        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 38|    研发部|    司理|张伟|
         * | 36|    财务部|普通职工|李丽|
         * | 31|    研发部|普通职工|李四|
         * +---+----------+--------+----+
         */
    }
    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class Employee implements Serializable {
        private String name;
        private Integer age;
        private String department;
        private String level;
    }
}

spark maven依靠,自行不需要的spark-streaming,kafka依靠去掉。

点击检查代码
<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.15</scala.version>
        <spark.version>3.2.0</spark.version>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <!-- scala依靠-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- spark依靠-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
        <!--<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.7</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.34</version>
        </dependency>
    </dependencies>