背景

由于某种原因,要做云搬迁,即需要将数据从一个云 A 搬迁到云 B。现在计划是在不影响现有事务的前提下,用商家现有的东西对数据进行实时同步,待真实搬迁前一天,发布公告,然后找一个晚大将原有云 A 的一切东西迁到云 B。

其间最重要的就是搬迁前后数据的共同性,这个影响比较大。所以在使用商家东西同步数据后的情况下,自己仍需要写个东西来比对搬迁前后数据的共同性。

今日这篇文章主要是给大家总结下,我在写这个东西(脚本)的过程中踩过的坑,以及有哪些地方值得留意和学习的。

写东西前的思考

工欲善其事,必先利其器。

我平常在做一些需求前,都会把不清楚或有疑问的地方先列出来,然后在脑海里先简略想出一些可行的解决计划,终究带着这些问题和计划找 leader 沟通,再确定终究计划。

相同,在写该比对东西前,需要承认和考虑的问题,有以下这些:

  • 比对的是什么数据?是一切仍是只比对一些关键的数据即可?
  • 数据量有多少?需求是要在多少分钟内比对完?
  • 比对完,以什么办法输出不共同的数据?
  • 比对脚本在什么时刻节点履行?
  • ……

一些阐明

  • 比对的是一些核心事务的数据,包括:订单、支付、账单等数据
  • 百万级别的数据量,共触及 3 个库,5 千张表,要求 15 分钟左右比对完
  • 对于不共同的数据,先放到带 buffer 的 channel 中,终究以 excel 文件办法输出
  • 比对脚本在搬迁期间,db 数据同步完后再履行

考虑到对履行效率、性能以及并发度要求较高,因而挑选用 Go 语言来开发此比对脚本。

踩过的坑

并发使用同一数据库局柄

并发使用同一数据库衔接会产生争抢的问题,导致报错。

解决计划:

办法一:使用衔接池(引荐)

可用封装好的,也可自己完成。用 channel 简略完成如下:

funcinit(){
fori:=0;i<DB_POOL_CH;i++{
dbPoolMap:=make(map[string]*mysqlclient.MysqlDBPool)
dbPool,err:=mysqlclient.NewDBPool(g_Config.User,g_Config.Pwd,g_Config.DBHost,"md_deal","utf8",3306,60)
iferr!=nil{
md_log.Errorf(-1,nil,"mysqlclient.InitDBPoolBySetfailed.err:%v",err)
returnerr
}
txDBPool,err:=mysqlclient.NewDBPool(g_Config.User,g_Config.Pwd,g_Config.TXDBHost,"md_deal","utf8",3306,60)
iferr!=nil{
md_log.Errorf(-1,nil,"mysqlclient.InitDBPoolBySetfailed.err:%v",err)
returnerr
}
dbPoolMap["dbPool"]=dbPool
dbPoolMap["txDBPool"]=txDBPool
dbPoolCh<-dbPoolMap
}
md_log.Debugf("initdbconnpollsuccess")
fmt.Println("initdbconnpollsuccess")
}

办法二:使用同一个数据库衔接(不引荐)

该办法并发调用会有问题,示例代码如下:

funcinit(){
//初始化txDB衔接
g_TXDBPool,err=mysqlclient.NewDBPool(g_Config.User,g_Config.Pwd,g_Config.TXDBHost,"md_deal","utf8",3306,10)
iferr!=nil{
md_log.Errorf(-1,nil,"mysqlclient.NewDBPoolfailed.err:%v",err)
returnerr
}
}
funcdoQuery(){
dbPool:=g_TXDBPool.GetOpertBySet(DealSet,0)
oRs,err:=dbPool.Query(dealSql)
iferr!=nil{
md_log.Errorf(-1,nil,"查询deal数据失利,err:%v,tableName:%v,dealSql:%v",err,tableName,dealSql)
return
}
deferoRs.Close()
}

过多的敞开 Goroutine

开端的思路是:一张表开一个协程,5 千张表就有 5 千个协程序,然后一张表的数据又按时刻一个月开一个协程,表中的数据算 07 年开端,即一共开了大概(2022-2007)125000=九十万的协程。

假如对于单独布置的机器来说,这不算很多,可是布置履行该脚本的机器上也有在跑其他事务的程序,导致 cpu 占用飙升,以及短时刻内有大量处于 time_wait 状况的衔接,另外也偶然会报 buffer busy 的情况。

得出的结论是:假如咱们迅速的敞开 goroutine (不操控并发的 goroutine 数量 )的话,会在短时刻内占有操作系统的资源(CPU、内存、文件描述符等)。

  • CPU 使用率起浮上涨
  • Memory 占用不断上涨
  • 主进程崩溃(被杀掉了)

