总结一些 Spark 处理小 trick

前语

最近做了许多数据清洗以及了解的作业,由于处理的数据很大,所以采用了spark进行辅佐处理,期间遇到了许多问题,特此记录一下,供咱们学习。

由于比较了解python, 所以笔者采用的是pyspark,所以下面给的demo都是根据pyspark,其实其他语言脚本相同,重在学习思维,详细完成改改对应的API即可。

这儿尽或许的把一些坑以及完成技巧以demo的方式直白的供给出来,顺序不分先后。有了这些demo,咱们在完成自己各式各样需求尤其是一些有难度需求的时分,就能够参考了,当然了有时间笔者后续还会更新一些demo,感兴趣的同学能够关注下。

trick

首要说一个最基本思维:能map绝不reduce

换句话说当在完成某一需求时,要尽或许得用map类的算子,这是适当快的。可是聚合类的算子通常来说是相对较慢,假如咱们终究不得不必聚合类算子的时分,咱们也要把这一步逻辑看看能不能尽或许的往后放,而把一些比方过滤什么的逻辑往前放,这样终究的数据量就会越来越少,再进行聚合的时分就会快许多。假如反过来,那就因小失大了,尽管终究完成的作用是相同的,可是时间差却是数量级的。

  • 常用API

这儿列一下咱们最常用的算子

rdd=rdd.filter(lambdax:fun(x))
rdd=rdd.map(lambdax:fun(x))
rdd=rdd.flatMap(lambdax:fun(x))
rdd=rdd.reduceByKey(lambdaa,b:a+b)

filter: 过滤,满意条件的回来True, 需求过滤的回来False。

map: 每条样本做一些一同的操作。

flatMap: 一条拆分红多条回来,详细的是list。

reduceByKey: 根据key进行聚合。

  • 聚合

一个最常见的场景便是需求对某一个字段进行聚合:假定现在咱们有一份流水表,其每一行数据便是一个用户的一次点击行为,那现在咱们想统计一下每个用户总共点击了多少次,更乃至咱们想拿到每个用户点击过的一切item调集。伪代码如下:

defget_key_value(x):
user=x[0]
item=x[1]
return(user,[item])
rdd=rdd.map(lambdax:get_key_value(x))
rdd=rdd.reduceByKey(lambdaa,b:a+b)

首要咱们先经过get_key_value函数将每条数据转化成(key, value)的方式,然后经过reduceByKey聚合算子进行聚合,它就会把相同key的数据聚合在一同,说到这儿,咱们或许不觉得有什么?这算什么trick!其实笔者这儿想展现的是get_key_value函数回来方式:[item]

为了比照,这儿笔者再列一下两者的差异:

defget_key_value(x):
user=x[0]
item=x[1]
return(user,[item])
defget_key_value(x):
user=x[0]
item=x[1]
return(user,item)

能够看到第一个的value是一个列表,而第二个便是单纯的item,咱们看reduceByKey这儿咱们用的详细聚合方式是相加,列表相加便是得到一个更大的列表即:

所以终究咱们就拿到了:每个用户点击过的一切item调集,详细的是一个列表。

  • 抽样、分批

在日常中咱们需求抽样出一部分数据进行数据分析或许实验,乃至咱们需求将数据等分红多少份,一份一份用(后边会说),这个时分怎样办呢?

当然了spark也有类似sample这样的抽样算子

那其实咱们也能够完成,并且能够灵敏控制等分等等且速度十分快,如下:

defget_prefix(x,num):
prefix=random.randint(1,num)
return[x,num]
defget_sample(x):
prefix=x[1]
ifprefix==1:
returnTrue
else:
returnFalse
rdd=rdd.map(lambdax:get_prefix(x,num))
rdd=rdd.filter(lambdax:get_sample(x))

假定咱们需求抽取1/10的数据出来,总的思路便是先给每个样本打上一个[1,10]的随机数,然后只过滤出打上1的数据即可。

以此类推,咱们还能够得到3/10的数据出来,那便是在过滤的时分,取出打上[1,2,3]的即可,当然了[4,5,6]也行,只要取三个就行。

  • 笛卡尔积

有的时分需求在两个调集之间做笛卡尔积,假定这两个调集是A和B即两个rdd。

首要spark现已供给了对应的API即cartesian,详细如下:

rdd_cartesian=rdd_A.cartesian(rdd_B)

其更详细的用法和回来方式咱们能够找找相关博客,许多,笔者这儿不再累述。

可是其速度十分慢

尤其当rdd_A和rdd_B比较大的时分,这个时分怎样办呢?

这个时分咱们能够凭借播送机制,其完成已有人也用了这个trick:

http://xiaoyu.world/spark/spark-cartesian/

首要说一下spark中的播送机制,假定一个变量被请求为了播送机制,那么其实是缓存了一个只读的变量在每台机器上,假定当时rdd_A比较小,rdd_B比较大,那么我能够把rdd_A转化为播送变量,然后用这个播送变量和每个rdd_B中的每个元素都去做一个操作,进而完成笛卡尔积的作用,好了,笔者给一下pyspark的完成:

defops(A,B):
pass

deffun(A_list,B):
result=[]
forcur_AinA_list:
result.append(cur_A+B)
returnresult

rdd_A=sc.broadcast(rdd_A.collect())
rdd_cartesian=rdd_B.flatMap(lambdax:fun(rdd_A.value,x))

能够看到咱们先把rdd_A转化为播送变量,然后经过flatMap,将rdd_A和一切rdd_B中的单个元素进行操作,详细是什么操作咱们能够在ops函数中自己界说自己的逻辑。

关于spark的播送机制更多解说,咱们也能够找找文档,许多的,比方:

https://www.cnblogs.com/Lee-yl/p/9777857.html

但目前为止,其实还没有真真结束,从上面咱们能够看到,rdd_A被转化为了播送变量,可是其有一个重要的条件:那便是rdd_A比较小。可是当rdd_A比较大的时分,咱们在转化的过程中,就会报内存错误,当然了能够经过增加配置:

spark.driver.maxResultSize=10g

可是假如rdd_A仍是极其大呢?换句话说rdd_A和rdd_B都是十分大的,哪一个做播送变量都是不合适的,怎样办呢?

其实咱们一部分一部分的做。假定咱们把rdd_A拆分红10份,这样的话,每一份的量级就降下来了,然后把每一份转化为播送变量且都去和rdd_B做笛卡尔积,终究再汇总一下就能够啦。

有了想法,那么怎样完成呢?

分批咱们都会了,如上。可是这儿面会有别的一个问题,那便是这个播送变量名会被重复使用,在进行下一批播送变量的时分,需求先毁掉,再创建,demo如下:

defops(A,B):
pass

deffun(A_list,B):
result=[]
forcur_AinA_list:
result.append(cur_A+B)
returnresult

defget_rdd_cartesian(rdd_A,rdd_B):
rdd_cartesian=rdd_B.flatMap(lambdax:fun(rdd_A.value,x))
returnrdd_cartesian

foriinrange(len(rdd_A_batch))
qb_rdd_temp=rdd_A_batch[i]
qb_rdd_temp=sc.broadcast(qb_rdd_temp.collect())
rdd_cartesian_batch=get_rdd_cartesian(qb_rdd_temp,rdd_B)
dw.saveToTable(rdd_cartesian_batch,tdw_table,"p_"+ds,overwrite=False)
qb_rdd_temp.unpersist()

能够看到,最主要的便是unpersist()

  • 播送变量应用之向量索引

说到播送机制,这儿就再介绍一个略微复杂的demo,乘热打铁。

做算法的同学,或许经常会遇到向量索引这一场景:即每一个item被表征成一个embedding,然后两个item的类似度便能够根据embedding的余弦类似度进行量化。向量索引是指假定来了一个query,候选池子里边假定有几百万的doc,终究意图便是要从候选池子中挑选出与query最类似的n个topk个doc。

关于做大规划数量级的索引现已有许多现成好的API能够用,最常见的包比方有faiss。假如还不了解faiss的同学,能够先简略搜一下其基本用法,看看demo,很简略。

好啦,假定现在query的量级是10w,doc的量级是100w,面对这么大的量级,咱们当然是想经过spark来并行处理,加快核算流程。那么该怎样做呢?

这时咱们便能够使用spark的播送机制进行处理啦,并且很显然doc应该是播送变量,因为每一个query都要和悉数的doc做核算。

废话不多说,直接看完成

