一架梯子,一头程序猿,仰望星空!

tunny Go协程池教程

Tunny是一个用于生成和管理goroutine池的Golang库,允许您使用同步API来限制来自任意数量的goroutine的工作。

Tunny是一个用于生成和管理goroutine池的Golang库,允许您使用同步API来限制来自任意数量的goroutine的工作。

当您的工作来自任意数量的异步源但并行处理能力有限时,固定的goroutine池非常有用。例如,在处理CPU密集型的HTTP请求作业时,您可以创建一个大小与CPU数量相匹配的池。

安装

go get github.com/Jeffail/tunny

或者,使用dep:

dep ensure -add github.com/Jeffail/tunny

使用

对于大多数情况下,您的繁重工作可以用一个简单的func()来表示,在这种情况下您可以使用NewFunc。让我们看看如何使用我们的HTTP请求到CPU计数的例子:

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: 使用payload进行一些CPU密集型操作

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
		}
		defer r.Body.Close()

		// 将这项工作导入我们的池中。此调用是同步的,并将阻塞直到作业完成。
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}

Tunny还支持超时。您可以将上面的Process调用替换为以下代码:

result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

您还可以使用请求的上下文(或任何其他上下文)来处理超时和截止时间。只需将Process调用替换为以下代码:

result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

修改池大小

可以使用SetSize(int)在任何时候更改Tunny池的大小。

pool.SetSize(10) // 10个goroutine
pool.SetSize(100) // 100个goroutine

即使其他goroutine仍在处理,这也是安全的。

带状态的Goroutine

有时,在Tunny池中的每个goroutine都需要自己的管理状态。在这种情况下,您应该实现tunny.Worker,其中包括终止、中断(如果一个作业超时并且不再需要)和阻塞下一个作业分配直到满足某个条件的调用。

在使用Worker类型创建池时,您需要提供一个构造函数来生成您的自定义实现:

pool := tunny.New(poolSize, func() Worker {
	// TODO: 在这里进行每个goroutine的状态分配。
	return newCustomWorker()
})

这样,在池大小改变时,Tunny可以清理创建和销毁Worker类型。

排序

积压的作业不能保证按顺序处理。由于当前通道和选择块的实现,积压的作业堆栈将被作为FIFO队列处理。然而,这种行为不是规范的一部分,不应依赖于它。


章节目录