2. V0.1-项目构建及根底模块界说

首要咱们创立咱们的项目,项目的主文件目录就叫KisFlow,且在Github上创立对应的库房: github.com/aceld/kis-f… 然后将项目代码clone到本地。

2.0 项目构建

(这儿假如你是依照本教程开发,需求在自己的库房从头创立一个新项目,而且clone到本地开发)

2.0.1 创立项目目录

接下来,咱们先将项目中的必要的文件目录创立好,项目的目录结构如下:

 kis-flow /
.
├── LICENSE
├── README.md
├── common/
├── example/
├── function/
├── conn/
├── config/
├── flow/
└── kis/

这儿咱们创立三个文件夹, common/为 寄存咱们一些共用的根底常量和一些枚举参数,还有一些工具类的办法。 flow/为寄存KisFlow的中心代码。 function/为寄存KisFunction的中心代码。 conn/为寄存KisConnector的中心代码。 config/ 寄存flow、functioin、connector等战略装备信息模块。 example/为咱们针对KisFlow的一些测验事例和test单元测验事例等,能够及时验证咱们的项目效果。 kis/来寄存一切模块的笼统层。

2.0.1 创立go.mod

cd 到 kis-flow的项目根目录,履行如下指令:

咱们会得到go.mod文件,这个是作为当时项目的包管理文件,如下:

module kis-flow
go 1.18

首要因为在之后会有很多调试日志要打印,咱们先把日志模块集成了,日志模块KisFlow供给一个默许的规范输出Logger目标,再对我敞开一个SetLogger() 办法来进行从头设置开发者自己的Logger模块。

2.1 KisLogger

2.1.1 Logger笼统接口

将Logger的界说在kis-flow/log/目录下,创立kis_log.go文件:

kis-flow/log/kis_log.go

package log
import "context"
type KisLogger interface {
	// InfoFX 有上下文的Info等级日志接口, format字符串格局
	InfoFX(ctx context.Context, str string, v ...interface{})
	// ErrorFX 有上下文的Error等级日志接口, format字符串格局
	ErrorFX(ctx context.Context, str string, v ...interface{})
	// DebugFX 有上下文的Debug等级日志接口, format字符串格局
	DebugFX(ctx context.Context, str string, v ...interface{})
	// InfoF 无上下文的Info等级日志接口, format字符串格局
	InfoF(str string, v ...interface{})
	// ErrorF 无上下文的Error等级日志接口, format字符串格局
	ErrorF(str string, v ...interface{})
	// DebugF 无上下文的Debug等级日志接口, format字符串格局
	DebugF(str string, v ...interface{})
}
// kisLog 默许的KisLog 目标
var kisLog KisLogger
// SetLogger 设置KisLog目标, 能够是用户自界说的Logger目标
func SetLogger(newlog KisLogger) {
	kisLog = newlog
}
// Logger 获取到kisLog目标
func Logger() KisLogger {
	return kisLog
}

KisLogger供给了三个等级的日志,别离是Info、Error、Debug。且也别离供给了具有context参数与不具有context参数的两套日志接口。 供给一个大局目标kisLog,默许的KisLog 目标。以及办法SetLogger()Logger()供开发能够设置自己的Logger目标以及获取到Logger目标。

2.1.2 默许的日志目标KisDefaultLogger

假如开发没有自界说的日志目标界说,那么KisFlow会供给一个默许的日志目标kisDefaultLogger,这个类完成了KisLogger的悉数接口,且都是默许打印到规范输出的方式来打印日志,界说在kis-flow/log/目录下,创立kis_default_log.go文件。

kis-flow/log/kis_default_log.go

package log
import (
	"context"
	"fmt"
)
// kisDefaultLog 默许供给的日志目标
type kisDefaultLog struct{}
func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}
func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}
func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {
	fmt.Printf(str, v...)
}
func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}
func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}
func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {
	fmt.Println(ctx)
	fmt.Printf(str, v...)
}
func init() {
	// 假如没有设置Logger, 则启动时运用默许的kisDefaultLog目标
	if Logger() == nil {
		SetLogger(&kisDefaultLog{})
	}
}

这儿在init()初始化办法中,会判别现在是否已经有设置大局的Logger目标,假如没有,KisFlow会默许选择kisDefaultLog 作为大局Logger日志目标。

2.1.3 单元测验KisLogger

现在,咱们先不针对KisLogger做过多的办法开发,咱们优先将现有的程序跑起来,做一个单元测验来测验创立一个KisLogger

kis-flow/test/kis_log_test.go

package test
import (
	"context"
	"kis-flow/log"
	"testing"
)
func TestKisLogger(t *testing.T) {
	ctx := context.Background()
	log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")
	log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")
	log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")
	log.Logger().InfoF("TestKisLogger InfoF")
	log.Logger().ErrorF("TestKisLogger ErrorF")
	log.Logger().DebugF("TestKisLogger DebugF")
}

咱们cdkis-flow/test/目录下履行单元测验指令:

go test -test.v -test.paniconexit0 -test.run TestKisLogger

得到成果如下:

=== RUN   TestKisLogger
context.Background
TestKisLogger InfoFX
context.Background
TestKisLogger ErrorFX
context.Background
TestKisLogger DebugFX
TestKisLogger InfoF
TestKisLogger ErrorF
TestKisLogger DebugF
--- PASS: TestKisLogger (0.00s)
PASS
ok      kis-flow/test   0.509s

2.2 KisConfig

在KisFlow中,咱们界说了三种中心模块,别离是KisFunction, KisFlow, KisConnector ,所以KisConfig也别离需求针对这三个模块进行界说,咱们将悉数有关KisConfig的代码都放在kis-flow/config/目录下。

➜  kis-flow git:(master) ✗ tree
.
├── LICENSE
├── README.md
├── common/
│  └── 
├── example/
│  └── 
├── config/
│  ├──
├── test/
└── go.mod

2.2.1 KisFuncConfig 界说

KisFuncConfig在规划文档中的yaml文件方式如下:

kistype: func
fname: 测验KisFunction_S1
fmode: Save
source:
 name: 被校验的测验数据源1-用户订单维度
 must:
 - userid
 - orderid
option:
 cname: 测验KisConnector_1
 retry_times: 3
 retry_duration: 500
 default_params:
 default1: default1_param
 default2: default2_param

参数阐明:

Golang结构实战-KisFlow流式核算结构(2)-项目构建/根底模块-(上)
Golang结构实战-KisFlow流式核算结构(2)-项目构建/根底模块-(上)

接下来咱们依据上述的装备协议,来界说KisFunction的战略装备结构体,而且供给一些呼应的初始化办法。 咱们在项目文档中创立kis_func_config.go文件,在这儿咱们将需求的Config界说完成。

A. 结构体界说

kis-flow/config/kis_func_config.go

package config
import (
	"kis-flow/common"
	"kis-flow/log"
)
// FParam 在当时Flow中Function定制固定装备参数类型
type FParam map[string]string
// KisSource 表明当时Function的业务源
type KisSource struct {
	Name string   `yaml:"name"` //本层Function的数据源描绘
	Must []string `yaml:"must"` //source必传字段
}
// KisFuncOption 可选装备
type KisFuncOption struct {
	CName        string `yaml:"cname"`           //连接器Connector称号
	RetryTimes   int    `yaml:"retry_times"`     //选填,Function调度重试(不包括正常调度)最大次数
	RetryDuriton int    `yaml:"return_duration"` //选填,Function调度每次重试最大时刻间隔(单位:ms)
	Params       FParam `yaml:"default_params"`  //选填,在当时Flow中Function定制固定装备参数
}
// KisFuncConfig 一个KisFunction战略装备
type KisFuncConfig struct {
	KisType string        `yaml:"kistype"`
	FName   string        `yaml:"fname"`
	FMode   string        `yaml:"fmode"`
	Source  KisSource     `yaml:"source"`
	Option  KisFuncOption `yaml:"option"`
}

这儿KisFuncConfig是相关结构体,其中 FParamKisSourceKisFuncOption均为一些相关的参数类型。

B. 相关办法界说

下面咱们先简略的供给创立KisFuncConfig的构造办法。

kis-flow/config/kis_func_config.go

// NewFuncConfig 创立一个Function战略装备目标, 用于描绘一个KisFunction信息
func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig {
     config := new(KisFuncConfig)
     config.FName = funcName
     if source == nil {
         log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %sn", funcName)
         return nil
     }
     config.Source = *source
     config.FMode = string(mode)
     //FunctionS 和 L 需求必传KisConnector参数,原因是S和L需求经过Connector进行树立流式联系
     if mode == common.S || mode == common.L {
             if option == nil {
                   log.Logger().ErrorF("Funcion S/L need option->Cidn")
                   return nil
             } else if option.CName == "" {
                   log.Logger().ErrorF("Funcion S/L need option->Cidn")
                   return nil
             }
       }
      if option != nil {
           config.Option = *option
      }
     return config
}

上述代码中提到了common.Scommon.L两个枚举类型,这是咱们针对KisFunction供给的五种类型的枚举值,咱们能够将他们界说在 kis-flow/common/const.go文件中。

kis-flow/common/const.go

