热点新闻
Flow使用笔记
2023-07-06 06:40  浏览:462  搜索引擎搜索“爱农网”
温馨提示:信息一旦丢失不一定找得到,请务必收藏信息以备急用!本站所有信息均是注册会员发布如遇到侵权请联系文章中的联系方式或客服删除!
联系我时,请说明是在爱农网看到的信息,谢谢。
展会发布 展会网站大全 报名观展合作 软文发布

Flow是什么

Flow用于表达多个连续的异步过程。

实现方式为使用协程封装成生产者消费者模式,上游流负责生产,下游流负责消耗。

Flow创建

创建

fun simpleFlow() = flow { for (i in 1..3) { delay(100) emit(i) } }

消费

fun createFlowTest() { runBlocking { simpleFlow().collect { Log.v(TAG, "收到数据111 = $it") } Log.v(TAG, "======") simpleFlow().collect { Log.v(TAG, "收到数据2222 = $it") } } }

以上输出为

收到数据111 = 1 收到数据111 = 2 收到数据111 = 3 ====== 收到数据2222 = 1 收到数据2222 = 2 收到数据2222 = 3

runBlocking { //flowOf构造器构造 flowOf(1, 2, 3) .onEach { delay(100) } .collect { Log.v(TAG, "flowOf 构造方法 = $it") } //asFlow 构造 (1..5).asFlow().onEach { delay(100) }.collect { Log.v(TAG, "asFlow 构造方法 = $it") } }

以上输出为

flowOf 构造方法 = 1 flowOf 构造方法 = 2 flowOf 构造方法 = 3 asFlow 构造方法 = 1 asFlow 构造方法 = 2 asFlow 构造方法 = 3 asFlow 构造方法 = 4 asFlow 构造方法 = 5

接收

Flow为冷流,冷流不会发射数据,只有到了收集(末端操作符)的时候,数据才开始生产并被发射出去。

collect()

上游流emit()之后,下游流在collect()中接收数据

中间操作符

filter() 过滤

map()转换

runBlocking { (1..10).asFlow().onEach { delay(100) }.filter { it % 2 == 0 }.map { it * 2 }.collect { Log.v(TAG, "中间操作符filter map 测试 = $it") } }

输出结果为

中间操作符filter map 测试 = 4 中间操作符filter map 测试 = 8 中间操作符filter map 测试 = 12 中间操作符filter map 测试 = 16 中间操作符filter map 测试 = 20

flowOn() 指定上游流是否使用子协程

runBlocking { flow { Log.v(TAG, "flow :${Thread.currentThread().name}") for (i in 1..5) { delay(100) emit(i) } }.flowOn(Dispatchers.Default) .collect { Log.v(TAG, "collect:${Thread.currentThread().name} $it") } }

输出结果为

flow :DefaultDispatcher-worker-1 collect:main 1 collect:main 2 collect:main 3 collect:main 4 collect:main 5

可以看出,只改变了发射的协程,没改变接收的

launchIn() 让flow下游使用全新的携程上下文

runBlocking { flow { Log.v(TAG, "flow :${Thread.currentThread().name}") for (i in 1..5) { delay(100) emit(i) } }.flowOn(Dispatchers.Default) .onEach { Log.v(TAG, "collect:${Thread.currentThread().name} $it") } .launchIn(CoroutineScope(Dispatchers.IO)) .join()//主线程等待这个协程执行结束 }

输出结果为

flow :DefaultDispatcher-worker-3 collect:DefaultDispatcher-worker-1 1 collect:DefaultDispatcher-worker-2 2 collect:DefaultDispatcher-worker-3 3 collect:DefaultDispatcher-worker-2 4 collect:DefaultDispatcher-worker-3 5

可以看出,上下游协程都发生了改变

背压 :

一般情况下,上下游执行同步

runBlocking { val flow = flow { for (i in 1..3) { delay(100) emit(i) } } val time = measureTimeMillis { flow .collect { delay(300) Log.v(TAG, "$it") } } Log.v(TAG, "time : $time ms") }

输出结果为 1223 ms,可以看出,上下游是同步执行的

buff() 上游流不等待下游流接收,而是发射到缓冲区

runBlocking { val flow = flow { for (i in 1..3) { delay(100) emit(i) } } val time = measureTimeMillis { flow .buffer(50)//指定缓存区大小为50个 .collect { delay(300) Log.v(TAG, "$it") } } Log.v(TAG, "time : $time ms") }

输出结果为time : 1078 ms,可以看出,当使用缓冲区时,上游的生产(除第一次生产外)的结果存放到缓冲区内

  • 还可以选择如下函数,来指定背压时的丢弃策略

conflate() 下游流来不及处理的,丢掉

collectLast() 只接收上游流发射的最后一个元素

take(n) 只取前n个

末端操作

collect(),收集,不赘述

flod() 、 reduce()将元素整合

runBlocking { val value = flow { for (i in 1..3) { emit(i) } }.map { it * it }.fold(0) { acc, value -> acc + value } Log.v(TAG, "value = $value") }

输出结果为 value = 14

runBlocking { val value = flow { for (i in 1..3) { emit(i) } }.reduce { accumulator, value -> accumulator + value } Log.v(TAG, "value = $value") }

输出结果为 value = 6

组合操作

zip()

runBlocking { val flow1 = flow { for (i in 1..3) { emit(i) } } val flow2 = flowOf("one", "two", "three") flow1.zip(flow2) { a, b -> "$a -> $b" }.collect { value -> Log.v(TAG, value) } }

输出结果为

1 -> one 2 -> two 3 -> three

如果两个flow长度不一致的情况,按照短的处理

展平操作符

flatMapConcat()

runBlocking { flow { for (i in 1..3) { emit(i) } }.flatMapConcat { flow { emit("first $it") emit("second $it") } }.collect { Log.v(TAG, "result = $it") } }

输出结果为

result = first 1 result = second 1 result = first 2 result = second 2 result = first 3 result = second 3

以上操作一般用于集合的集合之中

catch()

runBlocking { val flow = flow { for (i in 1..3) { emit(i) throw RuntimeException() } }.catch { e -> Log.v(TAG, "caught: $e") }.collect { println(it) } }

输出结果为 caught: java.lang.RuntimeException。可以看出,成功捕获了异常

下篇文章,从源码的角度看看flow是如何运行的

发布人:d1d1****    IP:125.64.05.***     举报/删稿
展会推荐
让朕来说2句
评论
收藏
点赞
转发