Golang-并发退出
在工作中可能要起多个 Goroutine 跑任务,有时候遇到问题需要中止所以的 Goroutine 暂停或者退出。
控制并发通常有三种方法
全局共享变量
channel 通信
Context 包
全局共享变量
这种模式最为简单,所有的 Goroutine 轮询变量,来决定要不要继续跑任务。
running := true
f:= func() {
for running {
fmt.Println("sub proc running")
time.Sleep(time.Second)
}
}
go f()
go f()
go f()
time.Sleep(time.Second)
running = false
fmt.Println("set running=false")
time.Sleep(time.Second)
fmt.Println("main proc exit")
channel 通信
一种更通用灵活的实现方式。下面是一种从外部中止所有goroutine的示例
import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func main() {
stop := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(stop <-chan bool) {
defer wg.Done()
consumer(stop)
}(stop)
}
waitForSignal()
close(stop)
fmt.Println("stopping all jobs")
wg.Wait()
}
func consumer(stop <-chan bool) {
for {
select {
case <-stop:
fmt.Println("exit sub goroutine")
return
default:
fmt.Println("running ...")
time.Sleep(20*time.Microsecond)
}
}
}
func waitForSignal() {
sigs := make(chan os.Signal)
signal.Notify(sigs, os.Interrupt)
signal.Notify(sigs, syscall.SIGTERM)
<-sigs
}
Context
Context的创建和调用关系是层层递进的,也就是我们通常所说的链式调用,类似数据结构里的树,从根节点开始,每一次调用就衍生一个叶子节点。
type favContextKey string
func main() {
wg := &sync.WaitGroup{}
values := []string{"https://www.baidu.com", "https://www.zhihu.com"}
ctx, cancelFunc := context.WithCancel(context.Background())
for _, value := range values {
wg.Add(1)
subCtx := context.WithValue(ctx, favContextKey("url"), value)
go reqUrl(subCtx, wg)
}
go func() {
time.Sleep(time.Second * 3)
cancelFunc()
}()
wg.Wait()
fmt.Println("exit main goroutine")
}
func reqUrl(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
url, _ := ctx.Value(favContextKey("url")).(string)
for {
select {
case <-ctx.Done():
fmt.Printf("stopping getting url %s\n", url)
return
default:
get, err := http.Get(url)
if get.StatusCode == http.StatusOK && err == nil {
all, _ := ioutil.ReadAll(get.Body)
subCtx := context.WithValue(ctx, favContextKey("resp"),fmt.Sprintf("%s%x",url, md5.Sum(all)))
wg.Add(1)
go showResp(subCtx,wg)
}
get.Body.Close()
time.Sleep(time.Second)
}
}
}
func showResp(ctx context.Context,wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("stop showing resp")
return
default:
fmt.Println("printing: ", ctx.Value(favContextKey("resp")))
time.Sleep(time.Second)
}
}
}
context 超时退出
func main() {
wg:= &sync.WaitGroup{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
wg.Add(1)
go cancelByContext(ctx, wg)
wg.Wait()
}
func cancelByContext(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("cancel goroutine by context:", ctx.Err())
return
default:
if hander(-1) != nil {
return
}
time.Sleep(time.Second)
}
}
}
func hander(x int) error {
fmt.Println("hander ", x)
if x > 0 {
return errors.New("x > 0")
}
return nil
}
panic
func main() {
wg := &sync.WaitGroup{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
go func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("ctx done")
return
default:
}
}
}(ctx, wg)
wg.Add(1)
go cancelByPanic(wg)
wg.Wait()
}
func cancelByPanic(wg *sync.WaitGroup) {
defer func() {
if err := recover(); err != nil {
fmt.Println("cancel goroutine by context: ", err)
}
}()
defer wg.Done()
for {
if hander(1) != nil {
return
}
time.Sleep(time.Second)
}
}
func hander(x int) error {
fmt.Println("hander ", x)
if x > 0 {
return errors.New("x > 0")
}
return nil
}
Last updated
Was this helpful?