operatorfun<T> Iterable<T>.plus(element: T): List<T> { if (thisis Collection) returnthis.plus(element) val result = ArrayList<T>() result.addAll(this) result.add(element) return result }
inlinefun<T, R> Iterable<T>.map( transform: (T) -> R ): List<R> { val size = if (thisis Collection<*>) this.size else10 val destination = ArrayList<R>(size) for (item inthis) destination.add(transform(item)) return destination }
而不是不可变集合:
1 2 3 4 5 6 7 8 9
// This is not how map is implemented inlinefun<T, R> Iterable<T>.map( transform: (T) -> R ): List<R> { var destination = listOf<R>() for (item inthis) destination += transform(item) return destination }
inlinefunrequestNewToken( hasToken: Boolean, crossinline onRefresh: () -> Unit, noinline onGenerate: () -> Unit ) { if (hasToken) { httpCall("get-token", onGenerate) // We must use // noinline to pass function as an argument to a // function that is not inlined } else { httpCall("refresh-token") { onRefresh() // We must use crossinline to // inline function in a context where // non-local return is not allowed onGenerate() } } }
val fibonacci: Sequence<BigDecimal> = sequence { var current = 1.toBigDecimal() var prev = 1.toBigDecimal() yield(prev) while (true) { yield(current) val temp = prev prev = current current += temp } }
// BAD SOLUTION, DO NOT USE COLLECTIONS FOR // POSSIBLY BIG FILES File("ChicagoCrimes.csv") .readLines() .drop(1) // Drop descriptions of the columns .mapNotNull { it.split(",").getOrNull(6) } // Find description .filter { "CANNABIS"in it } .count() .let(::println)
这段程序在我电脑上的运行结果是:
script
1
OutOfMemoryError.n> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
File("ChicagoCrimes.csv").useLines { lines -> // The type of `lines` is Sequence<String> lines.drop(1) // Drop descriptions of the columns .mapNotNull { it.split(",").getOrNull(6) } // Find description .filter { "CANNABIS"in it } .count() .let { println(it) } // 318185 }
@Suppress("UNCHECKED_CAST") funrunCallLoop(): R { while (true) { // Note: cont is set to null in DeepRecursiveScopeImpl.resumeWith when the whole computation completes val result = this.result val cont = this.cont ?: return (result as Result<R>).getOrThrow() // done -- final result // The order of comparison is important here for that case of rogue class with broken equals if (UNDEFINED_RESULT == result) { // call "function" with "value" using "cont" as completion val r = try { // This is block.startCoroutine(this, value, cont) function.startCoroutineUninterceptedOrReturn(this, value, cont) } catch (e: Throwable) { cont.resumeWithException(e) continue } // If the function returns without suspension -- calls its continuation immediately if (r !== COROUTINE_SUSPENDED) cont.resume(r as R) } else { // we returned from a crossFunctionCompletion trampoline -- call resume here this.result = UNDEFINED_RESULT // reset result back cont.resumeWith(result) } } }
$ jdk8-64/java -jar jol-cli.jar internals java.lang.Object # Running 64-bit HotSpot VM. # Using compressed oop with 3-bit shift. # Using compressed klass with 3-bit shift.
Instantiated the sample instance via default constructor.
java.lang.Object object internals: OFFSET SIZE TYPE DESCRIPTION VALUE 0 4 (object header) 05 00 00 00 # Mark word 4 4 (object header) 00 00 00 00 # Mark word 8 4 (object header) 00 10 00 00 # (not mark word) 12 4 (loss due to the next object alignment) Instance size: 16 bytes Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
funsimple(): Sequence<Int> = sequence { // sequence builder for (i in1..3) { Thread.sleep(1000) // pretend we are computing it yield(i) // yield next value } }
funsimple(): Flow<Int> = flow { // flow builder for (i in1..3) { delay(100) // pretend we are doing something useful here emit(i) // emit next value } }
funmain() = runBlocking<Unit> { // Launch a concurrent coroutine to check if the main thread is blocked launch { for (k in1..3) { println("I'm not blocked $k") delay(100) } } // Collect the flow simple().collect { value -> println(value) } }
通过flow,我们可以实现异步的数据流。
2、Flow是冷数据流
冷数据流:每次调用collect函数时都会触发执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
funsimple(): Flow<Int> = flow { println("Flow started") for (i in1..3) { delay(100) emit(i) } }
funmain() = runBlocking<Unit> { println("Calling simple function...") val flow = simple() println("Calling collect...") flow.collect { value -> println(value) } println("Calling collect again...") flow.collect { value -> println(value) } }
3、Flow的取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14
funsimple(): Flow<Int> = flow { for (i in1..3) { delay(100) println("Emitting $i") emit(i) } }
funmain() = runBlocking<Unit> { withTimeoutOrNull(250) { // Timeout after 250ms simple().collect { value -> println(value) } } println("Done") }
4、如何构建Flow?
flow{ ... }
flowOf
.asFlow()
5、Flow操作符
5.1 中间操作符
map
filter
transform
take
5.2 终止操作符
collect
toList / toSet
first / single
reduce / fold
5.3 buffer当流的处理速度慢于发射速度时,通过buffer提前缓存可以提高效率:
优化前:耗时1200ms
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
funsimple(): Flow<Int> = flow { for (i in1..3) { delay(100) // pretend we are asynchronously waiting 100 ms emit(i) // emit next value } }
funmain() = runBlocking<Unit> { val time = measureTimeMillis { simple().collect { value -> delay(300) // pretend we are processing it for 300 ms println(value) } } println("Collected in $time ms") }
优化后:耗时1000ms
1 2 3 4 5 6 7 8 9
val time = measureTimeMillis { simple() .buffer() // buffer emissions, don't wait .collect { value -> delay(300) // pretend we are processing it for 300 ms println(value) } } println("Collected in $time ms")
5.4 conflate当生产速度大于消费速度,忽略未来得及处理的值
1 2 3 4 5 6 7 8 9 10 11 12 13
val time = measureTimeMillis { simple() .conflate() // conflate emissions, don't process each one .collect { value -> delay(300) // pretend we are processing it for 300 ms println(value) } } println("Collected in $time ms") //输出: //1 //3 //Collected in 758 ms
5.5 collectLatest 如果消费者很慢,取消它,新值过来时再重新启动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
val time = measureTimeMillis { simple() .collectLatest { value -> // cancel & restart on the latest value println("Collecting $value") delay(300) // pretend we are processing it for 300 ms println("Done $value") } } println("Collected in $time ms") //output: //Collecting 1 //Collecting 2 //Collecting 3 //Done 3 //Collected in 741 ms
5.6 多个flow合并:zip / combine
使用zip操作符
1 2 3 4 5 6 7 8
val nums = (1..3).asFlow() // numbers 1..3 val strs = flowOf("one", "two", "three") // strings nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string .collect { println(it) } // collect and print //output: //1 -> one //2 -> two //3 -> three
当两个flow不同步时:
1 2 3 4 5 6 7 8 9 10 11 12
//速度不同步时,使用zip,到达同步点才输出 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms val startTime = System.currentTimeMillis() // remember the start time nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip" .collect { value -> // collect and print println("$value at ${System.currentTimeMillis() - startTime} ms from start") } //output: //1 -> one at 428 ms from start //2 -> two at 828 ms from start //3 -> three at 1230 ms from start
1 2 3 4 5 6 7 8 9 10 11 12 13 14
////速度不同步时,使用combine,有值到达即输出,无视同步 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms val startTime = System.currentTimeMillis() // remember the start time nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine" .collect { value -> // collect and print println("$value at ${System.currentTimeMillis() - startTime} ms from start") } //output: //1 -> one at 452 ms from start //2 -> one at 651 ms from start //2 -> two at 854 ms from start //3 -> two at 952 ms from start //3 -> three at 1256 ms from start