Go教程

endless 如何实现不停机重启 Go 程序?

本文主要是介绍endless 如何实现不停机重启 Go 程序?,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

转载请声明出处哦~,本篇文章发布于luozhiyun的博客:https://www.luozhiyun.com/archives/584

前几篇文章讲解了如何实现一个高效的 HTTP 服务,这次我们来看一下如何实现一个永不不停机的 Go 程序。

前提

事情是这样的,在一天风和日丽的周末,我正在看 TiDB 源码的时候,有一位胖友找到我说,Go 是不是每次修改都需要重启才行?由于我才疏学浅不知道有不停机重启这个东西,所以回答是的。然后他说,那完全没有 PHP 好用啊,PHP 修改逻辑完之后直接替换一个文件就可以实现发布,不需要重启。我当时只能和他说可以多 Pod 部署,金丝雀发布等等也可以做到整个服务不停机发布。但是他最后还是带着得以意笑容离去。

当时看着他离去的身影我就发誓,我要研究一下 Go 语言的不停机重启,证明不是 Go 不行,而是我不行 [DOGE] [DOGE] [DOGE],所以就有了这么一篇文章。

那么对于一个不停机重启 Go 程序我们需要解决以下两个问题:

  1. 进程重启不需要关闭监听的端口;
  2. 既有请求应当完全处理或者超时;

后面我们会看一下 endless 是如何做到这两点的。

基本概念

下面先简单介绍一下两个知识点,以便后面的开展

信号处理

Go 信号通知通过在 Channel 上发送 os.Signal 值来工作。如我们如果使用 Ctrl+C,那么会触发 SIGINT 信号,操作系统会中断该进程的正常流程,并进入相应的信号处理函数执行操作,完成后再回到中断的地方继续执行。

func main() {
    sigs := make(chan os.Signal, 1)
    done := make(chan bool, 1)
    // 监听信号
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) 
    go func() {
        // 接收到信号返回
        sig := <-sigs
        fmt.Println()
        fmt.Println(sig)
        done <- true
    }() 
    fmt.Println("awaiting signal")
    // 等待信号的接收
    <-done
    fmt.Println("exiting")
}

通过上述简单的几行代码,我们就可以监听 SIGINT 和 SIGTERM 信号。当 Go 接收到操作系统发送过来的信号,那么会将信号值放入到 sigs 管道中进行处理。

Fork 子进程

在Go语言中 exec 包为我们很好的封装好了 Fork 调用,并且使用它可以使用 ExtraFiles 很好的继承父进程已打开的文件。

file := netListener.File() // this returns a Dup()
path := "/path/to/executable"
args := []string{
    "-graceful"}
// 产生 Cmd 实例
cmd := exec.Command(path, args...)
// 标准输出
cmd.Stdout = os.Stdout
// 标准错误输出
cmd.Stderr = os.Stderr
cmd.ExtraFiles = []*os.File{file}
// 启动命令
err := cmd.Start()
if err != nil {
    log.Fatalf("gracefulRestart: Failed to launch, error: %v", err)
}

通过调用 exec 包的 Command 命令传入 path(将要执行的命令路径)、args (命令的参数)即可返回 Cmd 实例,通过 ExtraFiles 字段指定额外被新进程继承的已打开文件,最后调用 Start 方法创建子进程。

这里的 netListener.File会通过系统调用 dup 复制一份 file descriptor 文件描述符。

func Dup(oldfd int) (fd int, err error) {
	r0, _, e1 := Syscall(SYS_DUP, uintptr(oldfd), 0, 0)
	fd = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}

我们可以看到 dup 的命令介绍:

dup and dup2 create a copy of the file descriptor oldfd.
After successful return of dup or dup2, the old and new descriptors may
be used interchangeably. They share locks, file position pointers and
flags; for example, if the file position is modified by using lseek on
one of the descriptors, the position is also changed for the other.

The two descriptors do not share the close-on-exec flag, however.

通过上面的描述可以知道,返回的新文件描述符和参数 oldfd 指向同一个文件,共享所有的索性、读写指针、各项权限或标志位等。但是不共享关闭标志位,也就是说 oldfd 已经关闭了,也不影响写入新的数据到 newfd 中。

graceful_restart3

上图显示了fork一个子进程,子进程复制父进程的文件描述符表。

endless 不停机重启示例

我这里稍微写一下 endless 的使用示例给没有用过 endless 的同学看看,熟悉 endless 使用的同学可以跳过。

import (
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/fvbock/endless"
	"github.com/gorilla/mux"
)

func handler(w http.ResponseWriter, r *http.Request) {
	duration, err := time.ParseDuration(r.FormValue("duration"))
	if err != nil {
		http.Error(w, err.Error(), 400)
		return
	}
	time.Sleep(duration)
	w.Write([]byte("Hello World"))
}

func main() {
	mux1 := mux.NewRouter()
	mux1.HandleFunc("/sleep", handler)

	w := sync.WaitGroup{}
	w.Add(1)
	go func() {
		err := endless.ListenAndServe("127.0.0.1:5003", mux1)
		if err != nil {
			log.Println(err)
		}
		log.Println("Server on 5003 stopped")
		w.Done()
	}()
	w.Wait()
	log.Println("All servers stopped. Exiting.")

	os.Exit(0)
}

下面验证一下 endless 创建的不停机服务:

# 第一次构建项目
go build main.go
# 运行项目,这时就可以做内容修改了
./endless &
# 请求项目,60s后返回
curl "http://127.0.0.1:5003/sleep?duration=60s" &
# 再次构建项目,这里是新内容
go build main.go
# 重启,17171为pid
kill -1 17171
# 新API请求
curl "http://127.0.0.1:5003/sleep?duration=1s" 

运行完上面的命令我们可以看到,对于第一个请求返回的是:Hello world,在发送第二个请求之前,我将 handler 里面的返回值改成了:Hello world2222,然后进行构建重启。

由于我设置了 60s 才返回第一个请求,第二个请求设置的是 1s 返回,所以这里会先返回第二个请求的值,然后再返回第一个请求的值。

整个时间线如下所示:

graceful_restart2

并且在等待第一个请求返回期间,可以看到同时有两个进程在跑:

$ ps -ef |grep main
root      84636  80539  0 22:25 pts/2    00:00:00 ./main
root      85423  84636  0 22:26 pts/2    00:00:00 ./main

在第一个请求响应之后,我们再看进程可以发现父进程已经关掉了,实现了父子进程无缝切换:

$ ps -ef |grep main
root      85423      1  0 22:26 pts/2    00:00:00 ./main

实现原理

在实现上,我这里用的是 endless 的实现方案,所以下面原理和代码都通过它的代码进行讲解。

我们要做的不停机重启,实现原理如上图所示:

  1. 监听 SIGHUP 信号;
  2. 收到信号时 fork 子进程(使用相同的启动命令),将服务监听的 socket 文件描述符传递给子进程;
  3. 子进程监听父进程的 socket,这个时候父进程和子进程都可以接收请求;
  4. 子进程启动成功之后发送 SIGTERM 信号给父进程,父进程停止接收新的连接,等待旧连接处理完成(或超时);
  5. 父进程退出,升级完成;

代码实现

我们从上面的示例可以看出,endless 的入口是 ListenAndServe 函数:

 func ListenAndServe(addr string, handler http.Handler) error {
    // 初始化 server
	server := NewServer(addr, handler)
    // 监听以及处理请求
	return server.ListenAndServe()
}

这个方法分为两部分,先是初始化 server,然后再监听以及处理请求。

初始化 Server

我们首先看一下一个 endless 服务的 Server 结构体是怎样:

type endlessServer struct {
	// 用于继承 http.Server 结构
	http.Server
	// 监听客户端请求的 Listener
	EndlessListener  net.Listener  
	// 用于记录还有多少客户端请求没有完成
	wg               sync.WaitGroup
	// 用于接收信号的管道
	sigChan          chan os.Signal
	// 用于重启时标志本进程是否是为一个新进程
	isChild          bool
	// 当前进程的状态
	state            uint8 
    ...
}

