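I had worked at my previous company for three years and was fairly comfortable with its business and tech stack.

My manager felt I was ready for more responsibility, so he put me in charge of a new project and assigned a teammate to help me.

The crux of the project was heavy data interaction, plus large daily data synchronizations and batch operations, where mistakes were not an option.

My teammate favored a "short, flat, and fast" approach: if an existing technique would do the job, use it. Procedural development is what people find most comfortable; it is an instinctive habit. Since he was confident he could handle it, I handed the batch data operations over to him.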

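I looked through some of the company's existing code. Part of it wrote raw SQL for bulk inserts. For incremental sync, that code first queried the data, then updated the rows that existed and inserted the ones that didn't. The rest relied on Kafka and background jobs to perform the operations atomically.

In theory these approaches work. But one look at the change history told me the requirements had shifted a lot, and quickly, forcing a large number of modifications. When I asked the people responsible in private, I heard about plenty of problems. Code that was rushed out to be done once and for all had ended up consuming even more time and effort.

Out of caution, I warned my teammate as well and asked him to think it through before diving in.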

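In less than a month we launched the project without a single data error, and our manager commended us.

We even wrapped up early, did a few small optimizations, and spent the rest of the time slacking off.

Some time later, the trouble started piling up. First, data volume exploded. My teammate's incremental sync locked the table, and because the batch operation took a while, front-end reads started timing out.

Second, the incremental sync had to change. The primary database or a third-party source database became the source of truth, so updates and deletions there now had to be synchronized as well.

By then my teammate's main focus had moved to a new project, and he handled requirement changes and bugs in his spare moments. The more he fixed, the more bugs appeared. Either the data volume was so large that processing hung, or a wrong variable caused the data to be processed incorrectly.

Eventually it all blew up. Our manager called us both in and tore into us: after working this long, all we could do was CRUD, work any intern could do, so why was the company keeping us around?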

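Looking back, the causes are easy to trace, and I suspect this situation is common. On a first version, people strike while the iron is hot, write procedural code with a "short, flat, and fast" strategy, and ship quickly.

But a piece of software's lifecycle does not end at launch. It still has to go through operations and maintenance and cope with change.

That is where the second round of development falls apart. Either the original code no longer fits the current business, or changes touch so much that every edit risks cutting an artery, or the original author has left...

That is why we use object orientation and abstraction. They keep the stable parts stable, leave room to absorb change, and avoid a situation where touching one thing moves everything.

After the scolding, it was time to clean up the mess.

So I set out to assemble a general-purpose solution, hoping to settle the problem once and for all.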

First, we define a general-purpose interface, IDbAsyncBulk. Since the source code is published on GitHub, the comments are written in English.

```csharp
using System.Collections.Generic;
using System.Data.Common;
using System.Threading.Tasks;

public interface IDbAsyncBulk
{
    /// <summary>
    /// Default initialization.
    /// Uses reflection to map every property of the type to a lowercase database column, with default handling for basic types.
    /// To skip a property, mark it with [DbBulk(Ignore = true)].
    /// Any other behavior requires a user-defined initialization.
    /// </summary>
    /// <typeparam name="T">Corresponding entity type.</typeparam>
    Task InitDefaultMappings<T>();

    /// <summary>
    /// Batch insert.
    /// </summary>
    /// <typeparam name="T">Entity type of the objects to operate on.</typeparam>
    /// <param name="connection_string">Database connection string.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="list">Data list to operate on.</param>
    Task CopyToServer<T>(string connection_string, string targetTable, List<T> list);

    /// <summary>
    /// Batch insert.
    /// </summary>
    /// <typeparam name="T">Entity type of the objects to operate on.</typeparam>
    /// <param name="connection">Database connection; the caller must make sure it is open.
    /// Intended for cases where follow-up operations reuse the same connection.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="list">Data list to operate on.</param>
    Task CopyToServer<T>(DbConnection connection, string targetTable, List<T> list);

    /// <summary>
    /// Updates rows that exist and inserts rows that do not.
    /// Steps:
    /// 1. Create a temporary table.
    /// 2. Put the data into the temporary table.
    /// 3. Merge the data into the target table.
    /// </summary>
    /// <typeparam name="T">Data type.</typeparam>
    /// <param name="connection_string">Connection string.</param>
    /// <param name="keys">Fields that match the source and target tables; must be the primary key with unique values, otherwise an error is thrown.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="list">Data list to operate on.</param>
    /// <param name="tempTable">Temporary table that receives the data; the default name is the target table name with '#' or '_temp' added.</param>
    /// <param name="insertmapping">Columns to insert; if null, the mapped fields are used. Use it to avoid writing auto-generated columns.</param>
    /// <param name="updatemapping">Columns to update; if null, the mapped fields are used.</param>
    Task MergeToServer<T>(string connection_string, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

    /// <summary>
    /// Updates rows that exist and inserts rows that do not.
    /// Steps:
    /// 1. Create a temporary table.
    /// 2. Put the data into the temporary table.
    /// 3. Merge the data into the target table.
    /// </summary>
    /// <typeparam name="T">Data type.</typeparam>
    /// <param name="connection">Database connection; the caller must make sure it is open.</param>
    /// <param name="keys">Fields that match the source and target tables; must be the primary key with unique values, otherwise an error is thrown.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="list">Data list to operate on.</param>
    /// <param name="tempTable">Temporary table that receives the data; the default name is the target table name with '#' or '_temp' added.</param>
    /// <param name="insertmapping">Columns to insert; if null, the mapped fields are used. Use it to avoid writing auto-generated columns.</param>
    /// <param name="updatemapping">Columns to update; if null, the mapped fields are used.</param>
    Task MergeToServer<T>(DbConnection connection, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

    /// <summary>
    /// Batch update.
    /// 1. Create a temporary table.
    /// 2. Put the data into the temporary table.
    /// 3. Merge the data into the target table.
    /// </summary>
    /// <typeparam name="T">Data type.</typeparam>
    /// <param name="connection_string">Connection string.</param>
    /// <param name="where_name">Fields compared in the 'where' match.</param>
    /// <param name="update_name">Fields to update.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="list">Data list to operate on.</param>
    /// <param name="tempTable">Temporary table that receives the data; the default name is the target table name with '#' or '_temp' added.</param>
    Task UpdateToServer<T>(string connection_string, List<string> where_name, List<string> update_name, string targetTable, List<T> list, string tempTable = null);

    /// <summary>
    /// Batch update.
    /// 1. Create a temporary table.
    /// 2. Put the data into the temporary table.
    /// 3. Merge the data into the target table.
    /// </summary>
    /// <typeparam name="T">Data type.</typeparam>
    /// <param name="connection">Database connection; the caller must make sure it is open.</param>
    /// <param name="where_name">Fields compared in the 'where' match.</param>
    /// <param name="update_name">Fields to update.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="list">Data list to operate on.</param>
    /// <param name="tempTable">Temporary table that receives the data; the default name is the target table name with '#' or '_temp' added.</param>
    /// <param name="createtemp">Whether to create the temporary table.</param>
    Task UpdateToServer<T>(DbConnection connection, List<string> where_name, List<string> update_name, string targetTable, List<T> list, string tempTable = null, bool createtemp = true);

    /// <summary>
    /// Updates rows that exist and inserts rows that do not; rows that exist in the target table but not in the source are deleted.
    /// 1. Create a temporary table.
    /// 2. Put the data into the temporary table.
    /// 3. Merge the data into the target table.
    /// 4. Delete target rows that do not exist in the temporary data.
    /// </summary>
    /// <typeparam name="T">Data type.</typeparam>
    /// <param name="connection_string">Connection string.</param>
    /// <param name="keys">Fields that match the source and target tables; must be the primary key with unique values, otherwise an error is thrown.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="list">Data list to operate on.</param>
    /// <param name="tempTable">Temporary table that receives the data; the default name is the target table name with '#' or '_temp' added.</param>
    /// <param name="insertmapping">Columns to insert; if null, the mapped fields are used. Use it to avoid writing auto-generated columns.</param>
    /// <param name="updatemapping">Columns to update; if null, the mapped fields are used.</param>
    Task MergeAndDeleteToServer<T>(string connection_string, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

    /// <summary>
    /// Updates rows that exist and inserts rows that do not; rows that exist in the target table but not in the source are deleted.
    /// 1. Create a temporary table.
    /// 2. Put the data into the temporary table.
    /// 3. Merge the data into the target table.
    /// 4. Delete target rows that do not exist in the temporary data.
    /// </summary>
    /// <typeparam name="T">Data type.</typeparam>
    /// <param name="connection">Database connection; the caller must make sure it is open.</param>
    /// <param name="keys">Fields that match the source and target tables; must be the primary key with unique values, otherwise an error is thrown.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="list">Data list to operate on.</param>
    /// <param name="tempTable">Temporary table that receives the data; the default name is the target table name with '#' or '_temp' added.</param>
    /// <param name="insertmapping">Columns to insert; if null, the mapped fields are used. Use it to avoid writing auto-generated columns.</param>
    /// <param name="updatemapping">Columns to update; if null, the mapped fields are used.</param>
    Task MergeAndDeleteToServer<T>(DbConnection connection, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

    /// <summary>
    /// Creates a temporary table.
    /// </summary>
    /// <param name="tempTable">Name of the temporary table to create.</param>
    /// <param name="targetTable">Target table name.</param>
    /// <param name="connection">Database connection.</param>
    Task CreateTempTable(string tempTable, string targetTable, DbConnection connection);
}
```
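The doc comments above describe UpdateToServer as a temp-table flow. The repository's actual SQL is not shown in this post, so what follows is only a sketch of how steps 1 and 3 might look in T-SQL. It assumes two things: step 1 copies the target's column layout with `SELECT TOP 0 ... INTO`, and step 3 runs as an `UPDATE ... FROM ... INNER JOIN`. `UpdateSqlSketch` and its method names are hypothetical, and step 2 (filling the temp table) would simply be the bulk copy.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not from the IDbAsyncBulk repository): builds T-SQL for the
// "create temp table -> fill it -> update target from it" flow of UpdateToServer.
public static class UpdateSqlSketch
{
    // Step 1: create an empty temp table with the same columns as the target.
    // SELECT TOP 0 ... INTO copies the column definitions but no rows.
    public static string CreateTempTableSql(string tempTable, string targetTable) =>
        $"SELECT TOP 0 * INTO {tempTable} FROM {targetTable};";

    // Step 3: update target rows from the temp table.
    // whereNames -> join condition, updateNames -> SET list (columns quoted with [ ]).
    public static string UpdateFromTempSql(
        string targetTable, string tempTable,
        IReadOnlyList<string> whereNames, IReadOnlyList<string> updateNames)
    {
        if (whereNames.Count == 0)
            throw new ArgumentException("At least one match column is required.", nameof(whereNames));
        if (updateNames.Count == 0)
            throw new ArgumentException("At least one column to update is required.", nameof(updateNames));

        var set = string.Join(", ", updateNames.Select(c => $"t.[{c}] = s.[{c}]"));
        var on = string.Join(" AND ", whereNames.Select(c => $"t.[{c}] = s.[{c}]"));
        return $"UPDATE t SET {set} FROM {targetTable} AS t INNER JOIN {tempTable} AS s ON {on};";
    }
}
```

A local `#` temp table only lives for its own session. Steps 1 to 3 would therefore have to run on the same open connection, which may be one reason the interface also offers `DbConnection` overloads.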

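What each method does:

InitDefaultMappings: initializes the mappings between the target table's columns and the entity. During batch operations, reflection is used to match each property to a table column.

CopyToServer: batch insert. When the data matches the table structure, it is bulk-copied into the target table. The method uses the official SqlBulkCopy class together with the entity to keep things simple.

MergeToServer: incremental sync. A unique key must be specified; rows that exist are updated and rows that don't are inserted. You can specify which columns to update and which to insert.

UpdateToServer: batch update. You must specify the columns for the where condition and the columns to update.

MergeAndDeleteToServer: incremental sync that matches the data source against the target table. Rows that exist in the target are updated, and rows that don't are inserted. Rows that exist in the target but not in the source are deleted from the target.

CreateTempTable: creates a temporary table.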
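MergeToServer and MergeAndDeleteToServer both end with "merge the data into the target table". On SQL Server this maps naturally onto a `MERGE` statement. The sketch below assembles one from `keys`, `insertmapping` and `updatemapping`; it is an assumption, not code from the repository. `MergeSqlSketch.Build` is a hypothetical name, and its `deleteMissing` flag stands in for the extra delete step of MergeAndDeleteToServer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not taken from the IDbAsyncBulk source: assembles the T-SQL MERGE
// that MergeToServer / MergeAndDeleteToServer could run once the temp table is filled.
public static class MergeSqlSketch
{
    public static string Build(
        string targetTable, string tempTable,
        IReadOnlyList<string> keys,
        IReadOnlyList<string> insertColumns,
        IReadOnlyList<string> updateColumns,
        bool deleteMissing)
    {
        if (keys.Count == 0)
            throw new ArgumentException("At least one key column is required.", nameof(keys));
        if (insertColumns.Count == 0)
            throw new ArgumentException("At least one insert column is required.", nameof(insertColumns));

        // Column names are bracket-quoted; table names are used as given.
        string Q(string column) => $"[{column}]";

        var on = string.Join(" AND ", keys.Select(k => $"t.{Q(k)} = s.{Q(k)}"));
        var sql = $"MERGE INTO {targetTable} AS t USING {tempTable} AS s ON {on}";

        // Row exists in target: overwrite only the requested columns.
        if (updateColumns.Count > 0)
            sql += " WHEN MATCHED THEN UPDATE SET "
                 + string.Join(", ", updateColumns.Select(c => $"t.{Q(c)} = s.{Q(c)}"));

        // Row missing from target: insert only the requested columns.
        sql += " WHEN NOT MATCHED BY TARGET THEN INSERT ("
             + string.Join(", ", insertColumns.Select(c => Q(c)))
             + ") VALUES ("
             + string.Join(", ", insertColumns.Select(c => "s." + Q(c)))
             + ")";

        // MergeAndDeleteToServer: remove target rows that are absent from the source.
        if (deleteMissing)
            sql += " WHEN NOT MATCHED BY SOURCE THEN DELETE";

        return sql + ";"; // SQL Server requires MERGE to end with a semicolon
    }
}
```

Restricting the INSERT column list is what would let `insertmapping` leave out identity or other generated columns. This also shows why the keys must be unique. If the source holds duplicate keys, SQL Server rejects a MERGE that would update the same target row more than once. Table names are left unquoted so that schema-qualified names such as `dbo.user_base` still work; the column quoting assumes no name contains `]`.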

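Next, add an entity attribute. It marks whether a column is skipped during sync and reconciles differences between database column names and letter case.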

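```csharp
using System;

/// <summary>
/// Marks entity properties for database batch operations.
/// </summary>
public class DbBulkAttribute : Attribute
{
    /// <summary>
    /// Whether to ignore the property. If it is ignored, the other properties need not be set.
    /// </summary>
    public bool Ignore { get; set; }
    /// <summary>
    /// Column name. If not set, defaults to the entity property name in lowercase.
    /// </summary>
    public string ColumnName { get; set; }
}
```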
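The attribute only records intent; something has to read it. Below is a minimal sketch of how InitDefaultMappings<T>() might build the property-to-column map by reflection. It follows the rules stated in the comments: skip `Ignore`, use `ColumnName` if set, and otherwise fall back to the lowercase property name. The sketch lives in a hypothetical `BulkSketch` namespace with its own copy of the attribute so that it compiles on its own. `MappingSketch` and `SampleUser` are illustrative names, not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

namespace BulkSketch
{
    // Copy of DbBulkAttribute, repeated so this sketch compiles on its own.
    public class DbBulkAttribute : Attribute
    {
        public bool Ignore { get; set; }
        public string ColumnName { get; set; }
    }

    // Hypothetical entity used only to exercise the sketch.
    public class SampleUser
    {
        public int Id { get; set; }
        [DbBulk(ColumnName = "user_name")] public string Name { get; set; }
        [DbBulk(Ignore = true)] public string Temp { get; set; }
    }

    // Hypothetical helper: one way InitDefaultMappings<T>() might build
    // a property name -> column name dictionary that honors [DbBulk].
    public static class MappingSketch
    {
        public static Dictionary<string, string> BuildDefaultMappings<T>()
        {
            var mappings = new Dictionary<string, string>();
            foreach (var prop in typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance))
            {
                var attr = prop.GetCustomAttribute<DbBulkAttribute>();
                if (attr?.Ignore == true)
                    continue; // [DbBulk(Ignore = true)]: not synced

                // An explicit ColumnName wins; otherwise use the lowercase property name.
                var column = string.IsNullOrEmpty(attr?.ColumnName)
                    ? prop.Name.ToLowerInvariant()
                    : attr.ColumnName;
                mappings[prop.Name] = column;
            }
            return mappings;
        }
    }
}
```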

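The implementation class currently supports only SQL Server; MySQL and PostgreSQL support is in progress. It defines three settings: BatchSize (default 10000), BulkCopyTimeout (default 300), and ColumnMappings. They hold the number of rows per batch, the allowed timeout, and the mapped columns.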

```csharp
using System.Collections.Generic;
using Microsoft.Extensions.Logging;

/// <summary>
/// SQL Server batch operations.
/// </summary>
public class SqlServerAsyncBulk : IDbAsyncBulk
{
    /// <summary>
    /// Logger.
    /// </summary>
    private readonly ILogger _log;
    /// <summary>
    /// Batch size (rows handled per batch). Default 10000.
    /// </summary>
    public int BatchSize { get; set; }
    /// <summary>
    /// Timeout. Default 300.
    /// </summary>
    public int BulkCopyTimeout { get; set; }
    /// <summary>
    /// Column mappings.
    /// </summary>
    public Dictionary<string, string> ColumnMappings { get; set; }
    /// <summary>
    /// Constructor.
    /// </summary>
    /// <param name="log">Logger.</param>
    public SqlServerAsyncBulk(ILogger<SqlServerAsyncBulk> log)
    {
        _log = log;
        BatchSize = 10000;
        BulkCopyTimeout = 300;
    }

    // ... interface method implementations omitted (todo)
}
```
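The class body is elided above. One plausible piece of it, shown here as an assumption rather than the author's code, turns a `List<T>` into a `DataTable` whose columns carry the mapped column names. That is a shape `SqlBulkCopy.WriteToServerAsync(DataTable)` accepts. `DataTableSketch` and `SampleRow` are hypothetical names. The mapping dictionary plays the role of ColumnMappings, and its direction (property name to column name) is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;

// Hypothetical entity used only to exercise the sketch.
public class SampleRow
{
    public int Age { get; set; }
    public string Email { get; set; }
    public DateTime? Birthday { get; set; }
}

// Hypothetical helper (not the author's code): converts entities into a DataTable whose
// column names are the mapped database columns, ready for a bulk copy.
public static class DataTableSketch
{
    // columnMappings: property name -> database column name (assumed direction).
    public static DataTable ToDataTable<T>(IEnumerable<T> items, IReadOnlyDictionary<string, string> columnMappings)
    {
        var pairs = columnMappings
            .Select(kv => (Prop: typeof(T).GetProperty(kv.Key)
                               ?? throw new ArgumentException($"{typeof(T).Name} has no property '{kv.Key}'."),
                           Column: kv.Value))
            .ToList();

        var table = new DataTable();
        foreach (var (prop, column) in pairs)
        {
            // DataColumn does not accept Nullable<T>, so store the underlying type.
            var type = Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType;
            table.Columns.Add(column, type);
        }

        foreach (var item in items)
        {
            var row = table.NewRow();
            foreach (var (prop, column) in pairs)
                row[column] = prop.GetValue(item) ?? DBNull.Value; // null -> DBNull
            table.Rows.Add(row);
        }
        return table;
    }
}
```

Nullable properties are stored with their underlying type, and null values become `DBNull.Value`, which is how ADO.NET represents database NULL.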

Usage is simple. Register the service as a singleton, then obtain it through dependency injection wherever it is needed.

```csharp
// If you use a SQL Server database, register the SqlServerAsyncBulk service.
services.AddSingleton<IDbAsyncBulk, SqlServerAsyncBulk>();

public class BatchOperate
{
    private readonly IDbAsyncBulk _bulk;

    public BatchOperate(IDbAsyncBulk bulk)
    {
        _bulk = bulk;
    }
}
```

Here are two examples based on the user_base table. In my tests, even a few hundred thousand rows took only a fraction of a second.

```csharp
public async Task CopyToServerTest()
{
    var connectStr = @"Data Source=KF009\SQLEXPRESS;Initial Catalog=MockData;User ID=xxx;Password=xxx";
    await _bulk.InitDefaultMappings<UserBaseModel>();
    var mock_list = new List<UserBaseModel>();
    for (var i = 0; i < 1000; i++)
    {
        mock_list.Add(new UserBaseModel
        {
            age = i,
            birthday = DateTime.Now.AddMonths(-i).Date,
            education = "本科",
            email = "xiaoyu@163.com",
            name = $"小榆{i}",
            nation = "",
            nationality = "我国"
        });
    }
    await _bulk.CopyToServer(connectStr, "user_base", mock_list);
}

public async Task MergeToServerTest()
{
    var connectStr = @"Data Source=KF009\SQLEXPRESS;Initial Catalog=MockData;User ID=sa;Password=root";
    await _bulk.InitDefaultMappings<UserBaseModel>();
    var mock_list = new List<UserBaseModel>();
    for (var i = 0; i < 1000; i++)
    {
        mock_list.Add(new UserBaseModel
        {
            age = i,
            birthday = DateTime.Now.AddMonths(-i).Date,
            education = "本科",
            email = "mock@163.com",
            name = $"小榆{i}",
            nation = "汉",
            nationality = "我国"
        });
    }
    var insertMapping = new List<string> { "birthday", "education", "age", "email", "name", "nation", "nationality" };
    var updateMapping = new List<string> { "birthday", "education", "age", "email" };
    await _bulk.MergeToServer(connectStr, new List<string> { "id" }, "user_base", mock_list, null, insertMapping, updateMapping);
}
```

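With that, batch data operations are in place, and we no longer have to face piles of hand-written SQL. SQL-centric development feels great at first, but it becomes painful once requirements change or someone else takes over.

The implementation details are too long to show in full here. If you're interested or want to try it out, see the repository on GitHub.

GitHub: github.com/sangxiaoyu/…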