【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
retryWhen控制重试,两个回调参数cause为发生的异常,attempt为当前重试下标,从0开始。
flow<Int> {
if (index < 2) {
index++
throw RuntimeException("runtime exception index $index")
}
emit(100)
}.retry(2).catch {
Log.e(TAG.TAG, "ex is $it")
}.collect {
Log.d(TAG.TAG, "retry(2) $it")
}
index = 0
flow<Int> {
if (index < 2) {
index++
throw RuntimeException("runtime exception index $index")
}
emit(100)
}.retry {
it is RuntimeException
}.catch {
Log.e(TAG.TAG, "ex is $it")
}.collect {
Log.d(TAG.TAG, "retry{} $it")
}
index = 0
flow<Int> {
if (index < 2) {
index++
throw RuntimeException("runtime exception index $index")
}
emit(100)
}.retryWhen { cause, attempt ->
Log.d(TAG.TAG, "cause is $cause,attempt is $attempt")
cause is RuntimeException
} .catch {
Log.e(TAG.TAG, "ex is $it")
}.collect {
Log.d(TAG.TAG, "retryWhen $it")
}
2022-08-02 10:26:55.301 4775-4801/edu.test.demo D/Test-TAG: retry(2) 100
2022-08-02 10:26:55.302 4775-4801/edu.test.demo D/Test-TAG: retry{} 100
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
2022-08-02 10:26:55.304 4775-4801/edu.test.demo D/Test-TAG: retryWhen 100
代码1
(1..10).asFlow().cancellable().catch {
Log.e(TAG.TAG,"ex is $it")
}.collect {
if (it == 5){
cancel()
}
Log.d(TAG.TAG, " (1..10).asFlow() cancellable $it")
}
代码2
flow {
repeat(10){
emit(it)
}
}.collect {
if (it == 5){
cancel()
}
Log.d(TAG.TAG, "flow{} cancellable $it")
}
日志1
2022-08-02 11:03:27.029 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 1
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 2
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 3
2022-08-02 11:03:27.030 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 4
2022-08-02 11:03:27.035 6421-6448/edu.test.demo D/Test-TAG: (1..10).asFlow() cancellable 5
日志2
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 0
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 1
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 2
2022-08-02 11:10:51.501 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 3
2022-08-02 11:10:51.502 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 4
2022-08-02 11:10:51.505 6561-6586/edu.test.demo D/Test-TAG: flow{} cancellable 5
/**
* Creates a flow that produces values from the range.
*/
public fun IntRange.asFlow(): Flow<Int> = flow {
forEach { value ->
emit(value)
}
}
不是unsafeFlow呀,直接也是flow呀,这里注意导入:
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
看出来了吧,是unsafeFlow…,到这里asFlow就解释的差不多了。
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
SafeFlow,是不是从名字也可以看出一些端倪,如果不清楚在继续往下看:
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T>{
......
}
看到了吧,CancellableFlow,可取消的flow,再看
/**
* Internal marker for flows that are [cancellable].
*/
internal interface CancellableFlow<out T> : Flow<T>
是不是更清楚了,flow{},生成的本身就是CancellableFlow,到此处flow{},也解释的差不多了。
public fun <T> Flow<T>.cancellable(): Flow<T> =
when (this) {
is CancellableFlow<*> -> this // Fast-path, already cancellable
else -> CancellableFlowImpl(this)
}
两种情况,一种本身就是CancellableFlow,直接返回自己,另外一种,则创建CancellableFlowImpl,再看看CancellableFlowImpl:
private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
flow.collect {
currentCoroutineContext().ensureActive()
collector.emit(it)
}
}
}
可以看到最后得出的也是CancellableFlow。
flow {
Log.d(TAG.TAG,"Default-emit current is ${Thread.currentThread().name}")
emit(10)
}.flowOn(Dispatchers.Default).collect {
Log.d(TAG.TAG,"Default-collect current is ${Thread.currentThread().name}")
}
flow {
Log.d(TAG.TAG,"Main-emit current is ${Thread.currentThread().name}")
emit(10)
}.flowOn(Dispatchers.Main).collect {
Log.d(TAG.TAG,"Main-collect current is ${Thread.currentThread().name}")
}
2022-08-02 11:44:26.978 7102-7128/edu.test.demo D/Test-TAG: Default-emit current is DefaultDispatcher-worker-1
2022-08-02 11:44:26.985 7102-7128/edu.test.demo D/Test-TAG: Default-collect current is DefaultDispatcher-worker-1
2022-08-02 11:44:27.115 7102-7102/edu.test.demo D/Test-TAG: Main-emit current is main
2022-08-02 11:44:27.118 7102-7128/edu.test.demo D/Test-TAG: Main-collect current is DefaultDispatcher-worker-1