Flow 具有异步挂起 suspend 响应式编程,可以使用挂起函数来异步生产和消费事件,Flow 的设计灵感也来源于响应式流以及其各种实现。
- suspend fun test1() {
- flow<Int> {
- (0..4).forEach {
- emit(it)//生产者发送数据
- }
- }.collect {
- println(it)
- }
- }
flow {} 函数创建了一个冷数据流 Flow ,通过 emit 来发射数据,然后通过 collect 函数来收集这些数据。但是因为 collect 是挂起函数,挂起函数的调用又必须在另一个挂起函数或者协程作用域中。此时就需要我们使用协程来执行。
- fun main() {
- runBlocking {
- test1()
- }
- }
- findViewById
(R.id.textView).setOnClickListener() { - lifecycleScope.launch {
- flow1()
- }
- }
-
- private suspend fun flow1() {
- flow<Int> {
- (0..4).forEach {
- Log.i("TAG", "flow:${currentCoroutineContext()}")
- emit(it)//生产者发送数据
- }
- }.collect {
- Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
- }
- }
lifecycleScope.launch 默认是主线程执行的,按照协程的执行原理,我们可以确定上面例子中所有的执行操作都是在主线程上:
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:0
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:1
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:2
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:3
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:4
当我们调用 flowOn切换线程时
- private suspend fun flow1() {
- flow<Int> {
- (0..2).forEach {
- Log.i("TAG", "flow:${currentCoroutineContext()}")
- emit(it)//生产者发送数据
- }
- }.flowOn(Dispatchers.IO)
- .collect {
- Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
- }
- }
可以看到 flow 代码块中的执行已经切换到另外一个线程执行。但是 collect 中的代码依然执行在主线程上。
输出:
- flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
- flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
- collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:0
- collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:1
- collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:2
增加map,看看
- private suspend fun flow1() {
- flow<Int> {
- (0..2).forEach {
- Log.i("TAG", "flow:${currentCoroutineContext()}")
- emit(it)//生产者发送数据
- }
- }.flowOn(Dispatchers.IO)
- .map {
- Log.i("TAG", "map:${currentCoroutineContext()}")
- it
- }
- .collect {
- Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
- }
- }
输出:
- flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
- flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
- map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
- collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:0
- map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
- collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:1
- map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
- collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:2
总结:
1、flowOn 可以将执行此流的上下文更改为指定的上下文。
2、flowOn可以进行组合使用。
3、flowOn只影响前面没有自己上下文的操作符。已经有上下文的操作符不受后面 flowOn影响。
4、不管 flowOn 如何切换线程, collect 始终是运行在调用它的协程调度器上。
1))onStart: 在上游流启动之前被调用
2)onEach:在上游流的每个值被下游发出之前调用。
3)onCompletion:在流程完成或者取消后调用,并将取消异常或失败作为操作的原因参数传递。
- private suspend fun flow2(){
- flow {
- Log.d("TAG","flow")
- emit(1)
- }.onStart {
- Log.d("TAG","onStart")
- }.onEach {
- Log.d("TAG","onEach")
- }.onCompletion {
- Log.d("TAG","onCompletion")
- }.collect {
- Log.d("TAG","collect")
- }
- }
输出:
onStart
flow
onEach
collect
onCompletion
- private suspend fun flow3() {
- flow {
- Log.d("TAG", "flow")
- emit(1)
- throw NullPointerException("空指针")
- }.onStart {
- Log.d("TAG", "onStart")
- }.onEach {
- Log.d("TAG", "onEach")
- }.catch {
- Log.e("TAG", "catch $it")
- }.onCompletion {
- Log.d("TAG", "onCompletion")
- }.collect {
- Log.d("TAG", "collect")
- }
- }
输出:
onStart
flow
onEach
collectonCompletion
catch java.lang.NullPointerException: 空指针
- private fun flow13() {
- var index = 0
- lifecycleScope.launch {
- flow {
- if (index < 2) {
- index++
- Log.e("TAG", "出现错误:$index")
- throw RuntimeException("runtime exception index $index")
- }
- emit(100)
- }.retry(2).catch {
- Log.e("TAG", "catch: $it")
- }.collect {
- Log.d("TAG", "collect $it")
- }
- }
- }
输出
出现错误:1
出现错误:2
collect 100
3)retryWhen
- private fun flow14() {
- var index = 0
- lifecycleScope.launch {
- flow {
- if (index < 2) {
- index++
- Log.e("TAG", "出现错误:$index")
- throw RuntimeException("runtime exception index $index")
- }
- emit(100)
- }.retryWhen{ cause, attempt ->
- Log.e("TAG","cause is $cause,attempt is $attempt")
- cause is RuntimeException
- }.catch {
- Log.e("TAG", "catch: $it")
- }.collect {
- Log.d("TAG", "collect $it")
- }
- }
- }
出现错误:1
cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
出现错误:2
cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
collect 100
- private fun flow4() {
- lifecycleScope.launch {
- (1..3).asFlow().transform {
- emit(it)
- emit("transform $it")
- }.collect {
- println("collect: $it")
- }
- }
- }
transfrom 操作符任意值任意此,其他转换操作符都是基于 transform 进行扩展。比如:可以在执行长时间运行的异步请求之前,发射一个字符串并跟踪这个响应。
输出:
collect: 1
collect: transform 1
collect: 2
collect: transform 2
collect: 3
collect: transform 3
数据转换操作符
- private fun flow5() {
- lifecycleScope.launch {
- flow {
- emit(1)
- }.map {
- Log.d("TAG", "第一次转换")
- it * 5
- }.map {
- Log.d("TAG", "第二次转换")
- "map $it"
- }.collect {
- Log.d("TAG", "最终转换后值:$it")
- }
- }
- }
输出:
第一次转换
第二次转换
最终转换后值:map 5
fliter 操作符主要是对数据进行一个过滤,返回仅包含与给定匹配的原始流的值的流。
fliter 还有很多同类型操作符,如:filterNot / filterIsInstance / filterNotNull
- private fun flow6() {
- lifecycleScope.launch {
- (1..3).asFlow().filter {
- it < 2
- }.collect {
- println("it:$it")
- }
- }
- }
输出:
it:1
zip 操作符用于组合两个流中的相关值,与 RxJava 中的 zip 功能一样;
- private fun flow7() {
- val flow1 = (1..3).asFlow()
- val flow2 = flowOf("one", "two", "three")
- lifecycleScope.launch {
- flow2.zip(flow1) { value1, value2 ->
- "$value1:$value2"
- }.collect {
- Log.d("TAG", "collect:$it")
- }
- }
- }
输出:
collect:one:1
collect:two:2
collect:three:3
take 操作符返回包含第一个计数元素的流,当发射次数大于等于 count 的值时,通过抛出异常来取消执行。
- private fun flow8() {
- lifecycleScope.launch {
- (1..3).asFlow().take(2)
- .collect {
- Log.d("TAG", "it:$it")
- }
- }
- }
输出:
it:1
it:2
takeWhile 操作符与 filter 类似,不过他是当遇到条件判断为 false 的时候,将会中断后续的操作。
- private fun flow9() {
- lifecycleScope.launch {
- flowOf(1, 1, 2, 3, 1, 4).map {
- delay(100)
- it
- }.takeWhile {
- it == 1
- }.collect {
- Log.d("TAG", "it:$it")
- }
- }
- }
输出:
it:1
it:1
drop 操作符与 take 相反,它是丢弃掉指定的 count 数量后执行后续的流。
- private fun flow10() {
- lifecycleScope.launch {
- (1..3).asFlow().drop(2)
- .collect {
- Log.d("TAG", "it:$it")
- }
- }
- }
输出:
it:3
collect 是最基础的末端操作符,基本上每一个例子当中我们都是使用 collect。
toList 操作符是将我们的流转换成一个List集合
- private fun flow11() {
- lifecycleScope.launch {
- val list = (1..5).asFlow().toList()
- Log.d("TAG", "toList:$list")
- }
- }
输出:
toList:[1, 2, 3, 4, 5]
- private fun flow12() {
- lifecycleScope.launch {
- val time = measureTimeMillis {
- (1..3).asFlow().map {
- delay(100)
- it
- }.buffer().collect {
- delay(300)
- Log.d("TAG", "it:$it")
- }
- }
- Log.d("TAG","collected in $time ms")
- }
- }
输出:
it:1
it:2
it:3
collected in 1060 ms
参考: