把任务队列 delayed 移植到 Go 了

标签:Go

4 年前我用 Python 写了个叫 delayed 的任务队列,经过几年发展后,公司有了 Go 和 Python 相互调用的需求。
目前的 delayed 实现是用 pickle 来做序列化的,其实之前也写过用 JSON 来做序列化的版本,但是遇到了几个问题:
  1. JSON 会丢失一些对象类型,例如无法区分 tuple 和 list。
  2. JSON 无法直接编码二进制字符串(bytes)。
  3. JSON 不支持原本 pickle 能支持的很多类型。
当时因为这些问题我就放弃了,但是现在看来,这些问题有的可以解决,有的可以容忍,所以还是决定开发 Go 的版本。

先看看序列化方案,主要从兼容性、易用性、速度和空间占用的角度来考虑。参考 go_serialization_benchmarks,性能最好的都是生成代码的。
但是生成代码意味着需要定义 schema,然后执行 go generate。但 Go 的 struct 实际也协商了 schema,有点重复工作的意思,而且这增加了开发的成本,有可能修改参数类型后忘记改 schema 和重新生成。
从调用方的角度来看,肯定是希望定义一个函数,然后直接就能用它比较好,而不是每定义一个函数,还需要维护一份 schema,并且记得生成代码。程序员的惰性决定了,一旦这件事是有成本的,就会尽量避免去做。因此我决定不采用生成代码的方案。
在不生成代码的方案中,MessagePack 是比较好的选择,比 JSON 短,可以传输二进制数据,且 Redis 的 Lua 脚本也内置了 cmsgpack。这样如果后续有什么特殊的需求,也可以直接用 Lua 脚本来处理任务,无需从 Redis 中取出,编辑完再塞回去。
经测试,shamaton/msgpackmsgpack-python 是其中性能较好的库。
而序列化 struct[]interface{} 的性能差不多,但前者的空间占用更多,主要是会当成 map 处理,字段名会占用空间,而后者则可能丢失类型(例如 int 序列化再反序列化后可能变成 uint8),导致接口参数类型不一致。

再看看函数绑定方案。
Go 不像 Python 那么动态,不能从路径字符串生成函数的引用,因此需要一个主动的注册过程。一种方案是像 HTTP 服务一样自定义路径,并与函数绑定;另一种是用 runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() 来获取函数路径。后者更方便些。

接着是函数调用方案,Go 的函数调用接口有主要三种方案:
  1. type Handler func(arg interface{}):这种方式会丢失参数类型,MessagePack 在反序列化时不知道真实的类型,只能转成 map[string]interface{},较难使用。
  2. type Handler func(ctx Context):需要把参数单独序列化,再调用 ctx 的各种 Bind() 接口,把参数转换成真正的函数参数类型,较为麻烦,类似大部分 HTTP 框架的 handler 实现。
  3. 任意 func 类型,使用反射来调用:
    func test(a string, b int) int {
            return len(a) * b
    }
    
    func main() {
            f := reflect.ValueOf(test)
            a := reflect.ValueOf("abc")
            b := reflect.ValueOf(5)
            fmt.Println(f.Call([]reflect.Value{a, b})[0].Int())
    }
    这种方案可以适配任意函数,非常易用。但性能较差,且因为 MessagePack 在序列化时可能丢失类型,参数 b 在反序列化时可能因为节省空间变成了 uint8(5),导致调用时 panic。
    我在看 rpcx 的实现时,发现它为了解决这个问题,同时存储了参数的类型,然后用反射生成对应类型的值:
    b := 5
    bt := reflect.TypeOf(b)
    b2 := reflect.New(bt)
    bp := b2.Interface().(*int)
    经测试,原生函数调用大概 0.25ns,加了类型断言大概 8ns,反射调用的性能则与参数和返回值的数量相关(上述函数约 280ns,但去掉返回值后降为 180ns,且减少了一次内存分配)。
    但是这种方案仍然不适合使用列表来存储参数,因为 []interface{}[]reflect.Value 是没有元素类型的,需要针对每个参数调用一次反序列化,实现非常低效。
    为了支持任意数目的参数,我又翻了翻 shamaton/msgpack 的代码,发现了一对隐藏的接口:msgpack.MarshalAsArray()msgpack.UnmarshalAsArray()。一般在序列化 struct 时,会当成 map 来处理;而这对方法则是当成数组来处理,即第 0 个字段作为第 0 个元素,以此类推。所以我可以动态地构建一个 struct,然后用 msgpack.UnmarshalAsArray() 反序列化成对应的 struct,此时 struct 的字段类型和函数接口是符合的,再构建对应的 []reflect.Value 作为参数即可:
    func x(a, b uint8) uint8 {
            return a + b
    }
    
    func main() {
            f := reflect.TypeOf(x)
            a0 := f.In(0)
            a1 := f.In(1)
            fields := []reflect.StructField{{
                    Name: "F0",  // 字段名必须是大写(即可导出的)才能被 msgpack.Marshal() 序列化
                    Type: a0,
            }, {
                    Name: "F1",
                    Type: a1,
            }}
            st := reflect.StructOf(fields)
            d, _ := msgpack.Marshal([]interface{}{2, 3})
            s := reflect.New(st)
            msgpack.UnmarshalAsArray(d, s.Interface())
            args := []reflect.Value{s.Elem().Field(0), s.Elem().Field(1)}
            fmt.Println(reflect.ValueOf(x).Call(args)[0])
    }
    这个实现因为增加了太多的反射,性能显而易见地慢。
    此外还需要解决可变参数的问题,reflect 库并没有提供检测的方法,...int[]int 类型的参数都会被判定为 []int,但是函数类型的字符串表示中会包含 ...,因此可以用 strings.Contains(f.String(), "...") 来判断。当判定为可变参数函数后,调用方法从 Call() 改成 CallSlice(),可变参数作为最后一个 reflect.Value 类型的参数传入即可。
考虑到任务处理时间 + Redis 调用时间至少是毫秒级的,即使采用反射来调用函数,也仅增加了万分之一的时间,是可以接受的。为了增加调用者的易用性,这里决定采用第三种实现方案。

然后看看子进程方案。
在 Python 中可以简单地使用 os.fork() 来调用 C 语言的 fork() 函数,而 Go 并没有直接提供这样的接口,无论是 os/exec.Cmd() 还是 syscall.ForkExec(),都是封装成了 fork - exec 的形式,不符合 delayed 的需求。
而 delayed 之所以需要 fork(),原因是可以控制进程的执行时间,在超时后可以强制干掉,而 goroutine 是不能在调用端结束的,需要在协程内主动退出。
于是我又找了一番,经测试发现可以这样实现:
func main() {
	r1, r2, err := syscall.Syscall(syscall.SYS_FORK, 0, 0, 0)
	// r1: 子进程 pid
	// r2:0 是父进程,1 是子进程
	if err != 0 {
		fmt.Println("Failed to fork:", err)
		return
	}
	fmt.Println(r1, r2)
}
需要注意的是,多线程程序使用 fork() 时,只有当前线程会被克隆,而 Go 的 runtime 和第三方库都可能有后台线程,这可能导致奇怪的问题。
另外还发现个 bug,子进程调用 os.Getpid() 会返回原进程(即父进程)的 pid,但 os.Getppid() 则是正常的。(某大佬回复说可能是因为缓存,避免每次都执行系统调用,而 syscall.Syscall(syscall.SYS_GETPID, 0, 0, 0) 是正常的。)

创建完子进程,还需要等待子进程退出,Go 提供了阻塞的方案:
proc, err := os.FindProcess(pid)
if err != nil {
        panic(err)
}

