Golang的CSP并发模型


Go实现了两种并发形式,第一种是大家普遍认知的多线程共享内存,其实就是 Java或 C++等语言中的多线程开发;另外一种是Go语言特有的,也是Go语言推荐的 CSP(communicating sequential processes)并发模型。 CSP 并发模型是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。Go语言就是借用 CSP 并发模型的一些概念为之实现并发的,但是Go语言并没有完全实现了 CSP 并发模型的所有理论,仅仅是实现了 process 和 channel 这两个概念。 process 就是Go语言中的 goroutine,每个 goroutine 之间是通过 channel 通讯来实现数据共享。

Go的CSP并发模型

CSP并发模型不同于传统的多线程通过共享内存来通信,CSP讲究的是“以通信的方式来共享内存”。请记住下面这句话:

Do not communicate by sharing memory; instead, share memory by communicating.
“不要以共享内存的方式来通信,相反,要通过通信来共享内存。”

普通的线程并发模型,就是像Java、C++、或者Python,他们线程间通信都是通过共享内存的方式来进行的。非常典型的方式就是,在访问共享数据(例如数组、Map、或者某个结构体或对象)的时候,通过锁来访问,因此,在很多时候,衍生出一种方便操作的数据结构,叫做“线程安全的数据结构”。例如Java提供的包”java.util.concurrent”中的数据结构。Go中也实现了传统的线程并发模型,但为了更好地编写并发程序,从设计之初Go语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型,让开发人员专注于分解问题和组合方案,而且不用被线程管理和信号互斥这些烦琐的操作分散精力。因此CSP并发模型诞生了。Go的CSP并发模型,是通过goroutinechannel来实现的。

Channel

Golang中使用 CSP中 channel 这个概念。channel 是被单独创建并且可以在进程之间传递,它的通信模式类似于 boss-worker 模式的,一个实体通过将消息发送到channel 中,然后又监听这个 channel 的实体处理,两个实体之间是匿名的,这个就实现实体中间的解耦,其中 channel 是同步的一个消息被发送到 channel 中,最终是一定要被另外的实体消费掉的,在实现原理上其实是一个阻塞的消息队列。也是各个goroutine之间通信的”管道“,有点类似于Linux中的管道。

Goroutine

goroutine 是实际并发执行的实体,它底层是使用协程(coroutine)实现并发,coroutine是一种运行在用户态的用户线程,类似于 greenthread,go底层选择使用coroutine的出发点是因为,它具有以下特点:

  • 用户空间 避免了内核态和用户态的切换导致的成本
  • 可以由语言和框架层进行调度
  • 更小的栈空间允许创建大量的实例

可以看到第二条 用户空间线程的调度不是由操作系统来完成的,像在java 1.3中使用的greenthread的是由JVM统一调度的(后java已经改为内核线程),还有在ruby中的fiber(半协程) 是需要在重新中自己进行调度的,而goroutine是在golang层面提供了调度器,并且对网络IO库进行了封装,屏蔽了复杂的细节,对外提供统一的语法关键字支持,简化了并发程序编写的成本。

简单使用 - 生产者消费者模型

Go语言提倡使用通信的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。由于没有使用共享资源无需加锁,所以go语言可以轻松创建几十万并发,下面是一个使用Golang协程和使用Java共享内存方式,实现生产者消费者的例子

Golang实现


package main

import (
    "fmt"
    "math/rand"
)

func Consumer(ch <-chan int, result chan<- int) {
    sum := 0
    for i := 0; i < 5; i++ {
        sum += <-ch
    }

    result <- sum
}

func Producer(ch chan<- int) {
    var num int
    for i := 0; i < 5; i++ {
        rand.Seed(20)
        num = rand.Intn(100)
        ch <- num
    }
}

func main() {
    ch := make(chan int)
    result := make(chan int)
    go Producer(ch)
    go Consumer(ch, result)

    fmt.Printf("result: %d\n", <-result)
}

在生产者和消费者之间使用ch通道传递数据,使用reslut通道给主函数返回结果。注意观察Consumer函数和Producer函数的参数列表,这里通道参数的传递略有不同,指明了通道的方向,chan <- 代表我们可以向通道写数据,但是不能使用通道读数据,<- chan 正好相反,只能从通道中读取数据而不可以写入数据。可以看到,go语言实现并发非常简单,借用通道,可以在不同的协程之间方便的传输数据。

Java实现

/*
使用阻塞队列实现生产者消费者模型
 */

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


//资源类
class ShareData{
    private static final int MAX_CAPACITY = 10; //阻塞队列容量
    private static BlockingQueue<Integer> blockingQueue= new ArrayBlockingQueue<>(MAX_CAPACITY); //阻塞队列
    private  volatile boolean FLAG = true;
    private  AtomicInteger atomicInteger = new AtomicInteger();

    public void produce() throws InterruptedException {
        while (FLAG){
            boolean retvalue = blockingQueue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
            if (retvalue==true){
                System.out.println(Thread.currentThread().getName()+"\t 插入队列"+ atomicInteger.get()+"成功"+"资源队列大小= " + blockingQueue.size());
            }else {
                System.out.println(Thread.currentThread().getName()+"\t 插入队列"+ atomicInteger.get()+"失败"+"资源队列大小= " + blockingQueue.size());

            }
          TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"FLAG变为flase,生产停止");
    }

    public void consume() throws InterruptedException {
        Integer result = null;
        while (true){
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (null==result){
                System.out.println("超过两秒没有取道数据,消费者即将退出");
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t 消费"+ result+"成功"+"\t\t"+"资源队列大小= " + blockingQueue.size());
            Thread.sleep(1500);
        }

    }

    public void stop() {
        this.FLAG = false;
    }
}

public class ProducerConsumer {

    public static void main(String[] args) {
        ShareData v3 = new ShareData();
        new Thread(()->{
            try {
                v3.produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "AAA").start();

        new Thread(()->{
            try {
                v3.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "BBB").start();

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        v3.stop();
    }
}

通过对比可以看出来实现生产者消费者模型,Go语言语法上也更加清爽简洁


Author: 顺坚
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source 顺坚 !
评论
 Previous
Java和Golang的线程模型 Java和Golang的线程模型
最近再去看Golang的G-M-P线程模型时发现自己以前理解的不够清楚明白,于是再去仔细拜读了一下Golang线程模型相关的书籍,同时对比着Java的线程模型做了一下梳理,在此记录一下心得。要理解Golang的线程模型必须得从操作系统的线程
2021-03-09
Next 
gRpc框架入门 gRpc框架入门
在微服务这个时代,不论是传输还是内网调用,以及跨语言的传输,RPC都是不二的选择。说到RPC(Remote Process Communication,远程过程调用)就不得不说到进程间通信(Inter-process Communicati
2021-02-17
  TOC