• 【深入理解Kotlin协程】协程中的Channel和Flow & 协程中的线程安全问题


    热数据通道 Channel

    在这里插入图片描述

    Channel 实际上就是 个并发安全的队列,它可以用来连接协程,实现不同协程的通信,代码如代码清单所示

    suspend fun testChannel() {
       
        val channel = Channel<Int>()  
        var i = 0
        //生产者 发
        val producer = GlobalScope.launch {
       
            while (true) {
       
                delay(1000)
                channel.send(i++)
            }
        }
        //消费者 收
        val consumer = GlobalScope.launch {
       
            while (true) {
       
                val value = channel.receive()
                println("received <<<<<<<<<<<<<<<<<< $value")
            }
        }
        producer.join()
        consumer.join()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    上述代码 构造了两个协程 producer 和 consumer, 没有为它们明确指定调度器,所以它们都是采用默认调度器,在 Java 平台上就是基于线程池实现的 Default。 它们可以运行在不同的线程上,也可以运行在同一个线程上,具体执行流程如图 6-2 所示。

    producer 每隔 1s 向 Channel 发送 1 个数,consumer 一直读取 channel 来获取这个数字并打印,显然发送端比接收端更慢,在没有值可以读到的时候, receive 是挂起的,直到有新元素到达。

    这么看来,receive 一定是一个挂起函数,那么 send 呢?

    你会发现 send 也是挂起函数。发送端为什么会挂起?以我们熟知的 BlockingQueue 为例,当我们往其中添加元素的时候,元素在队列里实际上是占用了空间的,如果这个队列空间不足,那么再往其中添加元素的时候就会出现两种情况:

    • 阻塞.等待队列腾出空间
    • 异常,拒绝添加元素。

    send 也会面临同样的问题, Channel 实际上就是个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有人调用 receive 并取走元素, send 就需要挂起,等待接收者取走数据之后再写入 Channel 。

    Channel缓冲区
    public fun <E> Channel(
        capacity: Int = RENDEZVOUS,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
        onUndeliveredElement: ((E) -> Unit)? = null
    ): Channel<E> =
        when (capacity) {
       
            RENDEZVOUS -> {
       
                if (onBufferOverflow == BufferOverflow.SUSPEND)
                    RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
                else
                    ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
            }
            CONFLATED -> {
       
                require(onBufferOverflow == BufferOverflow.SUSPEND) {
       
                    "CONFLATED capacity cannot be used with non-default onBufferOverflow"
                }
                ConflatedChannel(onUndeliveredElement)
            }
            UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
            BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
                if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
                onBufferOverflow, onUndeliveredElement
            )
            else -> {
       
                if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
                    ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
                else
                    ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    我们构造 Channel 的时候调用了一个名为 Channel 的函数,虽然两个 “Channel" 起来是 样的,但它却确实不是 Channel 的构造函数。在 Kotlin 中我们经常定义 一个顶级函数来伪装成同名类型的构造器,这本质上就是工厂函数。Channel 函数有一个参数叫 capacity, 该参数用于指定缓冲区的容量,RENDEZVOUS 默认值为 0,RENDEZVOUS 本意就是描述“不见不散"的场景, 如果不调用 receive, send 就会一直挂起等待。如果把上面代码中consumer的channel.receive()注释掉,则producer中send方法第一次调用就会挂起。

    • Channel(Channel.RENDEZVOUS ) 的方式是有人接收才会继续发,边收边发,如果没有接受的,则发送者会挂起等待
    • Channel(Channel.UNLIMITED ) 的方式是发送者发送完毕,就直接返回,不管有没有接受者。
    • Channel(Channel.CONFLATED ) 的方式是不管发送者发了多少个,接受者只能收到最后一个,也是发送完就返回了,不管有没有接受者。
    • Channel(Channel.BUFFERED ) 的方式也是发送者发送完就返回了,不管有没有接受者,可以指定buffer大小。
    • Channel(1) 的方式指定管道的容量大小,如果数据超过容量,发送者就会挂起等待,直到有接受者取走数据,发送者才发送下一批数据
    Channel的迭代

    Channel可以通过迭代器迭代访问:

    GlobalScope.launch {
       
        val iterator = channel.iterator()
        while (iterator.hasNext()) {
        // 挂起点
              println("received <<<<<<<<<<<<<<<<<< ${
         iterator.next()}")
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    其中,iterator.hasNext()挂起函数,在判断是否有下个元素的时候就需要去Channel 中读取元素了,这个写法自然可以简化成 for-in

    GlobalScope.launch {
       
        for (element in channel) {
        
              println("received <<<<<<<<<<<<<<<<<< $element")
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    生产者和消费者协程构造器

    我们可以通过 produce 方法启动一个生产者协程,并返回 ReceiveChannel,其他协程就可以用这个 Channel 来接收数据了。反过来,我们可以用 actor 启动一个消费者协程。

    suspend fun producer() {
       
        val receiveChannel = GlobalScope.produce {
       
            for (i in 0..3) {
       
                send(i)
                println("send --------------> $i")
            }
        }
    
        val consumer = GlobalScope.launch {
       
            for (i in receiveChannel) {
       
                println("received <<<<<<<<<<<<<<<< $i")
            }
        }
    
        consumer.join()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    suspend fun consumer() {
       
        val sendChannel = GlobalScope.actor<Int> {
       
            for (i in this) {
       
                println("received <<<<<<<<<<<<<<<< $i")
            }
        }
    
        val producer = GlobalScope.launch {
       
            for (i in 0..3) {
       
                sendChannel.send(i)
                println("send --------------> $i")
            }
        }
    
        producer.join()
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    使用这两种构造器也可以指定Channel对应的缓冲区类型,如:

    val receiveChannel = GlobalScope.produce(capacity = Channel.UNLIMITED) {
       
        for (i in 0..3) {
       
            send(i)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    ReceiveChannel SendChannel 都是 Channel 的父接口,前者定义了 receive, 后者定义了 send, Channel 也因此既可以使用 receive 又可以使用 send。
    通过 produce 和 actor 这两个协程构造器启动的协程也与返回的 Channel 自然地绑定到了一起,因此在协程结束时返回的 Channel 也会被立即关闭。

    produce 为例,它构造出了一个 ProducerCoroutine 对象,该对象也是 Job 的实现:

    private class ProducerCoroutine<E>(
        parentContext: CoroutineContext, channel: Channel<E>
    ) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {
       
        override val isActive: Boolean
            get() = super.isActive
    
        override fun onCompleted(value: Unit) {
       
            _channel.close() // 协程完成时关闭channel
        }
    
        override fun onCancelled(cause: Throwable, handled: Boolean) {
       
            val processed = _channel.close(cause) // 协程取消时关闭channel
            if (!processed && !handled) handleCoroutineException(context, cause)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    注意,在协程完成和取消的方法调用 中, 对应的_channel 都会被关闭。produc actor 这两个构造器看上去都很有用,不过目前前者仍被标记为 Experimental CoroutinesApi, 后者则被标记为 ObsoleteCoroutinesApi, 后续仍然可能会有较大的改动。

    Channel的关闭

    对千一个 Channel 如果我们调用了它的 close() 方法,它会立即停止接收新元素,也就是说这时候它的 isClosedForSend 会立即返回 true 而由于 Channel 缓冲区的存在, 这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true

    一说到关闭,我们很容易想到 I/0, 如果不关闭 1/0 可能会造成资源泄露。那么 Channel 关闭有什么意义呢?前面我们提到过,Channel 内部的资源其实就是个缓冲区,如果我们创建 Channel 而不去关闭它。虽然并不会造成系统资源的泄露,但却会让接收端一直处千挂起等待的状态,因此一定要在适当的时机关闭 Channel。

    究竟由谁来关闭Channel,需要根据业务场景由发送端和接受端之间进行协商决定。如果发送端关闭了Channel,接受端还在调用receive方法,会导致异常,这时就需要进行异常处理:

    suspend fun testChannel2() {
       
        val channel = Channel<Int>()
        //生产者 发
        val producer = GlobalScope.launch {
       
            for (i in 0..3) {
       
                println("sending --------------> $i")
                channel.send(i)
            }
            channel.close() // 发送端关闭channel
        }
        //消费者 收
        val consumer = GlobalScope.launch {
       
            try {
       
                while (true) {
       
                    val value = channel.receive()
    //                 val value = channel.receiveCatching() // 这个方法不会抛出异常
                    println("received <<<<<<<<<<<<<<<<<< $value")
                }
            } catch (e : ClosedReceiveChannelException) {
       
                println("catch ClosedReceiveChannelException: ${
         e.message}")
            }
        }
        producer.join()
        consumer.join()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    发送端关闭了Channel,接受端还在调用receive方法,会抛出ClosedReceiveChannelException异常,如果使用receiveCatching()遇到close时就不会抛出异常,但是会使用null作为返回结果。

    BroadcastChannel

    创建 broadcastCbannel 的方法与创建普通的 Channel 几乎没有区别:

    val broadcastChannel = broadcastChannel<Int>(5)
    
    • 1

    如果要订阅功能,那么只 要调用如下方法

    val receiveChannel = broadcastChannel.openSubscription()
    
    • 1

    这样我们就得到了一个 ReceiveChannel,如果想要想获取订阅的消息,只需要调用它的 receive 函数;如果想要取消订阅则调用 cancel 函数即可。

    我们来看一个比较完整的例子,本示例中我们在发送端发 0 1 2, 并启动 3个协程同时接收广播,相关代码如下所示。

    suspend fun broadcast() {
       
        //下面几种都可以创建一个BroadcastChannel
        //val broadcastChannel = BroadcastChannel(Channel.BUFFERED)
        //val broadcastChannel = Channel(Channel.BUFFERED).broadcast()
        val broadcastChannel = GlobalScope.broadcast {
       
            for (i in 0..2) {
       
                send(i)
            }
        }
        //启动3个子协程作为接受者,每个都能收到
        List(3) {
        index ->
            GlobalScope.launch {
       
                val receiveChannel = broadcastChannel.openSubscription() // 订阅
                for (i in receiveChannel) {
       
                    println("[#$index] received: $i")
                }
            }
        }.joinAll()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    除了直接创建以外,我们也可以用前面定义的普通 Channel 进行转换,代码如下所示。

    // 通过 Channel 实例直接创建广播
    val channel = Channel<Int>()
    val broadcastChannel = channel.broadcast()
    
    • 1
    • 2
    • 3
    Channel 版本的序列生成器
    // 使用channel模拟序列生成器
    val channel = GlobalScope.produce {
       
        println("A")
        send(1)
        println("B")
        send(2)
        println("Done")
    }
    for (item in channel) {
       
        println("get $item")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    冷数据流Flow

    Sequence中不能调用其他挂起函数,不能设置调度器,只能单线程中使用。而Flow可以支持:

    // 序列生成器中不能调用其他挂起函数
    sequence {
       
        (1..3).forEach {
       
            yield(it)
            delay(100) // ERROR
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    创建Flow
    val intFlow = flow {
       
       (1..3).forEach {
       
           emit(it)
           delay(100)
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Flow 也可以设定它运行时所使用的调度器:

    intFlow.flowDn(Dispatchers.IO)
    
    • 1

    通过 flowOn 设置的调度器只对它之前的操作有影响,因此这里意味着 intFlow 的构造逻辑会在 IO 调度器上执行。
    在这里插入图片描述

    最终读取 intFlow 需要调用 collect 函数, 这个函数也是一个挂起函数。我们启动一个协程来消费 intFlow, 代码如下所示

    suspend fun testFlows(){
       
        val dispatcher = Executors.newSingleThreadExecutor {
       
            Thread(it, "MyThread").also {
        it.isDaemon = true }
        }.asCoroutineDispatc
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    设计模式-04-原型模式
    刷题记录:牛客NC24263[USACO 2018 Feb G]Directory Traversal
    后端八股笔记-----mysql
    vue 基于vue-cli3 发布npm 插件
    buuctf crypto 【rsa2】解题记录
    外贸爬虫系统
    JAVA webservice配置xfire
    2022-06-25 jvm调优 1
    【历史上的今天】11 月 9 日:TensorFlow 问世;Mozilla Firefox 发布标准版;英特尔和微软分道扬镳
    docker上安装的Jenkins但没有vi
  • 原文地址:https://blog.csdn.net/lyabc123456/article/details/127873200