package common
type KisMode string
const (
	// V 为校验特征的KisFunction, 
    // 首要进行数据的过滤,验证,字段梳理,幂等等前置数据处理
	V KisMode = "Verify"
	// S 为存储特征的KisFunction, 
    // S会经过NsConnector进即将数据进行存储,数据的临时声明周期为NsWindow
	S KisMode = "Save"
	// L 为加载特征的KisFunction,
    // L会经过KisConnector进行数据加载,经过该Function能够从逻辑上与对应的S Function进行并流
	L KisMode = "Load"
	// C 为核算特征的KisFunction, 
    // C会经过KisFlow中的数据核算,生成新的字段,将数据流传递给下流S进行存储,或许自己也已直接经过KisConnector进行存储
	C KisMode = "Calculate"
	// E 为扩展特征的KisFunction,
    // 作为流式核算的自界说特征Function,如,Notify 调度器触发使命的音讯发送,删除一些数据,重置状态等。
	E KisMode = "Expand"
)

假如fmodeSave或许Load阐明这个function有查询库或许存储数据的行为,那么这个Function就需求相关一个KisConnector,那么CName就需求传递进来。

C. 创立KisFuncConfig单元测验

现在,咱们先不针对KisFuncConfig做过多的办法开发,咱们优先将现有的程序跑起来,做一个单元测验来测验创立一个KisFuncConfig

kis-flow/test/kis_config_test.go

func TestNewFuncConfig(t *testing.T) {
	source := config.KisSource{
		Name: "大众号抖音商城户订单数据",
		Must: []string{"order_id", "user_id"},
	}
	option := config.KisFuncOption{
		CName:        "connectorName1",
		RetryTimes:   3,
		RetryDuriton: 300,
		Params: config.FParam{
			"param1": "value1",
			"param2": "value2",
		},
	}
	myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
	log.Logger().InfoF("funcName1: %+vn", myFunc1)
}

咱们cdkis-flow/test/目录下履行单元测验指令:

go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig

得到成果如下:

=== RUN   TestNewFuncConfig
funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:大众号抖音商城户订单数据 Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}}
--- PASS: TestNewFuncConfig (0.00s)
PASS
ok      kis-flow/test   0.545s

好了,现在最简略的KisFuncConfig的战略创立基本完成了。

2.2.2 KisFlowConfig 界说

KisFlowConfig在规划文档中的yaml文件方式如下:

kistype: flow
status: 1
flow_name: MyFlow1
flows:
  - fname: 测验PrintInput
    params:
      args1: value1
      args2: value2
  - fname: 测验KisFunction_S1
  - fname: 测验PrintInput
    params:
      args1: value11
      args2: value22
      default2: newDefault
  - fname: 测验PrintInput
  - fname: 测验KisFunction_S1
    params:
      my_user_param1: ffffffxxxxxx
  - fname: 测验PrintInput

参数阐明:

Golang结构实战-KisFlow流式核算结构(2)-项目构建/根底模块-(上)

A. 结构体界说

接下来咱们依据上述的装备协议,来界说KisFlow的战略装备结构体,而且供给一些呼应的初始化办法。 咱们在项目文档中创立kis_flow_config.go文件,在这儿咱们将需求的Config界说完成。

kis-flow/config/kis_flow_config.go

package config
import "kis-flow/common"
// KisFlowFunctionParam 一个Flow装备中Function的Id及携带固定装备参数
type KisFlowFunctionParam struct {
	FuncName string `yaml:"fname"`  //必须
	Params   FParam `yaml:"params"` //选填,在当时Flow中Function定制固定装备参数
}
// KisFlowConfig 用户贯穿整条流式核算上下文环境的目标
type KisFlowConfig struct {
	KisType  string                 `yaml:"kistype"`
	Status   int                    `yaml:"status"`
	FlowName string                 `yaml:"flow_name"`
	Flows    []KisFlowFunctionParam `yaml:"flows"`
}

这儿供给了一个新的参数类型 KisFlowFunctionParam ,这个表明装备KisFlow的时候,在调度的时候,flow默许传递当时被调度Function的自界说默许参数,假如不需求能够不增加此参数。

B. 相关办法界说

供给一个新建KisFlowConfig的构造办法。

kis-flow/config/kis_flow_config.go

// NewFlowConfig 创立一个Flow战略装备目标, 用于描绘一个KisFlow信息
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
	config := new(KisFlowConfig)
	config.FlowName = flowName
	config.Flows = make([]KisFlowFunctionParam, 0)
	config.Status = int(enable)
	return config
}
// AppendFunctionConfig 增加一个Function Config 到当时Flow中
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
	fConfig.Flows = append(fConfig.Flows, params)
}

有关flow携带的Function装备,这儿咱们选用经过AppendFunctionConfig动态的去增加,目的是为了,今后或许有关kisflow的装备会从数据库/动态远程装备等中提取,那么就需求动态的将装备组合进来。