这个 endlessServer 除了继承 http.Server 所有字段以外,因为还需要监听信号以及判断是不是一个新的进程,所以添加了几个状态位的字段:

  • wg:标记还有多少客户端请求没有完成;
  • sigChan:用于接收信号的管道;
  • isChild:用于重启时标志本进程是否是为一个新进程;
  • state:当前进程的状态。

下面我们看看如何初始化 endlessServer :

func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()
 	
    socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")
	// 根据环境变量判断是不是子进程
	isChild = os.Getenv("ENDLESS_CONTINUE") != "" 
    // 由于支持多 server,所以这里需要设置一下 server 的顺序
    if len(socketOrder) > 0 {
		for i, addr := range strings.Split(socketOrder, ",") {
			socketPtrOffsetMap[addr] = uint(i)
		}
	} else {
		socketPtrOffsetMap[addr] = uint(len(runningServersOrder))
	}

	srv = &endlessServer{
		wg:      sync.WaitGroup{},
		sigChan: make(chan os.Signal),
		isChild: isChild,
		...
		state: STATE_INIT,
		lock:  &sync.RWMutex{},
	}

	srv.Server.Addr = addr
	srv.Server.ReadTimeout = DefaultReadTimeOut
	srv.Server.WriteTimeout = DefaultWriteTimeOut
	srv.Server.MaxHeaderBytes = DefaultMaxHeaderBytes
	srv.Server.Handler = handler
 
	runningServers[addr] = srv
	...
	return
}

这里初始化都是我们在 net/http 里面看到的一些常见的参数,包括 ReadTimeout 读取超时时间、WriteTimeout 写入超时时间、Handler 请求处理器等,不熟悉的可以看一下这篇:《 一文说透 Go 语言 HTTP 标准库 https://www.luozhiyun.com/archives/561 》。

需要注意的是,这里是通过 ENDLESS_CONTINUE 环境变量来判断是否是个子进程,这个环境变量会在 fork 子进程的时候写入。因为 endless 是支持多 server 的,所以需要用 ENDLESS_SOCKET_ORDER变量来判断一下 server 的顺序。

ListenAndServe

func (srv *endlessServer) ListenAndServe() (err error) {
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	// 异步处理信号量
	go srv.handleSignals()
	// 获取端口监听
	l, err := srv.getListener(addr)
	if err != nil {
		log.Println(err)
		return
	}
	// 将监听转为 endlessListener
	srv.EndlessListener = newEndlessListener(l, srv)

	// 如果是子进程,那么发送 SIGTERM 信号给父进程
	if srv.isChild {
		syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
	}

	srv.BeforeBegin(srv.Addr)
	// 响应Listener监听,执行对应请求逻辑
	return srv.Serve()
}

这个方法其实和 net/http 库是比较像的,首先获取端口监听,然后调用 Serve 处理请求发送过来的数据,大家可以打开文章《 一文说透 Go 语言 HTTP 标准库 https://www.luozhiyun.com/archives/561 》对比一下和 endless 的异同。

但是还是有几点不一样的,endless 为了做到平滑重启需要用到信号监听处理,并且在 getListener 的时候也不一样,如果是子进程需要继承到父进程的 listen fd,这样才能做到不关闭监听的端口。

handleSignals 信号处理

graceful_restart4

信号处理主要是信号的一个监听,然后根据不同的信号循环处理。

