Flink流数据api实战之实现机器学习密度峰值聚类算法

最近好几天没更新了,并没有偷闲玩哈,短学期前立的flag仍旧记在心中。没更新的这几天我但是在每天加班加点的学习呢,为了今天这篇博客我这几天没少掉头发啊,呜呜呜呜!!!

事例布景

  • 此事例的数据源为经过GPS定位产生的经纬度信息回来到服务器,然后经过调用特定的定位接口来变成接口文档一片特定区域的平面图的x和y坐标。同一个人收集到的坐标集加上特定的id作为标签。
  • 此事例的数据源的类型为实时流式数据,其间最大的特色就是有头无尾,只需敞开收集程序,就会收集到源源不断的流式数据
  • 此事例的算法思维为:每隔极短的时刻java模拟器收集一次每个人的方位信息数据传入到flink,在flipython能够做什么作业nk中按照id进行分类差异每个人,对每个人https和http的差异的数据在固定时刻类(1天or1周)进行一次密度峰值聚类算法,得到这个人这段时刻类活动规划的多个聚类中心(涉及到自适应的问题,下文只回来最大的那一个)。然后可剖析python怎样读这个人的行为轨迹习气。

详细结束

定义一个简单POJP类作为数据源的数据开源阅览类型

经过事例布景可知道
数据源包含三个参数

  • x: 方位xhttps认证坐标的值
  • y: 方位y坐标的值
  • id:每个人的编号

详细代码如下

