• Kotlin数据流概览


    一 什么是数据流

    数据流以协程为基础构建,可提供多个值。从概念上来讲,数据流是可通过异步方式进行计算处理的一组数据序列

    数据流包含三个实体:

    • 提供方会生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据。
    • (可选)中介可以修改发送到数据流的值,或修正数据流本身。
    • 使用方则使用数据流中的值。

    二 创建数据流

    如需创建数据流,请使用数据流构建器 API。flow 构建器函数会创建一个新数据流,可使用 emit 函数手动将新值发送到数据流中。

    class NewsRemoteDataSource(
        private val newsApi: NewsApi,
        private val refreshIntervalMs: Long = 5000
    ) {
        val latestNews: Flow<List<ArticleHeadline>> = flow {
            while(true) {
                val latestNews = newsApi.fetchLatestNews()
                emit(latestNews) // Emits the result of the request to the flow
                delay(refreshIntervalMs) // Suspends the coroutine for some time
            }
        }
    }
    
    // Interface that provides a way to make network requests with suspend functions
    interface NewsApi {
        suspend fun fetchLatestNews(): List<ArticleHeadline>
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    flow 构建器在协程内执行。因此,它将受益于相同异步 API,但也存在一些限制:

    • 数据流是有序的。当协程内的提供方调用挂起函数时,提供方会挂起,直到挂起函数返回。在此示例中,提供方会挂起,直到 fetchLatestNews 网络请求完成为止。只有这样,请求结果才会发送到数据流中。
    • 使用 flow 构建器时,提供方不能提供来自不同 CoroutineContext 的 emit 值。因此,请勿通过创建新协程或使用 withContext 代码块,在不同 CoroutineContext 中调用 emit。在这些情况下,可使用其他数据流构建器,例如 callbackFlow。

    三 修改数据流

    中介可以利用中间运算符如map在不使用值的情况下修改数据流。这些运算符都是函数,可在应用于数据流时,设置一系列暂不执行的链式运算,留待将来使用值时执行。

    class NewsRepository(
        private val newsRemoteDataSource: NewsRemoteDataSource,
        private val userData: UserData
    ) {
        /**
         * Returns the favorite latest news applying transformations on the flow.
         * These operations are lazy and don't trigger the flow. They just transform
         * the current value emitted by the flow at that point in time.
         */
        val favoriteLatestNews: Flow<List<ArticleHeadline>> =
            newsRemoteDataSource.latestNews
                // Intermediate operation to filter the list of favorite topics
                .map { news -> news.filter { userData.isFavoriteTopic(it) } }
                // Intermediate operation to save the latest news in the cache
                .onEach { news -> saveInCache(news) }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    四 从数据流中进行收集

    使用终端运算符可触发数据流开始监听值。如需获取数据流中的所有发出值,请使用 collect

    class LatestNewsViewModel(
        private val newsRepository: NewsRepository
    ) : ViewModel() {
    
        init {
            viewModelScope.launch {
                // Trigger the flow and consume its elements using collect
                newsRepository.favoriteLatestNews.collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    数据流收集可能会由于以下原因而停止:

    • 如上例所示,协程收集被取消。此操作也会让底层提供方停止活动。
    • 提供方完成发出数据项。在这种情况下,数据流将关闭,调用 collect 的协程则继续执行。

    五 数据流捕获异常

    使用 catch 中间运算符

    class LatestNewsViewModel(
        private val newsRepository: NewsRepository
    ) : ViewModel() {
    
        init {
            viewModelScope.launch {
                newsRepository.favoriteLatestNews
                    // Intermediate catch operator. If an exception is thrown,
                    // catch and update the UI
                    .catch { exception -> notifyError(exception) }
                    .collect { favoriteNews ->
                        // Update View with the latest favorite news
                    }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    六 在不同 CoroutineContext 中执行

    flow 构建器的提供方会通过从中收集的协程的 CoroutineContext 执行,并且如前所述,它无法从不同 CoroutineContext 对值执行 emit 操作。如需更改数据流的 CoroutineContext,使用中间运算符 flowOn

    class NewsRepository(
        private val newsRemoteDataSource: NewsRemoteDataSource,
        private val userData: UserData,
        private val defaultDispatcher: CoroutineDispatcher
    ) {
        val favoriteLatestNews: Flow<List<ArticleHeadline>> =
            newsRemoteDataSource.latestNews
                .map { news -> // Executes on the default dispatcher
                    news.filter { userData.isFavoriteTopic(it) }
                }
                .onEach { news -> // Executes on the default dispatcher
                    saveInCache(news)
                }
                // flowOn affects the upstream flow ↑
                .flowOn(defaultDispatcher)
                // the downstream flow ↓ is not affected
                .catch { exception -> // Executes in the consumer's context
                    emit(lastCachedNews())
                }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    七 Jetpack 库中的数据流

    Flow with Room 接收有关数据库更改的通知

    @Dao
    abstract class ExampleDao {
        @Query("SELECT * FROM Example")
        abstract fun getExamples(): Flow<List<Example>>
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    八 将基于回调的 API 转换为数据流

    callbackFlow 是一个数据流构建器,允许您将基于回调的 API 转换为数据流。

    class FirestoreUserEventsDataSource(
        private val firestore: FirebaseFirestore
    ) {
        // Method to get user events from the Firestore database
        fun getUserEvents(): Flow<UserEvents> = callbackFlow {
    
            // Reference to use in Firestore
            var eventsCollection: CollectionReference? = null
            try {
                eventsCollection = FirebaseFirestore.getInstance()
                    .collection("collection")
                    .document("app")
            } catch (e: Throwable) {
                // If Firebase cannot be initialized, close the stream of data
                // flow consumers will stop collecting and the coroutine will resume
                close(e)
            }
    
            // Registers callback to firestore, which will be called on new events
            val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
                if (snapshot == null) { return@addSnapshotListener }
                // Sends events to the flow! Consumers will get the new events
                try {
                    offer(snapshot.getEvents())
                } catch (e: Throwable) {
                    // Event couldn't be sent to the flow
                }
            }
    
            // The callback inside awaitClose will be executed when the flow is
            // either closed or cancelled.
            // In this case, remove the callback from Firestore
            awaitClose { subscription?.remove() }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
  • 相关阅读:
    JSP JAVA javaweb企业仓库库存管理系统(仓库进销存管理系统ssm库存管理系统仓库管理系统)
    环状分组柱状图 Python
    基于51单片机的指纹考勤器
    Java 中的 void 和 Kotlin 的 Unit
    uboot启动参数详解和一些细节
    线性回归分析----学院综测成绩能显示出什么信息?
    如何获得jd商品分类API数据
    浏览器支持http-flv协议
    #每日一题合集#牛客JZ13-JZ22
    DETR训练自己数据集心得
  • 原文地址:https://blog.csdn.net/baopengjian/article/details/134035082