这些资源实际上是一切用户态程序共享的资源,所以大批的 goroutine 敞开终究引发的问题不仅仅是本身,还会影响到其他运行的程序。

所以咱们在平常开发中编写代码的时分,需要留意代码中敞开的 goroutine 数量,以及敞开的 goroutine 是否可以操控,必需要注重的问题。

解决计划:

用带缓冲的 channel 做+sync 来约束 Goroutine 敞开的数量

参阅我之前写过的一篇文章:/post/701728…

多层循环嵌套查数据

所谓比对数据就是比对两个 db 的数据,即先从一个 db 中查出数据,然后根据主键再去查另一个 db 的数据,终究比对每一个字段数据是否共同。

这其间触及循环嵌套查数据库,发现这样做效率比较低。

解决计划:

改为批量查询,然后并发比对数据。参阅代码如下:

funcDoCompareLogic(start,endtime.Time,tableNamestring){
startTime,endTime:=start.Format(Layout),end.Format(Layout)
md_log.Keyf("[Startingcomparelogic]-tableName:%v,startTime:%s,endTime:%s\n",tableName,startTime,endTime)
dbPoolMap:=<-dbPoolCh
deferfunc(){
dbPoolCh<-dbPoolMap
}()
dbPool:=dbPoolMap["dbPool"]
txDBPool:=dbPoolMap["txDBPool"]
dealSql:=fmt.Sprintf("select*from%swhereFlast_update_time>='%v'andFlast_update_time<'%v'",tableName,startTime,endTime)
oRs,err:=dbPool.Query(dealSql)
iferr!=nil{
md_log.Errorf(-1,nil,"查询deal数据失利,err:%v,tableName:%v,dealSql:%v",err,tableName,dealSql)
return
}
deferoRs.Close()
vartxDealSqlstring
varstrDealId,pkNamestring
primaryIds:=make([]int64,0)
dealDataMap:=make(map[int64]map[string]interface{})
//dealDataSlice:=make([]map[string]interface{},0)
foroRs.Next(){
dealMap:=GetDealDataMap(oRs,tableInfoMap)
dealId:=oRs.GetInt64("Fdeal_id")
buyerId:=oRs.GetInt64("Fbuyer_id")
strDealId=fmt.Sprintf("%08d%04d%04d",dealId,buyerId%1000,oRs.GetInt64("Fseller_id")%1000)
varprimaryKeyint64
ifstrings.HasPrefix(tableName,"t_deal"){
primaryKey=dealId
primaryIds=append(primaryIds,dealId)
pkName="Fdeal_id"
}elseifstrings.HasPrefix(tableName,"t_recv"){
recvId:=oRs.GetInt64("Frecv_fee_id")
primaryKey=recvId
primaryIds=append(primaryIds,recvId)
pkName="Frecv_fee_id"
}elseifstrings.HasPrefix(tableName,"t_aftersale"){
afterSaleId:=oRs.GetInt64("Faftersale_id")
primaryKey=afterSaleId
primaryIds=append(primaryIds,afterSaleId)
pkName="Faftersale_id"
}else{
continue
}
dealMap["primaryKey"]=primaryKey
dealMap["strDealId"]=strDealId
dealDataMap[primaryKey]=dealMap
//dealDataSlice=append(dealDataSlice,dealDataMap)
}
iflen(primaryIds)==0{
return
}
a,_:=json.Marshal(primaryIds)
b:=strings.ReplaceAll(string(a),"[","(")
pIds:=strings.ReplaceAll(b,"]",")")
txDealSql=fmt.Sprintf("SELECT*FROM%vWHERE%vIN%v",tableName,pkName,pIds)
txRet,err:=txDBPool.Query(txDealSql)
iferr!=nil{
md_log.Errorf(-1,nil,"查询txDeal数据失利,tableName:%v,txDealSql:%v,err:%v",tableName,txDealSql,err)
return
}
txHasRecord:=false
fortxRet.Next(){
CompareField(tableInfoMap,dealDataMap,txRet,pkName,tableName)
txHasRecord=true
}
if!txHasRecord{
difMap:=make(map[string]interface{})
md_log.Debugf("查询txDeal数据empty,tableName:%v,strDealId:%v",tableName,strDealId)
difMap["primaryKey"]=string(a)
difMap["dealId"]=strDealId
difMap["tableName"]=tableName
difMap["desc"]=fmt.Sprintf("txCloudnorecord")
difCh<-difMap
}
txRet.Close()
md_log.Keyf("[Endcomparelogic]-tableName:%v,startTime:%s,endTime:%s\n",tableName,startTime,endTime)
return
}

总结

  • 不能并发操作同一个数据库衔接
  • 不能无约束的敞开 Goroutine
  • 嵌套循环时,看下时刻复杂度,适当做下优化

比对东西代码路径:

github.com/Scoefield/g…