最近在学习golang,众所周知,golang中的goroutine和channel 作为golang中的扛鼎之作,换句话说,不会灵活运用goroutine和channel,那么就不能算是真正了解了这门语言。
而goreman作为一款轻量级的多进程管理工具,代码量只有不到1000行,然而使用了许多巧妙地设计,和大量使用goroutine和channel,以及使用了golang的rpc,net,flag,context等工作中出镜率极高的包,是学习golang的佳作,那么,现在就让我们一起来看看吧!
goreman源码地址:https://github.com/mattn/goreman
先附上goreman的架构图
main函数作为程序的入口,做了一些系统的初始化,和根据用户行为调用各自的处理函数
func main() {
var err error
//初始化一些系统配置
cfg := readConfig()
if cfg.BaseDir != "" {
err = os.Chdir(cfg.BaseDir)
if err != nil {
fmt.Fprintf(os.Stderr, "goreman: %s\n", err.Error())
os.Exit(1)
}
}
//解析命令行参数
cmd := cfg.Args[0]
switch cmd {
case "check":
err = check(cfg)
case "help":
usage()
case "run":
if len(cfg.Args) >= 2 {
cmd, args := cfg.Args[1], cfg.Args[2:]
err = run(cmd, args, cfg.Port)
} else {
usage()
}
case "export":
if len(cfg.Args) == 3 {
format, path := cfg.Args[1], cfg.Args[2]
err = export(cfg, format, path)
} else {
usage()
}
case "start":
//os 的signal 通道,用于接收信号,对信号进行处理
c := notifyCh()
//启动
err = start(context.Background(), c, cfg)
case "version":
showVersion()
default:
usage()
}
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err.Error())
os.Exit(1)
}
}
我们只看核心的start行为和run command行为
start行为做了两件事:
打开signal channel
用于捕获操作系统的三个信号,对应架构图右边的用户命令行执行ctrl+c命令,向操作系统发出退出信号,此信号被goreman捕获,也就是向channel 3发送消息。
捕获消息的代码在main.go的notifyCh()函数
//程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出,用于通知前台进程组终止进程。
const sigint = unix.SIGINT
//程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理。通常用来要求程序自己正常退出,shell命令kill缺省产生这个信号。如果进程终止不了,我们才会尝试SIGKILL
//kill -9 会产生SIGKILL信号,不会被阻塞或者忽略
const sigterm = unix.SIGTERM
//登录Linux时,系统会分配给登录用户一个终端(Session)。在这个终端运行的所有程序,包括前台进程组和后台进程组,一般都属于这个 Session。当用户退出Linux登录时,
//前台进程组和后台有对终端输出的进程将会收到SIGHUP信号,可以被捕获和忽略
const sighup = unix.SIGHUP
func notifyCh() <-chan os.Signal {
sc := make(chan os.Signal, 10)
//收到这三个信号的通知,将其输入chan,做一些处理,否则不捕获
signal.Notify(sc, sigterm, sigint, sighup)
return sc
}
启动被管理进程start()
start函数概括下来做了三件事:
- 解析启动配置文件Procfile
//解析Procfile文件,将cfg赋值
err := readProcfile(cfg)
- 启动一个rpc server,用于多个客户端之间的交互,对应架构图左边的用户流,另一个客户端在start之后向goreman发送stop的消息,则对应rpc message的channel,发起rpc调用,处理用户行为。
if *startRPCServer {
//开启一个rpc服务,来接收另一个客户端发来的消息,这里不能阻塞程序的执行,所以创建一个gorutine去执行
go startServer(ctx, rpcChan, cfg.Port)
}
- 进行进程的启动,此函数使用了大量的golang中的同步原语,值得注意的是,在全部的执行流中,proc作为共享变量,对他的操作需要加锁,为了防止线程安全问题,建议直接加大锁,降低锁粒度能提高程序性能,但保证程序的线程安全是最重要的,除非你真的对程序的执行了如指掌
// spawn all procs.
func startProcs(sc <-chan os.Signal, rpcCh <-chan *rpcMessage, exitOnError bool) error {
var wg sync.WaitGroup
errCh := make(chan error, 1)
for _, proc := range procs {
//处理每一个proc,
startProc(proc.name, &wg, errCh)
}
allProcsDone := make(chan struct{
}, 1)
if *exitOnStop {
//开一个协程,去如果startProc函数wg done了,则发送allprocsDone的消息
go func() {
wg.Wait()
allProcsDone <- struct{
}{
}
}()
}
//进入阻塞状态,等待
for {
select {
//收到其他客户端的rpc消息
case rpcMsg := <-rpcCh:
switch rpcMsg.Msg {
// TODO: add more events here.
case "stop":
for _, proc := range rpcMsg.Args {
if err := stopProc(proc, nil); err != nil {
rpcMsg.ErrCh <- err
break
}
}
close(rpcMsg.ErrCh)
default:
panic("unimplemented rpc message type " + rpcMsg.Msg)
}
//收到错误消息
case err := <-errCh:
if exitOnError {
stopProcs(os.Interrupt)
return err
}
//收到所有都停止的消息
case <-allProcsDone:
return stopProcs(os.Interrupt)
//收到命令行ctrl+c 停止所有进程的消息
case sig := <-sc:
return stopProcs(sig)
}
}
}
// start specified proc. if proc is started already, return nil.
func startProc(name string, wg *sync.WaitGroup, errCh chan<- error) error {
proc := findProc(name)
if proc == nil {
return errors.New("unknown name: " + name)
}
proc.mu.Lock()
// 在这里对进程已经运行做判断
if proc.cmd != nil {
proc.mu.Unlock()
return nil
}
if wg != nil {
wg.Add(1)
}
go func() {
//详细的执行命令过程
spawnProc(name, errCh)
if wg != nil {
wg.Done()
}
proc.mu.Unlock()
}()
return nil
}