funsimple(): Sequence<Int> = sequence { // sequence builder for (i in1..3) { Thread.sleep(1000) // pretend we are computing it yield(i) // yield next value } }
funsimple(): Flow<Int> = flow { // flow builder for (i in1..3) { delay(100) // pretend we are doing something useful here emit(i) // emit next value } }
funmain() = runBlocking<Unit> { // Launch a concurrent coroutine to check if the main thread is blocked launch { for (k in1..3) { println("I'm not blocked $k") delay(100) } } // Collect the flow simple().collect { value -> println(value) } }
通过flow,我们可以实现异步的数据流。
2、Flow是冷数据流
冷数据流:每次调用collect函数时都会触发执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
funsimple(): Flow<Int> = flow { println("Flow started") for (i in1..3) { delay(100) emit(i) } }
funmain() = runBlocking<Unit> { println("Calling simple function...") val flow = simple() println("Calling collect...") flow.collect { value -> println(value) } println("Calling collect again...") flow.collect { value -> println(value) } }
3、Flow的取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14
funsimple(): Flow<Int> = flow { for (i in1..3) { delay(100) println("Emitting $i") emit(i) } }
funmain() = runBlocking<Unit> { withTimeoutOrNull(250) { // Timeout after 250ms simple().collect { value -> println(value) } } println("Done") }
funsimple(): Flow<Int> = flow { for (i in1..3) { delay(100) // pretend we are asynchronously waiting 100 ms emit(i) // emit next value } }
funmain() = runBlocking<Unit> { val time = measureTimeMillis { simple().collect { value -> delay(300) // pretend we are processing it for 300 ms println(value) } } println("Collected in $time ms") }
优化后:耗时1000ms
1 2 3 4 5 6 7 8 9
val time = measureTimeMillis { simple() .buffer() // buffer emissions, don't wait .collect { value -> delay(300) // pretend we are processing it for 300 ms println(value) } } println("Collected in $time ms")
5.4 conflate当生产速度大于消费速度,忽略未来得及处理的值
1 2 3 4 5 6 7 8 9 10 11 12 13
val time = measureTimeMillis { simple() .conflate() // conflate emissions, don't process each one .collect { value -> delay(300) // pretend we are processing it for 300 ms println(value) } } println("Collected in $time ms") //输出: //1 //3 //Collected in 758 ms
5.5 collectLatest 如果消费者很慢,取消它,新值过来时再重新启动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
val time = measureTimeMillis { simple() .collectLatest { value -> // cancel & restart on the latest value println("Collecting $value") delay(300) // pretend we are processing it for 300 ms println("Done $value") } } println("Collected in $time ms") //output: //Collecting 1 //Collecting 2 //Collecting 3 //Done 3 //Collected in 741 ms
5.6 多个flow合并:zip / combine
使用zip操作符
1 2 3 4 5 6 7 8
val nums = (1..3).asFlow() // numbers 1..3 val strs = flowOf("one", "two", "three") // strings nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string .collect { println(it) } // collect and print //output: //1 -> one //2 -> two //3 -> three
当两个flow不同步时:
1 2 3 4 5 6 7 8 9 10 11 12
//速度不同步时,使用zip,到达同步点才输出 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms val startTime = System.currentTimeMillis() // remember the start time nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip" .collect { value -> // collect and print println("$value at ${System.currentTimeMillis() - startTime} ms from start") } //output: //1 -> one at 428 ms from start //2 -> two at 828 ms from start //3 -> three at 1230 ms from start
1 2 3 4 5 6 7 8 9 10 11 12 13 14
////速度不同步时,使用combine,有值到达即输出,无视同步 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms val startTime = System.currentTimeMillis() // remember the start time nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine" .collect { value -> // collect and print println("$value at ${System.currentTimeMillis() - startTime} ms from start") } //output: //1 -> one at 452 ms from start //2 -> one at 651 ms from start //2 -> two at 854 ms from start //3 -> two at 952 ms from start //3 -> three at 1256 ms from start