首要树立doc索引:

#获取indexembedding,并collect,便利后续树立索引
index_embedding_list=index_embedding_rdd.collect()
all_ids=np.array([row[1]forrowinindex_embedding_list],np.str)
all_vectors=np.array([str_to_vec(row[2])forrowinindex_embedding_list],np.float32)
del(index_embedding_list)
#faiss.normalize_L2(all_vectors)
print(all_ids[:2])
print(all_vectors[:2])
print("allidsize:{},allvecshape:{}".format(len(all_ids),all_vectors.shape))
#树立index索引,并转化为播送变量
faiss_index=FaissIndex(all_ids,all_vectors,self.args.fast_mode,self.args.nlist,self.args.nprobe)
del(all_vectors)
del(all_ids)
print("broadcaststart")
bc_faiss_index=self.sc.broadcast(faiss_index)
print("broadcastdone")

这儿的index_embedding_rdd便是doc的embedding,能够看到先要collect,然后树立索引。

树立完索引后,就能够开端核算了,可是这儿会有一个问题便是query的量级也是比较大的,假如一同核算或许会OM,所以咱们分批次进行即batch:

#开端检索
#https://blog.csdn.net/wx1528159409/article/details/125879542
query_embedding_rdd=query_embedding_rdd.repartition(300)
top_n=5
batch_size=1000
query_sim_rdd=query_embedding_rdd.mapPartitions(
lambdaiters:batch_get_nearest_ids(
iters,bc_faiss_index,top_n,batch_size
)
)

假定query_embedding_rdd是悉数query的embedding,为了完成batch,咱们先将query_embedding_rdd进行分区repartition,然后每个batch进行,能够看到中心便是batch_get_nearest_ids这个函数:

defbatch_get_nearest_ids(iters,bc_faiss_index,top_n,batch_size):
importmkl
mkl.get_max_threads()
res=list()
rows=list()
foritiniters:
rows.append(it)
iflen(rows)>=batch_size:
batch_res=__batch_get_nearest_ids(rows,bc_faiss_index,top_n)
res.extend(batch_res)
rows=list()
ifrows:
batch_res=__batch_get_nearest_ids(rows,bc_faiss_index,top_n)
res.extend(batch_res)
returnres

从这儿能够清楚的看到便是组batch,组够一个batch后就能够给当时这个batch内的query进行核算最类似的候选啦即__batch_get_nearest_ids这个中心函数:

def__batch_get_nearest_ids(rows,bc_faiss_index,top_n):
importmkl
mkl.get_max_threads()
importfaiss
embs=[str_to_vec(row[3])forrowinrows]
vec=np.array(embs,np.float32)
#faiss.normalize_L2(vec)
similarities,dst_ids=bc_faiss_index.value.batch_search(vec,top_n)
batch_res=list()
foriinrange(len(rows)):
batch_res.append([str("\\t".join([rows[i][1],rows[i][2]])),"$$$".join(["\\t".join(dst.split("\\t")+[str(round(sim,2))])fordst,siminzip(dst_ids[i],similarities[i])])])
returnbatch_res

这儿便是真真的调用faiss的索引API进行召回啦,当然了batch_res这个便是成果,自己能够想怎样界说都行,笔者这儿不仅回来了召回的item,还回来了query自身的一些信息。

  • 留意点

在map的时分,不论是self的类成员仍是类办法都要放到外面,不要放到类里边,不然会报错

总结

总归,在用spark做任何需求之前,一定要紧记能map就map,尽量不要聚合算子,真实不可就尽或许放到终究。

进技术交流群请增加AINLP小帮手微信(id:ainlp2)

请补白详细方向+所用到的相关技术点
	
总结一些 Spark 处理小 trick

关于AINLP

AINLP 是一个风趣有AI的自然语言处理社区,专心于 AI、NLP、机器学习、深度学习、引荐算法等相关技术的共享,主题包含文本摘要、智能问答、聊天机器人、机器翻译、自动生成、常识图谱、预训练模型、引荐系统、核算广告、招聘信息、求职经历共享等,欢迎关注!加技术交流群请增加AINLP小帮手微信(id:ainlp2),补白作业/研讨方向+加群意图。

总结一些 Spark 处理小 trick

阅读至此了,共享、点赞、在看三选一吧