state, err := proc.Wait()
if err != nil {
        panic(err)
}
然而 Wait() 方法并不接受 WNOHANG 参数,这使得无法同时监听子进程退出和等待超时。
另一个可行方案是监听 SIGCHLD 信号:
func main() {
        rand.Seed(time.Now().UnixNano())
        r1, r2, _ := syscall.Syscall(syscall.SYS_FORK, 0, 0, 0)
        if r2 == 1 {
                fmt.Println("sleep")
                r := rand.Intn(1000)
                time.Sleep(time.Millisecond * time.Duration(r))
                fmt.Println("wake up")
                if r < 500 {
                        os.Exit(1)
                }
        } else {
                pid := int(r1)
                sigChan := make(chan os.Signal, 1)
                signal.Notify(sigChan, syscall.SIGCHLD)
                timer := time.NewTimer(time.Millisecond * time.Duration(rand.Intn(1000)))
                for {
                        select {
                        case <-sigChan:
                                proc, err := os.FindProcess(pid)
                                if err != nil {
                                        fmt.Println("Failed to find process:", err)
                                        return
                                }

                                err = proc.Signal(syscall.Signal(0))
                                if err != nil {
                                        fmt.Println("Failed to signal:", err)
                                        return
                                }

                                state, err := proc.Wait() // 结束僵尸进程
                                if err != nil {
                                        fmt.Println("Failed to wait:", err)
                                        return
                                }
                                fmt.Println(state)
                                return
                        case <-timer.C:
                                fmt.Println("timeout")
                                proc, err := os.FindProcess(pid)
                                if err != nil {
                                        fmt.Println("Failed to find process:", err)
                                        return
                                }

                                err = proc.Kill()
                                if err != nil {
                                        fmt.Println("Failed to signal:", err)
                                        return
                                }

                                proc.Release()
                        }
                }
        }
}
这个方案的 bug 是外部进程可以对它发出 SIGCHLD 信号,此时没有办法判断子进程是否真的已经退出,只能阻塞在 Wait()

最终我发现 Go 提供了 syscall.Wait4(),相当于 waitpid() 的超集:
for {
        select {
        case <-sigChan:
                var status syscall.WaitStatus
                p, err := syscall.Wait4(pid, &status, syscall.WNOHANG, nil)
                if err != nil {
                        fmt.Println("Failed to wait4:", err)
                        return
                }

                if p == pid && (status.Exited() || status.Signaled()) {
                        // p 为 0 时表示没有子进程退出
                        // 子进程退出时,status.Exited() 为 true,status.ExitStatus() 为返回值,被信号退出时不会返回 true
                        // status.Signaled() 表示是否是信号导致子进程退出,此时 status.Signal() 为接收到的信号
                        return
                }
        case <-timer.C:
                fmt.Println("timeout")
                syscall.Kill(pid, syscall.SIGKILL)
        }
}
注意只能用 SIGKILL 信号,Go 会在后台线程里处理 SIGTERMSIGQUIT 等信号,但是 fork() 时并不会复制后台线程,导致收到信号时崩溃。

我们还需要在管理进程和执行进程间传递任务和执行结果,这种进程间通信使用 pipe 是比较通用的做法。Go 提供了 os.Pipe()io.Pipe() 两种方式创建 pipe,其中后者的内部实现使用了 channel,但是并没有暴露出来,因此都只有同步的 Read()Write() 接口。
为了与上述的 select 组合使用,必须起一个新的协程,把读出来的数据写到一个 channel 里,让 select 去选择,这种实现有点丑。

综上所述,想把 Python 模型的子进程版本移植到 Go 充满了异味,且引入了太多复杂性和不稳定性(第三方库可能起了后台线程导致 fork() 后崩溃),而子进程唯一的作用是任务执行超时可以被父进程干掉。
因此现阶段先不实现该模型,有需要时也可以用一个外部进程来检测日志,或用 Redis 来存储任务的取出时间,分析是否需要干掉。

