前言
golang在对接话费充值这一块时,一定会选择同时接入多个渠道。这是基于以下原因:
- 有的渠道便宜,有的渠道很贵。
- 有的渠道对移动/联通/电信手机,不支持,或存在极大的失败率。
- 渠道的不可靠性,需要我们同时接入多个渠道。
接入多个渠道,预先定好谁先谁后,然后写充值和回调逻辑,不是很简单吗,为什么特意形成解决方案?
这是因为:
- 我们接入新的渠道时,不应该侵入历史接入的渠道业务代码。
- 以前接入渠道的人,可能离职转岗,总之做了交接。
- 充值渠道的价格浮动,可靠性浮动,可能需要变更优先顺序,这个变更操作不应该很复杂。
那么,我们要如何以最简单的形式,以兜底的形式,接入多个渠道呢?
实现分析
我们需要基于接口,来实现这一整套流程,将具体某个渠道的业务 ,和调用位置分离。
流程分析:
单渠道充值流程:
多渠道兜底流程分析
实现
channelI.go
- channelI 订制了渠道接口,要求所有渠道都接入该接口
package core
// 话费渠道接口
type HuafeiChannelI interface {
// 渠道key
ChannelKey() string
// 冲话费的电话, 金额(分), 订单号。 返回requestBuf, responseBuf,error
Charge(phone string, amount int, orderId string) ([]byte, []byte, error)
// 回调地址
NotifyURL() string
}
wrapChannel.go
- wrapChannel.go 对channel进行了封装,形成了链表节点。指向了下一次尝试的话费渠道对象。
package core
type wrapChannel struct {
channel HuafeiChannelI
next *wrapChannel
}
func newWrapChannel(channel HuafeiChannelI) *wrapChannel {
return &wrapChannel{
channel: channel,
}
}
func (o *wrapChannel) setNext(next *wrapChannel) {
o.next = next
}
好了,下面是我们的核心。话费调度器
manager.go
package core
import (
"fmt"
)
// 话费管理器,app全局单例
// 内部的状态全量运行期只读,所以不用加锁
type HuafeiManager struct {
channels map[string]HuafeiChannelI // 存放话费充值渠道对象
wrapChannels map[string]*wrapChannel // 用于定位某个回调对象
sorted []*wrapChannel // 用于渠道回调失败降级定序
closeCallback bool // 本参数仅用于测试,true时,将不会执行以下所有回调
allFailFunc func(fen int, attach map[string]interface{
}) // 全部失败时,则会调用allFailFunc。建议在充值话费前,优先扣除掉抵扣物,而不是抵扣物抵扣回滚
onCharge func(channel HuafeiChannelI, req []byte, resp []byte, orderId string) // 每次调用充值时,都会call一次onCharge回调
onNotify func(channel HuafeiChannelI, req []byte, orderId string) // 每次收到回调,都会call一次onNotify回调
onSuccess func(channel HuafeiChannelI, attach map[string]interface{
}) // 收到回调,并且结果为成功,则会触发OnSuccess
}
// 初始化管理器对象
func NewHuafeiManager() *HuafeiManager {
return &HuafeiManager{
channels: make(map[string]HuafeiChannelI),
wrapChannels: make(map[string]*wrapChannel),
sorted: make([]*wrapChannel, 0, 10),
}
}
// 进入测试模式,将不触发回调
func (hm *HuafeiManager) WithoutCallback() {
hm.closeCallback = true
}
// 设置全部失败的回调
func (hm *HuafeiManager) SetAllFailFunc(f func(fen int, attach map[string]interface{
})) {
hm.allFailFunc = f
}
// 设置充值的回调
func (hm *HuafeiManager) SetOnCharge(f func(channel HuafeiChannelI, req []byte, resp []byte, orderId string)) {
hm.onCharge = f
}
// 设置收到notify的回调
func (hm *HuafeiManager) SetOnNotify(f func(channel HuafeiChannelI, req []byte, orderId string)) {
hm.onNotify = f
}
// 设置到账的回调
func (hm *HuafeiManager) SetOnSuccess(f func(channel HuafeiChannelI, attach map[string]interface{
})) {
hm.onSuccess = f
}
func (hm *HuafeiManager) AllFailFunc(fen int, attach map[string]interface{
}) {
if hm.closeCallback == true {
fmt.Println("all fail")
return
}
hm.allFailFunc(fen, attach)
}
func (hm *HuafeiManager) OnCharge(channel HuafeiChannelI, req []byte, resp []byte, orderId string) {
if hm.closeCallback == true {
fmt.Println(fmt.Sprintf("%s_charge_req:", channel.ChannelKey()), string(req))
fmt.Println(fmt.Sprintf("%s_charge_resp:", channel.ChannelKey()), string(resp))
return
}
hm.onCharge(channel, req, resp, orderId)
}
func (hm *HuafeiManager) OnNotify(channel HuafeiChannelI, req []byte, orderId string) {
if hm.closeCallback == true {
fmt.Println(fmt.Sprintf("%s_notify:", channel.ChannelKey()), string(req))
return
}
hm.onNotify(channel, req, orderId)
}
func (hm *HuafeiManager) OnSuccess(channel HuafeiChannelI, attach map[string]interface{
}) {
if hm.closeCallback == true {
fmt.Println(fmt.Sprintf("%s_success:", channel.ChannelKey()))
return
}
hm.onSuccess(channel, attach)
}
// 增加一个话费渠道
func (hm *HuafeiManager) Add(channel HuafeiChannelI) {
channelKey := channel.ChannelKey()
hm.channels[channelKey] = channel
wraped := newWrapChannel(channel)
hm.wrapChannels[channelKey] = wraped
if len(hm.sorted) > 0 {
hm.sorted[len(hm.sorted)-1].setNext(wraped)
}
hm.sorted = append(hm.sorted, wraped)
}
// 充值
type ChargeResp struct {
ChannelResponses []ChannelResponse // 第一次发起充值话费时的request和response
Kvs map[string]string // key为 xxx_charge_request, xxx_charge_response, xxx_notify_request 三类
Err error
}
type ChannelResponse struct {
ChannelKey string `json:"channel_key"`
RequestBody []byte `json:"request_body"`
Response []byte `json:"rsp"`
}
func (hm HuafeiManager) Charge(phone string, fen int, orderId string, attach map[string]interface{
}) {
var failTimes int
for i, _ := range hm.sorted {
channel := hm.sorted[i].channel
requestBuf, responseBuf, e := channel.Charge(phone, fen, orderId)
hm.OnCharge(channel, requestBuf, responseBuf, orderId)
if e == nil {
break
}
failTimes ++
}
if failTimes >= len(hm.channels) {
hm.AllFailFunc(fen, attach)
}
return
}
func (hm *HuafeiManager) HandleNotifyFail(channel HuafeiChannelI, phone string, fen int, orderId string, attach map[string]interface{
}) {
wc, exist := hm.wrapChannels[channel.ChannelKey()]
if !exist {
return
}
if wc.next == nil {
hm.AllFailFunc(fen, attach)
return
}
hm.chargeWithStart(wc.next.channel, phone, fen, orderId, attach)
}
// 从有序渠道里,某一个渠道开始滚动充值
func (hm *HuafeiManager) chargeWithStart(channel HuafeiChannelI, phone string, amount int, orderId string, attach map[string]interface{
}) {
// 不处理未标记key的渠道
if channel.ChannelKey() == "" {
return
}
// 找到基准渠道的有序数组的index
startKey := channel.ChannelKey()
var hitIndex = 0
for i, v := range hm.sorted {
if v.channel.ChannelKey() == startKey {
hitIndex = i
break
}
}
// 选择需要滚动的子序列
var subSorted = make([]*wrapChannel, 0, 10)
for i := hitIndex; i < len(hm.sorted); i ++ {
subSorted = append(subSorted, hm.sorted[i])
}
if len(subSorted) == 0 {
hm.AllFailFunc(amount, attach)
return
}
var failTimes int
for i, _ := range subSorted {
requestBuf, responseBuf, e := subSorted[i].channel.Charge(phone, amount, orderId)
hm.OnCharge(subSorted[i].channel, requestBuf, responseBuf, orderId)
// 成功 break
if e == nil {
break
}
failTimes ++
}
if failTimes >= len(subSorted) {
hm.AllFailFunc(amount, attach)
return
}
return
}
最佳实践
笔者在生产中,接入了【欧飞】【力方】【大猿人】三类话费渠道。
在调度时,用法
package huafeitool
var HuafeiTool *core.HuafeiManager
func init() {
HuafeiTool = core.NewHuafeiManager()
// 按照Add顺序进行兜底。
HuafeiTool.Add(&dayuanren.DayuanrenChannel{
})
HuafeiTool.Add(&lifang.LifangChannel{
})
HuafeiTool.Add(&oufei.OufeiChannel{
})
// 全部失败时回调,行为包括【等价物补回】【订单标记失败】【玩家邮件通知】
HuafeiTool.SetAllFailFunc(func(fen int, attach map[string]interface{
}) {
gameId := commonv2.GetInt(attach, "game_id")
userId := commonv2.GetInt(attach, "user_id")
orderId := commonv2.GetString(attach, "order_id")
var hasWithDraw bool
if gameId != 0 && userId != 0 {
// 余额抵扣物补回
// 标记是否补回成功
}
// 订单标记失败
if orderId != "" {
if hasWithDraw {
// 订单状态修改为失败,并且已经补回道具
} else {
// 订单标记失败,并且补回失败
}
}
// 发送邮件通知
if hasWithDraw && gameId != 0 && userId != 0 {
// 发送邮件通知
}
})
// 触发充值时的回调
HuafeiTool.SetOnCharge(func(channel core.HuafeiChannelI, req []byte, resp []byte, orderId string) {
// 某一个渠道充值时,应该把request和response标记打入订单中,方便回溯
})
// 触发notify时的回调
HuafeiTool.SetOnNotify(func(channel core.HuafeiChannelI, req []byte, orderId string) {
// 某一个渠道到账回调收到时,应该将request打入订单中,方便回溯
})
// 触发成功的回调
HuafeiTool.SetOnSuccess(func(channel core.HuafeiChannelI, attach map[string]interface{
}) {
// 修改订单状态成功
// 发放邮件通知
})
}
充值时,使用管理器来充值
huafeitool.HuafeiTool.Charge("<手机号>", 100, "<订单号>", map[string]interface{
}{
// 附加信息
"order_id": <订单号>,
"user_id": <玩家id>,
})
回调逻辑(以大猿人举例)
func (o *DayuanrenChannel) Notify(manager *core.HuafeiManager) gin.HandlerFunc {
return func(c *gin.Context) {
type Param struct {
OrderNumber string `json:"order_number"`
OutTradeNum string `json:"out_trade_num"`
Otime int `json:"otime"`
State int `json:"state"`
}
var param Param
param.OrderNumber = c.DefaultQuery("order_number", "")
param.OutTradeNum = c.DefaultQuery("out_trade_num", "")
param.Otime, _ = strconv.Atoi(c.DefaultQuery("otime", "-1"))
param.State, _ = strconv.Atoi(c.DefaultQuery("state", "-1")) //1-成功 2-失败
manager.OnNotify(&DayuanrenChannel{
}, []byte(c.Request.URL.RawQuery), param.OutTradeNum)
HandleCallback(param.OutTradeNum, param.State, manager)
c.String(200, "success")
}
}
优化后
通过Add顺序,来决定话费兜底顺序
HuafeiTool.Add(&dayuanren.DayuanrenChannel{
})
HuafeiTool.Add(&lifang.LifangChannel{
})
HuafeiTool.Add(&oufei.OufeiChannel{
})