func (srv *endlessServer) handleSignals() {
	var sig os.Signal
	// 注册信号监听
	signal.Notify(
		srv.sigChan,
		hookableSignals...,
	)
	// 获取pid
	pid := syscall.Getpid()
	for {
		sig = <-srv.sigChan
		// 在处理信号之前触发hook
		srv.signalHooks(PRE_SIGNAL, sig)
		switch sig {
		// 接收到平滑重启信号
		case syscall.SIGHUP:
			log.Println(pid, "Received SIGHUP. forking.")
			err := srv.fork()
			if err != nil {
				log.Println("Fork err:", err)
			} 
		// 停机信号
		case syscall.SIGINT:
			log.Println(pid, "Received SIGINT.")
			srv.shutdown()
		// 停机信号
		case syscall.SIGTERM:
			log.Println(pid, "Received SIGTERM.")
			srv.shutdown()
		...
		// 在处理信号之后触发hook
		srv.signalHooks(POST_SIGNAL, sig)
	}
}

这一部分的代码十分简洁,当我们用kill -1 $pid 的时候这里 srv.sigChan 就会接收到相应的信号,并进入到 case syscall.SIGHUP 这块逻辑代码中。

需要注意的是,在上面的 ListenAndServe 方法中子进程会像父进程发送 syscall.SIGTERM 信号也会在这里被处理,执行的是 shutdown 停机逻辑。

在进入到 case syscall.SIGHUP 这块逻辑代码之后会调用 fork 函数,下面我们再来看看 fork 逻辑:

func (srv *endlessServer) fork() (err error) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()

	// 校验是否已经fork过
	if runningServersForked {
		return errors.New("Another process already forked. Ignoring this one.")
	} 
	runningServersForked = true

	var files = make([]*os.File, len(runningServers))
	var orderArgs = make([]string, len(runningServers))
	// 因为有多 server 的情况,所以获取所有 listen fd
	for _, srvPtr := range runningServers { 
		switch srvPtr.EndlessListener.(type) {
		case *endlessListener: 
			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()
		default: 
			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()
		}
		orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr
	}
	// 环境变量
	env := append(
		os.Environ(),
    // 启动endless 的时候,会根据这个参数来判断是否是子进程
		"ENDLESS_CONTINUE=1",
	)
	if len(runningServers) > 1 {
		env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ",")))
	}

	// 程序运行路径
	path := os.Args[0]
	var args []string
	// 参数
	if len(os.Args) > 1 {
		args = os.Args[1:]
	}

	cmd := exec.Command(path, args...)
	// 标准输出
	cmd.Stdout = os.Stdout
	// 错误
	cmd.Stderr = os.Stderr
	cmd.ExtraFiles = files
	cmd.Env = env  
	err = cmd.Start()
	if err != nil {
		log.Fatalf("Restart: Failed to launch, error: %v", err)
	} 
	return
}

fork 这块代码首先会根据 server 来获取不同的 listen fd 然后封装到 files 列表中,然后在调用 cmd 的时候将文件描述符传入到 ExtraFiles 参数中,这样子进程就可以无缝托管到父进程监听的端口。

需要注意的是,env 参数列表中有一个 ENDLESS_CONTINUE 参数,这个参数会在 endless 启动的时候做校验:

func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()

	socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")
	isChild = os.Getenv("ENDLESS_CONTINUE") != ""
  ...
}

下面我们再看看 接收到 SIGTERM 信号后,shutdown 会怎么做:

func (srv *endlessServer) shutdown() {
	if srv.getState() != STATE_RUNNING {
		return
	}

	srv.setState(STATE_SHUTTING_DOWN)
    // 默认 DefaultHammerTime 为 60秒
	if DefaultHammerTime >= 0 {
		go srv.hammerTime(DefaultHammerTime)
	}
	// 关闭存活的连接
	srv.SetKeepAlivesEnabled(false)
	err := srv.EndlessListener.Close()
	if err != nil {
		log.Println(syscall.Getpid(), "Listener.Close() error:", err)
	} else {
		log.Println(syscall.Getpid(), srv.EndlessListener.Addr(), "Listener closed.")
	}
}

shutdown 这里会先将连接关闭,因为这个时候子进程已经启动了,所以不再处理请求,需要把端口的监听关了。这里还会异步调用 srv.hammerTime 方法等待60秒把父进程的请求处理完毕才关闭父进程。

getListener 获取端口监听