既然去掉了超时自动干掉的功能,那么任务的超时时间也需要去掉,但是因为 sweeper 依赖它来确定任务是否需要被重新放回队列,因此无法简单地删除这个字段。
其实 sweeper 只需要在回收任务时检查被取出的任务当前是否有 worker 在执行,没有就把它放回去即可。
因此需要维护一个正在执行的任务表,先让 worker 在启动时随机生成一个字符串作为 worker_id,然后在运行时起一个后台线程,每隔一段时间(假设 15 秒)执行 SETEX worker_id 60 1
假设队列名是 default,使用如下的几个 key:
  • default:list 类型,存储任务列表
  • default_noti:list 类型,用于通知有新任务
  • default_processing:hash 类型,field 为 worker_id,value 为正在处理的任务
在添加任务时,同时往 defaultdefault_noti 插入数据。
在拉取任务时,用 BLPOP 来监听 default_noti,一旦获取到数据,就执行这段 Lua 脚本:从 defaultLPOP 一个任务,然后 HSET default_processing worker_id task_data
在回收任务时,sweeper 遍历 default_processing 中的数据,然后检查该 worker 是否仍然存活(GET worker_id),不存活时就放回任务队列。
这里用两个 list 来处理的原因是避免 BLPOP 了任务后,进程挂了导致任务丢失。因为使用了 Lua 脚本,任务数据肯定在 defaultdefault_processing 中,顶多只会丢掉一个通知,而 sweeper 是可以把通知补上的。
这样优化后,去掉了之前使用的 sorted set,插入和取出任务的速度从 O(log(N)) 变成了 O(1),但是与 Python 版不兼容,因此需要写两套 Lua 脚本,或者把 Python 版也改成同样的实现。既然是个新版本,这里选择修改 Python 版本的实现。

主要的问题解决后,再来看看实现时的一些优化。

对于任务类型,我希望它的字段是私有的,这样就不需要考虑序列化和反序列化时字段被外部修改过,但 shamaton/msgpack 在序列化 struct 时,又要求字段是可导出的才能被序列化。为此我嵌套了一个 struct 来实现:
type RawGoTask struct {
	FuncPath string
	Payload  []byte // serialized arg
}

type GoTask struct {
	raw  RawGoTask // make it unexported but can be serialized by MessagePack
	arg  interface{}
	data []byte // serialized data
}

而在序列化任务时,需要序列化两次,先把 GoTask.arg 序列化成 GoTask.raw.Payload,再把 GoTask.raw 序列化成 GoTask.data
func (t *GoTask) Serialize() (data []byte, err error) {
	if len(t.data) != 0 {
		return t.data, nil
	}

	if t.arg != nil {
		t.raw.Payload, err = msgpack.MarshalAsArray(t.arg)
		if err != nil {
			log.Errorf("Failed to serialize task.arg: %v", err)
			return
		}
	}

	t.data, err = msgpack.MarshalAsArray(&t.raw)
	if err != nil {
		log.Errorf("Failed to serialize task.data: %v", err)
		return
	}
	return t.data, nil
}

func DeserializeGoTask(data []byte) (task *GoTask, err error) {
	t := &GoTask{
		data: data,
	}
	err = msgpack.UnmarshalAsArray(data, &t.raw)
	if err != nil {
		log.Errorf("Failed to deserialize task: %v", err)
		return
	}
	return t, nil
}
这里的原因是我希望任务的参数 GoTask.arg 可以接受任意类型,因此类型需要定义成 interface{}
但是如何能让它被反序列化成符合函数接口的类型呢?
如果我将 GoTask.raw.FuncPathGoTask.arg 一次性序列化到 GoTask.data,那么在反序列化时,我无法知道 GoTask.arg 的实际类型;而分开序列化后,我在第一次反序列化时可以拿到 GoTask.raw.FuncPath,就可以找到对应的函数,并拿到它的参数类型,然后再根据这个类型来反序列化 GoTask.arg

而在封装处理函数时,还需要解决前面提到的慢的问题,我的做法是提前构造:
type Handler struct {
	fn         reflect.Value
	path       string
	argCount   int
	arg        interface{}
	args       []reflect.Value
	isVariadic bool
}