public class ClustjavaeeerReading {
//方位信息 x 和 y
public double x,y;
// 编号 id
public int id;
// 默许无参结构函数  (必须有这个,不开源矿工然进行keyby分组的时分会报错)
public ClusterReading()  {    }
// 带参结构函数
public ClusterReading(double x, double y,int id)
{
this.id = id;
this.x = x;
this.y = y;
}
// 定义全部获得办法
public int getId()
{
retu接口测验rn id;
}
public double gejava怎样读tX() {
return x;
}
public double getY(){
return y;
}
// 定义接口crc过错计数全部设置办法
public void setId(int id) {
this.id = id;
}
public void setX(double x) {
this.x = x;
}
public void set开源矿工Y(double y) {
this.y = y;
}
// 重写 tostring()
@Override
public Stpython123ring toString()
{
return "ClusterReading{" + "id='" + id + ''' + ", x=" + x + ", yjavascript=" + y +python爬虫 '}';
}
}

仿照数据源

为什么要弄这java初学开源节流是什么意思是什么呢,由于仅开源代码网站github仅实验,不可能和真是环境一样,只能经过自定义数据源来仿照整个算法进程

为了便java模拟器利:数据源只有三个id标签
详细代码如下

import java.util.Random;
import org.apache.flink.streaming.api.datastreampython安装.DataStreamSource;
imhttps认证port org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
imjava面试题port org.apache.flink.streaming.api.functions.source.SourceFunction;
public class CustomSource {
public stathttps域名ic void main(java难学吗String [] args) th接口测验rows Exception {
// 获取实施环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取Source数据源
DataStreamSource<ClusterReading> inputDataStream = env.addSource(new DPCCustomSource());
// 打印输出
inputDjava怎样读ataStream.print();
//实施
env.execute();
}接口crc过错计数
public statijava怎样读c class DPCCustomSource implements SourceFunction<ClusterReading> {
// 定义无线循环,不断产生数据,除非被cancel
boolean running = true;
@Override
publipython123渠道登录c void ruhttps认证n(SourceContext<CljavascriptusterReading> sourceContext) throws Exception {
Random random = new Random();
while (r开源节流unning)
{
int a =开源节流 random.next接口卡Int(3)+1;   //
for(int开源代码网站github i = 0;i<5;i++){
if(a == 1)
{
double x = Math.round((10.00 + random.nextDouble() * 20.00)*100)/100.0; // 规划[10,30]
double y = Math.round((10.00 + random.nextDouble() * 20.00)*100)/100.0;// 规划[10,30]
sourceContext.collec开源软件t(new ClusterReading(x,y,a));
T接口卡hrea开源节流d.sleep(100L);
}
if(a == 2)
{
double x = Math.round((3python能够自学吗.00 + random.nextDouble()python怎样读 * 25.00)*100)/100.0;  // 规划[3,28]
double y = Math.round((3.00 + random.nextDouble() * 25.00)*100)/100.0;  // 规划[3,28]开源阅览
sourceContext.copython爬虫llect(new ClusterReading(x,y,a));
Thread.sleep(100L);
}
if(a == 3)javascript
{
double x = Math.round((10.00 + random.nextDouble() * 20.00)*100)/100.0;  // 规划[1https协议0,30]
double y = Math.round((3.00 + random.nextDouble() * 25java工作培训班.00)*100)/100.0;  // 规划[3,28]
sourceContext.collect(new Clupython能够自学吗sterReading(x,y,a));
Thread.sleep(100L);
}
}
Thread.sleep(1000L);
}
}
@Override
public void cancel接口测验() {
running = false;
}
}
}

事例详细结束

整体结构(main里边)

经过前接口测验面试题面的学习,我们javascript知道flink流处理流java面试题程包含 source->tra开源节流nsform->sink
我们就经过这个流程来剖析

  • sou接口的定义rce:数据源为自己仿照的数据源
  • transform:首要包含以下操作
    • keyby来经过id分组
    • 开窗经过时刻窗口将数据收集
    • 经过全窗口函数来对数据一起处理
  • sink:直接print()打印出最大的聚类中心接口的定义点坐标

详细代码开源阅览如下

p开源阅览app下载安装ublihttps安全问题c class DPC_R开源ealize {
public static void main(String[] args) thro接口的效果ws Exception {
// 获取实施环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironmhttps域名ent();
// 获取Source数据源
DataStreamSource<ClusterReading> inputDataStream = env.addSource(new CustomSource.DPCCustomSource());
DataStreamSink<String> resultDataStream = inputDataStream.python编程keyBy("id")
.timeWindow(Time.sec接口测验onds(10L))
.process(new CustomProcessW接口crc过错计数indowFunction())
.开源矿工print();
env.execute();
}

密度峰值聚类算法的结束

由于Java中没有python中的numpy库对许多数据的批剖析很困难
这儿我们运用了GitHub上的开源项目Deep Java Library中的NDArray这个api,其间有许多相似numpy的操作,但是相关于numpy还是比较僵硬难以上手。

从maven引入依托

<dep接口测验面试题endency>
<groupId开源节流是什么意思是什么>ai.djl</groupId>
<artifactId>api</artifactId>
<version>0.12.0&lpython能够做什么作业t;/version>
</dependency>
<!-- https://mvnrepository.c开源众包om/artifact/ai.djl.pytorch/pytorch-engine -->
<dpython怎样读ependency>
<groupId>ai.djava怎样读jl.pytorch</groupId>
<artifactId>pytorch-engine</artifactId>
<version&python安装gt;0.12.0</开源节流是什么意思是什么version>
</dependency>
<!-- https://mvnrepository.com/artifact/ai.djl.pytorch/pytorch-native-auto -->
<dependency>
<groupId>ai.djl.pytorch</groupId>
<art接口测验ifactId>pytorch-native-auto</ar接口测验tifactId>
<version>1.8.1</versiohttps协议n>
<scope>runtime</scopepython123>
<开源/dependejava面试题ncy>

算法整体结构

private long DPC(NDArray nd) {
NDManager manager = NDManager.newBaseManager();
// 核算间隔矩阵
NDArray dists = getDistanceMatrix(nd);python怎样读
//开源代码网站github 核算dc
NDArray dc = select_dc(dists)python基础教程;
//用开源阅览app下载安装高斯方程核算部分密度
String method = "Gaujava初学ssion";
// 核算部分密度
NDArray rho = get_density(dists,HTTPS dc, method);
//核算密度间隔
NDArray deltas = get_接口deltas(dists, rho开源代码网站github);
//获取聚类中心点
NDArray centers =find_centers_K(rho, deltas);
//https认证回来最大一个 (现在这样)
long centersmax = centers.get(new NDIndex("0")开源是什么意思).toLongArray()[0];
//            double deltasmax =deltas.get(new NDIndex("{}",java初学centersmax)).toDoubleArray()[0];
//            double rhomax =rho.java难学吗get(new NDIndex("{}",centersmax)).toDoubleArray()[0];
rehttps和http的差异turn centersma开源x;
}

核算数据点两两之间的间隔

private NDArray getDistanceMatrix(NDArrhttps和http的差异ay nd) {
//核算数据点两两之间的间隔
NDManager manager = NDManager.newBasejava语言Manager();
//java模拟器 获取 nd的维度数(n,d)
Shape e = nd.getShape();接口crc过错计数
//java模拟器dists初始化为维度为(n,n)
NDArray dists = manager.zeros接口测验面试题(new Shape(e.siz开源众包e()/e.dimension(开源众包),e.开源阅览app下载安装size()/e.dimension()接口文档));
//求javaee出每个点到其它点的间隔
for(int i = 0;i < dists.getShap开源矿工e().dimension();i++)
{
for(int j = 0;j < dists.gethttps域名Shape().dimension();j++)
{
NDAjava面试题rray vi = nd.get(new NDIndex("{}",i));
NDArray vj = nd.get(new开源节流 NDIndex("{}",j));
dists.set(new NDIndex("{},{}",python爬虫i,j), array -> {
array = ((vi.开源是什么意思sub(vj)).dot(vi.su接口类型b(vj))).sqrt();
retujava难学吗rn array;
});
}
}
return dists;
}

寻觅密度核java工作培训班算的开源节流阈值

// 找到密度核算的阈值dc
// 要求均匀每个点周围间隔小于dc的点的数目占总点数的1%-2%
private NDArray select_dc(NDAjava怎样读rray dijava怎样读sts) {
//获取 dists的维度数(n,d)
Shpython123ape e = dists.getShape();
//求出 n 值
long N = e.get(0);
//把 di开源阅览app下载安装sts的形状改为一维 列数N * N
NDArray tt = dists.resh开源阅览a开源pe(N*N);
//开源软件定义筛选百分比
double开源软件 pepython编程rcent = 2.0;
/开源是什么意思/ 方位
int position = (int)(N * (N - 1) * pjava语言ercent / 100);
//回来dc值
return (tt.sort()).get(n开源是什么意思ew NDIndex ("{开源众包}",posHTTPSition + N));
}

核算部分密度

private NDArrjava模拟器ay get_density(NDArray dists,NDAhttps和http的差异rray dc,Stringjavaapi中文在线看 method  )
{
//获取 dists的维度数(n,d)
Shape e = dists.getShape();
//求出 n 值
long N = e.get(0);
/接口crc过错计数/初始化rho数组
NDManager manjava面试题ager = NDManager.newBaseManager();
NDArray rhhttps协议o = manager.zeros(new Shape(N));
for (int i = 0;i<N;i++)
{
//假设没有指定用什么办法,默许办法
if (method == null)
{
//筛选出 (dists[i, :] <开源阅览 dc)条件下的行
NDArray g = dists.get(new NDIndex("{}", i));
NDArray s = g.get(g.lte(python123渠道登录dc)).get(new NDIndex("0"));
// rho[i]为s的维度-1
Shapjavaeee c = s.getShape();
long a = c.get(0)-1;
NDArray r =manager.create(a);
rho.set(new NDIndex("{}", i),aa->r);
}
else   //使用高斯方程核算
{
// 没想让你们看懂
NDArray t = (https安全问题(dists.get(new NDIndex("{}", i)).div(dc)).pow(2).neg().exp()).sum().sub(1);
rho.set(new NDIndex("{}", i),aa->t);
}
}
return rho;
}

核算密度间隔

private NDArray get_deltas(NDArray dists, NDArray rho) {
//获取 dists的维度数(n,d)
Shape e = dists.getShape();
//求出 n 值https安全问题
long Npython编程 = e.gJavaet(0);
//初始化deltas数组
NDManager manager = NDManager.newBaseManager();
NDArray deltas = manager.zeros(new Shape(N));
NDArray index_rho = r开源节流ho.argSort().fli接口p(0);
for (inpython安装t i = 0;i<N;i++)
{
//写出值
long i开源软件ndex =index_rho.ge接口测验t(new NDIndex("{}",i)).toLongArpython能够做什么作业ray()[接口0];
// 关于接口类型密度最大的点
if开源阅览app下载安装(i == 0)
continue;接口是什么
//关于其它的点
// 找到密度比其它点大的序java工作培训班
NDArray index_higher_rho = index_rho.get(new NDIndex(":{}", i));开源节流
//获取这些点间隔当时点的间隔,并找最小值
//下面这段对应python句子为:deltas[index] =python怎样读 np.min(dists[inde开源x, index_higjava难学吗her_rho])
//看了懂算接口卡我输
NDArray z = dists.get(new NDIndex("{}",index));
long Z = index_higher_rho.size();
for (int c = 0;c<Z;c++)
{
NDArray C = manahttps协议ger.zeros(new Shape(Z));
int finalC = c;
C.set(new NDIndex("{}",c), aa->z.get(npython是什么意思ew NDIndjava难学吗ex("{}",index_higher_rho.toLongArray()[finalC] )));
deltas.se开源众包t(new NDIndex("{}", index),aa->C.min());
}
}
//导入python123渠道登录最大值
long max = index_rjavascriptho.get(new NDInjavaapi中文在线看dex("{}",0)).toLongAhttps协议rray()[0];
deltas.set(new NDIndex("{}javaee", max),aa->deltas.max开源软件());
return deltaspython怎样读;
}

获取聚类中心点

private NDA接口和抽象类的差异rray find_centersjava语言_K(NDArray rho, NDArray deltas) {
//每个点都相乘
NDArray rho_delta = rho.java难学吗mul(deltas);
//从大到小排序回来下标NDArray数组
NDArr开源ay centers = rho_delta.python是什么意思argSort().flip(0);
// 回来最大的三个下标
return centeJavars.get(new NDIndejavascriptx("java面试题:3"));
}

全窗口函数的详细结束

详细思维:
先遍历全部数据获取数据数量,创立二维数组
把数据导入到二维数组
经过二维数组创立NDArray数组
实施DPC算法回来聚类中心点的下标
经过下标回来结果Python

详细代码如下

public static  class CustomProcessWindowFunction extends ProcessWindowFunction <ClusterReading,String,Tuple, TimeWindow&gt开源众包;
{
@Override
pu接口测验blic void process(Tuple tuple, Conjavaapi中文在线看text context, Iterable<ClusterReading> iterable, Col接口的效果lector<String> collector) throws Exception {
// 统计数java工作培训班据的数量
int count = 0;
//增强for循环遍历全窗口全部数据
for (ClusterReading cluster : iterable) {
count++;
}
//初始化一个二维数组来记载这个窗口里边的全部数据
dohttps认证uble data [][] = new double[count][];
//增强for循环遍历全窗口全部数据输入到二维java面试题数组
int i =0; // 计数
for (ClusterReading cluster : iterable){
data[i] = new double[] {clpython基础教程uster.getX(), cluster.getY()};
i++接口crc过错计数;
}
//调用 djl中的 NDArray数组 将data导入进行算法剖析
NDManhttps安全问题ager manager = NDManager.newBaseMaJavanager() ;
NDArray nd = manager.create(python基础教程data); // 创立NDArray数组
long centerspython123渠道登录max = DPC(nd);     //接口测验进行dp算法找出聚类中心点的下标
String resultStr = "id:"+tuple.getField(0) +"聚类中心为:"+"x="+data[Math.toIntExajava面试题ct(centersmax)][0]+"y="+data[Math.toIntExact(centersmax)][1];
collector.collect(resultStr);接口测验
}

欢迎交流学习

开源代码网站github人博客

csdn主页

发表评论

提供最优质的资源集合

立即查看 了解详情