func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {
	// 如果是子进程
	if srv.isChild {
		var ptrOffset uint = 0
		runningServerReg.RLock()
		defer runningServerReg.RUnlock()
		// 这里还是处理多个 server 的情况
		if len(socketPtrOffsetMap) > 0 {
			// 根据server 的顺序来获取 listen fd 的序号
			ptrOffset = socketPtrOffsetMap[laddr] 
		}
		// fd 0,1,2是预留给 标准输入、输出和错误的,所以从3开始
		f := os.NewFile(uintptr(3+ptrOffset), "")
		l, err = net.FileListener(f)
		if err != nil {
			err = fmt.Errorf("net.FileListener error: %v", err)
			return
		}
	} else {
		// 父进程 直接返回 listener
		l, err = net.Listen("tcp", laddr)
		if err != nil {
			err = fmt.Errorf("net.Listen error: %v", err)
			return
		}
	}
	return
}

这里如果是父进程没什么好说的,直接创建一个端口监听并返回就好了。

但是对于子进程来说是有一些绕,首先说一下 os.NewFile 的参数为什么要从3开始。因为子进程在继承父进程的 fd 的时候0,1,2是预留给 标准输入、输出和错误的,所以父进程给的第一个fd在子进程里顺序排就是从3开始了,又因为 fork 的时候cmd.ExtraFiles 参数传入的是一个 files,如果有多个 server 那么会依次从3开始递增。

如下图,前三个 fd 是预留给 标准输入、输出和错误的,fd 3 是根据传入 ExtraFiles 的数组列表依次递增的。

graceful_restart3

其实这里我们也可以用开头的例子做一下试验:

# 第一次构建项目
go build main.go
# 运行项目,这时就可以做内容修改了
./endless &
# 这个时候我们看看父进程打开的文件
lsof  -P -p 17116
COMMAND   PID USER   FD      TYPE  DEVICE SIZE/OFF     NODE NAME
...
main    18942 root    0u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    1u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    2u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    3u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
# 请求项目,60s后返回
curl "http://127.0.0.1:5003/sleep?duration=60s" & 
# 重启,17116为父进程pid
kill -1 17116
# 然后我们看一下 main 程序的进程应该有两个
ps -ef |grep ./main
root      17116  80539  0 04:19 pts/2    00:00:00 ./main
root      18110  17116  0 04:21 pts/2    00:00:00 ./main
# 可以看到子进程pid 为18110,我们看看该进程打开的文件
lsof  -P -p 18110
COMMAND   PID USER   FD      TYPE  DEVICE SIZE/OFF     NODE NAME
...
main    19073 root    0r      CHR     1,3      0t0     1028 /dev/null
main    19073 root    1u      CHR   136,2      0t0        5 /dev/pts/2
main    19073 root    2u      CHR   136,2      0t0        5 /dev/pts/2
main    19073 root    3u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
main    19073 root    4u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
# 新API请求
curl "http://127.0.0.1:5003/sleep?duration=1s" 

总结

通过上面的介绍,我们通过 endless 学习了在 Go 服务中如何做到不停机也可以重启服务,相信这个功能在很多场景下都会用到,没用到的同学也可以尝试在自己的系统上玩一下。

热重启总的来说它允许服务重启期间,不中断已经建立的连接,老服务进程不再接受新连接请求,新连接请求将在新服务进程中受理。对于原服务进程中已经建立的连接,也可以将其设为读关闭,等待平滑处理完连接上的请求及连接空闲后再行退出。

通过这种方式,可以保证已建立的连接不中断,新的服务进程也可以正常接受连接请求。

Reference

https://goteleport.com/blog/golang-ssh-bastion-graceful-restarts/

https://grisha.org/blog/2014/06/03/graceful-restart-in-golang/

https://stackoverflow.com/questions/28370646/how-do-i-fork-a-go-process/28371586#28371586

https://xixiliguo.github.io/post/golang-exec/

https://github.com/fvbock/endless

https://golang.org/pkg/os/signal/

https://stackoverflow.com/questions/11635219/dup2-dup-why-would-i-need-to-duplicate-a-file-descriptor

http://www.hitzhangjie.pro/blog/2020-08-28-go程序如何实现热重启/

扫码_搜索联合传播样式-白色版 1

这篇关于endless 如何实现不停机重启 Go 程序?的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!