进入到kafkaWorker的所在目录, 以下没有特殊说明的情况下均以此目录作为相对目录
• 配置
在conf目中创建一个子目录demo。 (实际使用时,命名方式根据业务决定)
demo中至少三个文件是必须要配置的. 文件命名无特殊要求,因为运行kafkaWorker时需要指定参数,文件格式不变即可。
- config.json:zookeeper信息; 这文件参考courseh5、pk、gold 中的json文件格式,变量为 kafka的topic、zk中的存储路径,zk 以及hosts
- conf.ini:kafkaWorker运行所需要的配置项;参考:业务应用框架
- template.ini: kafkaWorker处理消息的模板定义。这个文件比较特殊,其全路径配置在conf.ini -> DEFAULT -> tpl,kafkaWorker在运行时会读取这个变量来获取模板并编译
这里主要说下 template.ini 模板文件 (模板解析采用了go的原生包:text/template)
[demo]
-={
{
$arg := .}}{
{
comsumKafkaDemo $arg }}
-=@NONE
元素介绍:
- [demo] : 为命令名称。 在调用kafkaProxy 时,传入的标识符,和数据一起发送给kafkaWorker. eg:
package demo
import (
"git.xesv5.com/golib/gotools/utils/kafkautil"
"git.xesv5.com/golib/logger"
"streamCalculator/service/demo/proto"
)
func sendKafka(topic, key string, data []byte) error {
if topic == "" || key == "" {
return logger.NewError("kafka topic%s,key:%s exception", topic, key)
}
poolByte := getByte()
dataBuf := bytes.NewBuffer(poolByte)
dataBuf.WriteString(key)
dataBuf.WriteString(" ")
dataBuf.Write(data)
if err := kafkautil.Send2Proxy(topic, dataBuf.Bytes()); err != nil {
freeByte(poolByte)
return err
}
freeByte(poolByte)
return nil
}
func main(){
logs := proto.SubmitStuLogs{
123,
456,
789,
"Hello World!",
}
var recordBuf bytes.Buffer
if err := msgp.Encode(&recordBuf, &logs); err != nil {
logger.E("submitStuLogsAction", "msgp.encode err:%v", err)
}
//向kafkaProxy发送异步消息
sendKafka(demo_topic, "demo", recordBuf.Bytes())
}
-
“-=” : 为 .ini文件缺省键的定义格式,表示解析配置文件时,每行的键为自然序列: “#1”, “#2”,… , 从1开始。
-
**"{ { KaTeX parse error: Expected 'EOF', got '}' at position 10: arg := . }̲}" :** 这是变量占位符,args 为定义的临时变量,可在本模板中被其他占位符引用, 等号后面的“.” 代表执行模板替换时所传递的对象,如:tmpl.Execute(out, data) 则 "."就代表对象data。
在kafkaWorker中 . 特指字节序列:dataBuf.Write(data) (上例)
-
命令的边界符: @NONE 纯变量传入并执行; @CURL @RET @END 配对使用 构造http请求; 缺省时按SQL语句进行解析
-
**自定义函数 { {comsumKafkaDemo KaTeX parse error: Expected 'EOF', got '}' at position 5: args}̲}:** 其中comsumKa…args 则为其接收的变量
• 模板函数自定义
首先kafkaWorker已经帮我们预备了一些常用函数,参考此包定义:github.com/Masterminds/sprig。
进入目录 src/template ,新建一个go文件
//+build newstudent
package template
import (
"bytes"
"git.xesv5.com/golib/logger"
"github.com/tinylib/msgp/msgp"
"git.xesv5.com/service/streamCalculator/service/demo"
"git.xesv5.com/service/streamCalculator/service/demo/proto"
)
var demoReply = proto.Reply{
}
func init() {
//注册模板处理函数,和template.ini 中的函数一一对应, 函数的入参和返参个数、类型任意
//kafkaWorker 仅是作为消息消费之用,业务逻辑最好还是写到其他服务中,以包的形式引入
funcsMap["comsumKafkaDemo"] = func(v interface{
}) (ret string, err error) {
logs := proto.SubmitStuLogs{
}
err = msgp.Decode(bytes.NewBuffer(toByteSlice(v, nil)), &logs)
logger.I("comsumKafkaDemo", logs)
demoSev := &demo.DemoService{
}
demoSev.SaveToRedis(createCtxFromMsg(logs), &logs, &demoReply)
return "OK", nil
}
}
至此一个kafkaWorker 就基本配置完毕,接下来就是在自己的项目中写具体的业务逻辑了
• 序列化
kafkaWorker 和 kafkaProxy 之间是通过字节码进行数据传输的,所以我们需要把结构体进行编码,msgp是一种简单易用的方式。
定义好结构后,在需要进行编码结构体最上面添加注释: //go:generate msgp。 多个文件就写多个
//go:generate msgp
package proto
type Reply struct {
Code int64
Msg string
Data []interface{
}
}
type SubmitStuLogs struct {
StuId int64 `json:"stu_id"`
PlanId int64 `json:"plan_id"`
TestId int64 `json:"test_id"`
Info string `json:"info"`
}
安装msgp: go get -u -t github.com/tinylib/msgp
$ cd /结构体存放目录 && go generate
之后,结构体文件所在目录会自动生成文件,msgp已经帮助我们自动实现了编码接口
• 编译安装
$ make
经过漫长的等待后,可执行文件被安装在了./bin目录下
• 运行
$ ./bin/kafkaWorker -h
Usage of ./bin/kafkaWorker:
-c string
config path, 注:相对目录是以可执行文件所在目录为参照(如本例的目录 ...../kafkaWorker/bin)
-cfg string
json config path (default "conf/config.json") 注:相对目录是以运行目录为参照(如本例的目录 ...../kafkaWorker)
-f foreground
-m foreground
-mode int
mode
-p string
config path prefix with no trailing backslash
-s string
start or stop
-v foreground
$ ./bin/kafkaWorker -c ../conf/demo/conf.ini -cfg conf/demo/config.json
出现如下信息即为成功启动
....
Connected to 10.17.91.13:2181
Authenticated: id=245591183736176647, timeout=10000
Re-submitting `0` credentials after reconnect