go语言之路

Golang并发编程之并发下载

Golang并发编程之并发下载

参考链接

sync

Go语言提供了sync和channel两种方式支持协程(goroutine)的并发。

例如我们希望并发下载N个资源,多个并发协程之间不需要通信,那么就可以使用sync.WaitGroup

等待所有并发协程执行结束。

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func main() {
    for i := 0; i < 10; i++ {
        go wget(i)
    }
    wg.Wait()
}

func wget(url int) {
    wg.Add(1)
    defer wg.Done()
    fmt.Println("正在下载...", url)
    //模拟下载时间 sleep 1秒
    time.Sleep(time.Second)
}
  • wg.Add(1):为 wg 添加一个计数,wg.Done(),减去一个计数。

  • go wget():启动新的协程并发执行 download 函数。

  • wg.Wait():等待所有的协程执行结束。

    image-20210404115636728

串行下载需要10s的时间,而并发后只需要1s。

channel

通过channel发送信号

package main

import (
    "fmt"
    "time"
)

var ch = make(chan int, 1) //创建一个大小为1的缓冲通道

func main(){
    for i := 0; i < 10; i++ {
        go wget(i)
    }

    for i := 0; i < 10; i++ {
        //接收信号
        msg := <- ch
        fmt.Println("Finish", msg)
    }
}

func wget(url int) {
    fmt.Println("正在下载", url)
    time.Sleep(time.Second)
    //发送信号
    ch <- url
}

使用 channel 信道,可以在协程之间传递消息。阻塞等待并发协程返回消息。

image-20210404120907885

context

参考链接

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    for i := 0; i < 10; i++ {
        go wget(ctx, i)
    }
    time.Sleep(1 * time.Second)
    cancel()
}

func wget(ctx context.Context, url int) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            fmt.Println("正在下载:", url)
            time.Sleep(1 * time.Second)
        }
    }
}

image-20210408161028478

为什么需要context?

WaitGroup 和信道(channel) 是常见的两种并发控制的方式。

如果并发启动了多个子协程,需要等待所有的字协程完成任务,WaitGroup 非常适用于这类场景。

例如并发下载文件,wg.Wait() 会等待所有的子协程任务全部完成,所有子协程结束后,才会执行 wg.Wait() 后面的代码。

WaitGroup 只是傻傻地等待子协程结束,但是并不能主动通知子协程退出。

加入开启了一个定时轮询地子协程,有没有什么办法通知该子协程退出呢?

这种场景下,可以使用 select + chan 的机制。

package main

import (
    "fmt"
    "time"
)

var stop chan bool

func reqTask(name string) {
    for {
        select {
        case <-stop:
            fmt.Println("stop", name)
            return
        default:
            fmt.Println(name, "send request")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    stop = make(chan bool)
    go reqTask("worker1")
    time.Sleep(3 * time.Second)
    stop <- true
    time.Sleep(1 * time.Second)
}

image-20210408155524960

更复杂的场景如何做并发控制呢?比如子协程中开启了新的子协程,或者需要同时控制多个子协程。

这种场景下,select + chan 的方式就显得力不从心了。

Go提供了 Context 标准库可以解决这类场景的问题, Context 的作用和它的名字很像,上下文,即子协程的上下文。

Context 的两个主要的功能:

  • 通知子协程退出(正常退出,超时退出)
  • 传递必要的参数

context.WithCancel

用于创建可取消的 Context 对象,即可以主动通知子协程退出。

控制单个协程

使用 Context 改写上述的例子,效果与 selext + chan 相同。

package main

import (
    "context"
    "fmt"
    "time"
)

func reqTask(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop", name)
            return
        default:
            fmt.Println(name, "send request")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go reqTask(ctx, "worker1")
    time.Sleep(3 * time.Second)
    cancel()
}
  • context.Background() 创建根Context,通常在main函数,初始化和测试代码中创建,作为顶层Context
  • context.WithCancel(parent) 创建可取消的子Contedt,同时返回函数cancel
  • 在子协程中,使用select调用 <- ctx.Done() 判断是否需要退出
  • 主协程中,调用 cancel() 函数通知子协程退出

退出多个协程

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go reqTask(ctx, "worker1")
    go reqTask(ctx, "worker2")

    time.Sleep(3 * time.Second)
    cancel()
    time.Sleep(3 * time.Second)
}

为每个子协程传递相同的上下文ctx即可,调用cancel()函数后该Context控制的所有子协程都会退出。

image-20210408155419083

context.WithValue

如果需要往子协程中传递参数,可以使用 context.WithValue()

package main

import (
    "context"
    "fmt"
    "time"
)

type Options struct{ Interval time.Duration }

func reqTask(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop", name)
            return
        default:
            fmt.Println(name, "send request")
            op := ctx.Value("options").(*Options)
            time.Sleep(op.Interval * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    vCtx := context.WithValue(ctx, "options", &Options{1})

    go reqTask(vCtx, "worker1")
    go reqTask(vCtx, "worker2")

    time.Sleep(3 * time.Second)
    cancel()
    time.Sleep(3 * time.Second)
}
  • context.WithValue() 创建了一个基于 ctx 的子 Context,并携带了值 options
  • 在子协程中,使用 ctx.Value("options") 获取到传递的值,读取/修改该值。

context.WithTimeout

如果需要控制子协程的执行时间,可以使用context.WithTimeout创建具有超时机制的Context对象。

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    go reqTask(ctx, "worker1")
    go reqTask(ctx, "worker2")

    time.Sleep(3 * time.Second)
    fmt.Println("before cancel")
    cancel()
    time.Sleep(3 * time.Second)
}

WithTimeout()的使用与 WithCancel() 类似,多了一个参数,用于设置超时时间。执行结果如下:

image-20210408161922123

因为超时时间设置为 2s,但是 main 函数中,3s 后才会调用 cancel(),因此,在调用 cancel() 函数前,子协程因为超时已经退出了。

context.WithDeadline

超时退出可以控制子协程的最长执行时间,那context.WithDeadLine()则可以控制子协程的最迟退出时间。

package main

import (
    "context"
    "fmt"
    "time"
)

func reqTask(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop", name, ctx.Err())
            return
        default:
            fmt.Println(name, "send request")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
    go reqTask(ctx, "worker1")
    go reqTask(ctx, "worker2")

    time.Sleep(3 * time.Second)
    fmt.Println("before cancel")
    cancel()
    time.Sleep(3 * time.Second)
}
  • WithDeadline 用于设置截止时间。在这个例子中,将截止时间设置为1s后,cancel() 函数在 3s 后调用,因此子协程将在调用 cancel() 函数前结束。
  • 在子协程中,可以通过 ctx.Err() 获取到子协程退出的错误原因。

image-20210408162256765

可以看到,子协程 worker1worker2 均是因为截止时间到了而退出。

golang提取json

上一篇

如何给一个开源项目贡献?

下一篇

你也可能喜欢

3 条评论

  1. 可以补充下context,另一个协程间的控制方式

    1. @大g 好的,感谢!

发表评论

您的电子邮件地址不会被公开。 必填项已用 * 标注

提示:点击验证后方可评论!

插入图片

个人微信公众号

we-tuiguang

qq交流群

群号:1046260719

微信扫一扫

微信扫一扫