C. KisFlowConfig单元测验

相同,咱们简略些一个单元测验来测验KisFlowConfig的创立。

kis-flow/test/kis_config_test.go


func TestNewFlowConfig(t *testing.T) {
	flowFuncParams1 := config.KisFlowFunctionParam{
		FuncName: "funcName1",
		Params: config.FParam{
			"flowSetFunParam1": "value1",
			"flowSetFunParam2": "value2",
		},
	}
	flowFuncParams2 := config.KisFlowFunctionParam{
		FuncName: "funcName2",
		Params: config.FParam{
			"default": "value1",
		},
	}
	myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)
	myFlow1.AppendFunctionConfig(flowFuncParams1)
	myFlow1.AppendFunctionConfig(flowFuncParams2)
	log.Logger().InfoF("myFlow1: %+vn", myFlow1)
}

咱们cdkis-flow/test/目录下履行单元测验指令:

$ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig

得到成果如下:

=== RUN   TestNewFlowConfig
myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]}
--- PASS: TestNewFlowConfig (0.00s)
PASS
ok      kis-flow/test   0.251s

2.2.3 KisConnConfig

KisConnConfig在规划文档中的yaml文件方式如下:

kistype: conn
cname: 测验KisConnector_1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: userid_orderid_option
params:
  args1: value1
  args2: value2
load: null
save:
  - 测验KisFunction_S1

A. 结构体界说

接下来咱们依据上述的装备协议,来界说KisConnector的战略装备结构体,而且供给一些呼应的初始化办法。 咱们在项目文档中创立kis_conn_config.go文件,在这儿咱们将需求的Config界说完成。

kis-flow/config/kis_conn_config.go

package config
import (
	"errors"
	"fmt"
	"kis-flow/common"
)
// KisConnConfig KisConnector 战略装备
type KisConnConfig struct {
	//装备类型
	KisType string `yaml:"kistype"`
	//唯一描绘标识
	CName string `yaml:"cname"`
	//根底存储前言地址
	AddrString string `yaml:"addrs"`
	//存储前言引擎类型"Mysql" "Redis" "Kafka"等
	Type common.KisConnType `yaml:"type"`
	//一次存储的标识:如Redis为Key称号、Mysql为Table称号,Kafka为Topic称号等
	Key string `yaml:"key"`
	//装备信息中的自界说参数
	Params map[string]string `yaml:"params"`
	//存储读取所绑定的NsFuncionID
	Load []string `yaml:"load"`
	Save []string `yaml:"save"`
}

B. 相关办法界说

kis-flow/config/kis_conn_config.go

// NewConnConfig 创立一个KisConnector战略装备目标, 用于描绘一个KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
	strategy := new(KisConnConfig)
	strategy.CName = cName
	strategy.AddrString = addr
	strategy.Type = t
	strategy.Key = key
	strategy.Params = param
	return strategy
}
// WithFunc Connector与Function进行联系绑定
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
	switch common.KisMode(fConfig.FMode) {
	case common.S:
		cConfig.Save = append(cConfig.Save, fConfig.FName)
	case common.L:
		cConfig.Load = append(cConfig.Load, fConfig.FName)
	default:
		return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
	}
	return nil
}

这儿也是经过供给WithFunc办法来动态的增加Conn和Function的相关联系 ###

C. KisConnConfig 单元测验 相同,咱们简略些一个单元测验来测验KisConnConfig的创立。

kis-flow/test/kis_config_test.go

func TestNewConnConfig(t *testing.T) {
	source := config.KisSource{
		Name: "大众号抖音商城户订单数据",
		Must: []string{"order_id", "user_id"},
	}
	option := config.KisFuncOption{
		CName:        "connectorName1",
		RetryTimes:   3,
		RetryDuriton: 300,
		Params: config.FParam{
			"param1": "value1",
			"param2": "value2",
		},
	}
	myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
	connParams := config.FParam{
		"param1": "value1",
		"param2": "value2",
	}
	myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
	if err := myConnector1.WithFunc(myFunc1); err != nil {
		log.Logger().ErrorF("WithFunc err: %sn", err.Error())
	}
	log.Logger().InfoF("myConnector1: %+vn", myConnector1)
}

咱们cdkis-fow/test/目录下履行单元测验指令:

$ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig

得到成果如下:

=== RUN   TestNewConnConfig
myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]}
--- PASS: TestNewConnConfig (0.00s)
PASS
ok      kis-flow/test   0.481s

作者:刘丹冰Aceld github: github.com/aceld

KisFlow开源项目地址:github.com/aceld/kis-f…


连载中…

Golang结构实战-KisFlow流式核算结构(1)-概述

Golang结构实战-KisFlow流式核算结构(2)-项目构建/根底模块-(上)