func (h *Handler) Call(payload []byte) (result []reflect.Value, err error) {
	if h.argCount > 0 && len(payload) > 0 {
		err := msgpack.UnmarshalAsArray(payload, h.arg)
		if err != nil {
			log.Errorf("Failed to unmarshal payload: %v", err)
			return nil, err
		}
	}
	if h.isVariadic {
		return h.fn.CallSlice(h.args), nil
	}
	return h.fn.Call(h.args), nil
}
可以看到,*Handler.Call() 在执行时,中间用到的所有值都提前分配到 Handler 的 struct 里了,除了 msgpack.UnmarshalAsArray() 以外,不会有内存分配和反射的开销。而我们的执行模式是一次取一个任务,执行完再处理下一个,因此不会遇到有两个任务同时使用同一个 Handler 的并发问题。

那么如何提前构造好这些参数呢?这就需要用到 NewHandler() 了:
func NewHandler(f interface{}) (h *Handler) {
	fn := reflect.ValueOf(f)
	if fn.Kind() != reflect.Func {
		return nil
	}

	path := runtime.FuncForPC(fn.Pointer()).Name()
	if path == "" {
		return nil
	}

	fnType := fn.Type()
	h = &Handler{
		fn:       fn,
		path:     path,
		argCount: fnType.NumIn(),
	}

	// the rest fields can be reused among tasks, because the worker won't handle tasks concurrently
	if h.argCount == 0 {
		h.args = []reflect.Value{}
	} else {
		h.isVariadic = strings.Contains(fnType.String(), "...")
		if h.argCount == 1 {
			argType := fnType.In(0)
			arg := reflect.New(argType)
			h.arg = arg.Interface()
			h.args = []reflect.Value{arg.Elem()}
		} else {
			fields := make([]reflect.StructField, h.argCount)
			for i := 0; i < h.argCount; i++ {
				arg := fnType.In(i)
				fields[i] = reflect.StructField{
					Name: "F" + strconv.Itoa(i),
					Type: arg,
				}
			}
			argType := reflect.StructOf(fields)
			arg := reflect.New(argType)
			argElem := arg.Elem()
			h.arg = arg.Interface()
			h.args = make([]reflect.Value, h.argCount)
			for i := 0; i < h.argCount; i++ {
				h.args[i] = argElem.Field(i)
			}
		}
	}
	return
}
这里有 3 个分支:
  • 对于无参数的函数,构造一个空的 []reflect.Value 作为参数列表即可。
  • 对于 1 个参数的函数,假设参数类型是 intarg 是这个参数类型的指针的反射值,即指向 *intreflect.Valueh.argarg 的实际值,即 *int 类型;arg.Elem()arg 内部的 *int 所指向的 int 的反射值;h.args 则是只包含一个 int 反射值的 []reflect.Value,可以直接作为 h.fn.Call() 的参数。
  • 对于 2 个或更多参数的函数,假设参数类型是 intstring;先构造一个 struct,假设类型名叫 Args,2 个字段分别是 intstring 类型;然后创建一个 *Args 类型的反射值作为 argargElemarg 所指向的 Args 变量的反射值;h.arg*Args 类型;h.argsh.argCount 长度的 []reflect.Value,里面的每个元素都是 argElem 的字段。这样在执行 msgpack.UnmarshalAsArray(payload, h.arg) 时,h.arg 因为是 *Args 类型,可以正确地处理类型;h.args 里的各个元素也指向了 h.arg 的各个字段,所以类型也是正确的。
这里主要利用的点是 reflect.Value 内部是用 unsafe.Pointer 实现的,因此它也相当于是一个指针。当 h.arg 这个指针和 h.args 这个指针数组指向同一个 struct 时,修改前一个的值,后一个的值也会跟着变化,没有额外开销。

其他的部分比较好懂就不解释了,大家可以直接看 go-delayed 源码。

0条评论 你不来一发么↓

    想说点什么呢?