Cafedev, ngôi nhà chia sẻ kiến thức và đam mê lập trình, hôm nay mở đầu cho chủ đề “Kotlin với Asynchronous Flow”. Trong không gian này, chúng ta sẽ khám phá sức mạnh của ngôn ngữ lập trình Kotlin khi kết hợp với Asynchronous Flow, mở ra những khả năng mới đầy hứa hẹn. Tại Cafedev, chúng ta không chỉ chia sẻ kiến thức, mà còn tạo nên cộng đồng sôi động, nơi mà niềm đam mê lập trình được truyền đạt qua từng dòng code. Hãy cùng nhau bắt đầu hành trình này tại Cafedev!

Một hàm treo (suspending function) trả về bất đồng bộ một giá trị duy nhất, nhưng làm thế nào để trả về nhiều giá trị được tính toán bất đồng bộ? Đây là nơi mà Kotlin Flow xuất hiện.

1. Đại diện cho nhiều giá trị

Nhiều giá trị có thể được đại diện trong Kotlin bằng cách sử dụng [collections]. Ví dụ, chúng ta có thể có một hàm simple trả về một [List] bao gồm ba số và sau đó in chúng bằng cách sử dụng [forEach]:

fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> println(value) } 
}

Output:

1
2
3

1.1 Sequences

Nếu chúng ta tính toán các số với một đoạn mã chặn tài nguyên CPU (mỗi tính toán mất 100ms), sau đó chúng ta có thể đại diện cho các số bằng một [Sequence]:

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

Bạn có thể lấy toàn bộ mã nguồn tại đây.


Mã này xuất ra các số giống nhau, nhưng nó đợi 100ms trước khi in mỗi số.

1.2 Hàm treo (Suspending functions)

Tuy nhiên, tính toán này chặn luồng chính đang chạy mã. Khi giá trị được tính toán bằng mã bất đồng bộ, chúng ta có thể đánh dấu hàm simple với từ khóa suspend, để nó có thể thực hiện công việc mà không chặn và trả kết quả dưới dạng một danh sách:

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

Bạn có thể lấy toàn bộ mã nguồn tại đây.


Mã này in các số sau khi đợi một giây.

1.3 Flows

Sử dụng kiểu kết quả List có nghĩa là chúng ta chỉ có thể trả tất cả các giá trị cùng một lúc. Để đại diện cho dãy giá trị đang được tính toán bất đồng bộ, chúng ta có thể sử dụng kiểu [Flow][Flow] giống như chúng ta sử dụng kiểu Sequence cho giá trị được tính toán đồng bộ:

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}

Bạn có thể lấy toàn bộ mã nguồn tại đây.


Đoạn mã này đợi 100ms trước khi in mỗi số mà không chặn luồng chính. Điều này được xác minh bằng cách in “I’m not blocked” mỗi 100ms từ một coroutine riêng chạy trong luồng chính:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

Lưu ý những khác biệt sau trong mã với [Flow] từ các ví dụ trước:

  • Một hàm xây dựng kiểu [Flow] được gọi là [flow][_flow].
  • Mã bên trong khối xây dựng flow { ... } có thể tạm dừng.
  • Hàm simple không còn được đánh dấu bằng từ khóa suspend.
  • Giá trị được phát ra từ flow bằng cách sử dụng hàm [emit][FlowCollector.emit].
  • Giá trị được thu thập từ flow bằng cách sử dụng hàm [collect][collect].

Chúng ta có thể thay thế [delay] bằng Thread.sleep trong phần thân của flow { ... } của simple và thấy rằng luồng chính bị chặn trong trường hợp này.

2. Flows là các luồng cố định

Flows là các luồng lạnh tương tự như các dãy — mã bên trong khối xây dựng [flow] không chạy cho đến khi flow được thu thập. Điều này trở nên rõ ràng trong ví dụ sau:

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

Bạn có thể lấy mã đầy đủ từ đây.


In ra:

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

Điều này là một lý do chính tại sao hàm simple (trả về một flow) không được đánh dấu bằng từ khóa suspend. Cuộc gọi simple() chính nó trả về nhanh chóng và không đợi đến bất cứ điều gì. Flow bắt đầu lại từ đầu mỗi khi nó được thu thập, đó là lý do tại sao chúng ta thấy “Flow started” mỗi khi gọi collect lại.

3. Cơ bản về huỷ một Flow

Flows tuân theo quy tắc chung về hủy hợp tác của các coroutine. Như thường lệ, thu thập flow có thể bị hủy khi flow tạm ngưng trong một hàm tạm ngưng có thể hủy (như [delay]). Ví dụ sau cho thấy cách flow bị hủy sau một khoảng thời gian khi chạy trong một khối [withTimeoutOrNull] và dừng thực thi mã của nó:

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

Bạn có thể lấy mã đầy đủ từ đây.


Lưu ý cách chỉ có hai số được phát ra bởi flow trong hàm simple, tạo ra đầu ra như sau:

Emitting 1
1
Emitting 2
2
Done

Xem phần Kiểm tra hủy Flow để biết thêm chi tiết.

4. Các công cụ xây dựng Flow

Công cụ xây dựng flow { ... } từ các ví dụ trước đó là công cụ cơ bản nhất. Có các công cụ xây dựng khác cho phép khai báo flows:


* Công cụ xây dựng [flowOf] định nghĩa một flow phát ra một bộ giá trị cố định.
* Các bộ sưu tập và dãy số khác nhau có thể được chuyển đổi thành flows bằng cách sử dụng hàm mở rộng .asFlow().


Ví dụ, đoạn mã in ra các số từ 1 đến 3 từ một flow có thể được viết lại như sau:

// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }

Bạn có thể lấy mã đầy đủ từ đây.

5. Toán tử trung gian của flow

Flows có thể được biến đổi bằng cách sử dụng các toán tử, giống như bạn sẽ biến đổi các bộ sưu tập và dãy số. Các toán tử trung gian được áp dụng cho một flow upstream và trả về một flow downstream. Các toán tử này là lạnh, giống như flows. Một cuộc gọi đến một toán tử như vậy không phải là một hàm tạm ngưng. Nó hoạt động nhanh chóng, trả về định nghĩa của một flow mới đã được biến đổi.


Các toán tử cơ bản mang các tên quen thuộc như [map] và [filter]. Một sự khác biệt quan trọng của những toán tử này so với các dãy số là các khối mã bên trong chúng có thể gọi các hàm tạm ngưng.


Ví dụ, một flow của các yêu cầu đến có thể được ánh xạ sang kết quả của nó với toán tử [map], ngay cả khi thực hiện một yêu cầu là một hoạt động kéo dài được triển khai bởi một hàm tạm ngưng:

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

Bạn có thể lấy mã đầy đủ từ đây.


Nó tạo ra ba dòng sau đây, mỗi dòng xuất hiện sau một giây:

response 1
response 2
response 3

5.1 Toán tử biến đổi

Trong số các toán tử biến đổi của flow, toán tử tổng quát nhất được gọi là [transform]. Nó có thể được sử dụng để mô phỏng các biến đổi đơn giản như [map] và [filter], cũng như triển khai các biến đổi phức tạp hơn. Bằng cách sử dụng toán tử transform, chúng ta có thể [phát ra][FlowCollector.emit] các giá trị tùy ý một số lần tùy ý.


Ví dụ, bằng cách sử dụng transform, chúng ta có thể phát ra một chuỗi trước khi thực hiện một yêu cầu không đồng bộ kéo dài và theo sau đó là một phản hồi:

1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }

Bạn có thể lấy mã đầy đủ từ đây.


Đầu ra của đoạn mã này là:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

5.2 Toán tử giới hạn kích thước

Các toán tử trung gian giới hạn kích thước như [take] hủy thực thi flow khi giới hạn tương ứng được đạt đến. Hủy bỏ trong các coroutine luôn được thực hiện bằng cách ném một ngoại lệ, để tất cả các hàm quản lý tài nguyên (như các khối try { ... } finally { ... }) hoạt động bình thường trong trường hợp hủy bỏ:

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}    

Bạn có thể lấy mã đầy đủ từ đây.

Đầu ra của đoạn mã này rõ ràng cho thấy rằng việc thực thi phần thân của flow { ... } trong hàm numbers() đã dừng sau khi phát ra số thứ hai:

1
2
Finally in numbers

6. Toán tử luồng terminal

Các toán tử terminal trên luồng là hàm treo bắt đầu một bộ sưu tập của luồng. Toán tử [collect] là một trong những toán tử cơ bản nhất, nhưng còn các toán tử terminal khác, giúp làm cho nó dễ dàng hơn:


* Chuyển đổi thành các bộ sưu tập khác nhau như [toList] và [toSet].
* Các toán tử để lấy giá trị [first] và đảm bảo rằng một luồng phát ra một giá trị [single].
* Giảm luồng xuống một giá trị với [reduce] và [fold].
Ví dụ:

val sum = (1..5).asFlow()
    .map { it * it } // squares of numbers from 1 to 5                           
    .reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)

Bạn có thể lấy mã đầy đủ từ đây.

In ấn một số duy nhất:

55

7. Luồng là một tuần tự

Mỗi bộ sưu tập cá nhân của một luồng được thực hiện tuần tự trừ khi sử dụng các toán tử đặc biệt hoạt động trên nhiều luồng. Bộ sưu tập hoạt động trực tiếp trong coroutine gọi một toán tử terminal. Không có coroutine mới được khởi chạy theo mặc định. Mỗi giá trị được phát ra được xử lý bởi tất cả các toán tử trung gian từ phía trên đến phía dưới và sau đó được chuyển đến toán tử terminal.
Xem ví dụ sau đây lọc các số nguyên chẵn và ánh xạ chúng thành chuỗi:

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }  

Bạn có thể lấy mã đầy đủ từ đây.


Sản xuất:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

8. Ngữ cảnh của luồng

Bộ sưu tập của một luồng luôn xảy ra trong ngữ cảnh của coroutine gọi. Ví dụ, nếu có một luồng simple, thì đoạn mã sau chạy trong ngữ cảnh được chỉ định bởi tác giả của mã này, không phụ thuộc vào chi tiết triển khai của luồng simple:

withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}

Thuộc tính này của một luồng được gọi là bảo tồn ngữ cảnh.


Do đó, theo mặc định, mã trong cấu trúc flow { ... } chạy trong ngữ cảnh được cung cấp bởi một bộ sưu tập của luồng tương ứng. Ví dụ, xem xét triển khai của hàm simple in ra luồng mà nó được gọi và phát ra ba số:

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}            

Bạn có thể lấy mã đầy đủ từ đây.


Chạy đoạn mã này sản xuất:

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

simple().collect được gọi từ luồng chính, phần thân của luồng simple cũng được gọi trong luồng chính. Điều này là mặc định hoàn hảo cho mã chạy nhanh hoặc mã không đồng bộ không quan tâm đến ngữ cảnh thực thi và không chặn caller.

8.1 Một điểm rơi vào bẫy thường gặp khi sử dụng withContext

Tuy nhiên, mã tiêu thụ CPU chạy lâu có thể cần được thực thi trong ngữ cảnh của [Dispatchers.Default] và mã cập nhật UI có thể cần được thực thi trong ngữ cảnh của [Dispatchers.Main]. Thông thường, [withContext] được sử dụng để thay đổi ngữ cảnh trong mã sử dụng coroutine Kotlin, nhưng mã trong cấu trúc flow { ... } phải tuân thủ thuộc tính bảo tồn ngữ cảnh và không được phép [phát][FlowCollector.emit] từ một ngữ cảnh khác.


Hãy thử chạy đoạn mã sau:

fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}            

Bạn có thể lấy mã đầy đủ từ đây.


Đoạn mã này tạo ra ngoại lệ sau đây:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...

8.2 Toán tử flowOn

Ngoại lệ liên quan đến hàm [flowOn] nên được sử dụng để thay đổi ngữ cảnh của việc phát ra luồng. Cách đúng để thay đổi ngữ cảnh của một luồng được hiển thị trong ví dụ dưới đây, cũng in ra tên của các luồng tương ứng để chỉ ra cách mọi thứ hoạt động:

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}            

Bạn có thể lấy mã đầy đủ từ đây.


Lưu ý cách flow { ... } hoạt động trong luồng nền, trong khi bộ sưu tập diễn ra trong luồng chính:

Một điều khác để quan sát ở đây là toán tử [flowOn] đã thay đổi tính tuần tự mặc định của luồng. Bây giờ bộ sưu tập diễn ra trong một coroutine (“coroutine#1”) và phát ra diễn ra trong một coroutine khác (“coroutine#2”) đang chạy trong một luồng khác đồng thời với coroutine đang thu thập. Toán tử [flowOn] tạo ra một coroutine khác cho một luồng nguồn khi nó cần thay đổi [CoroutineDispatcher] trong ngữ cảnh của nó.

9. Đệm (Buffering)

Chạy các phần khác nhau của một luồng trong các coroutine khác nhau có thể hữu ích từ góc độ thời gian tổng cộng để thu thập luồng, đặc biệt là khi có các hoạt động không đồng bộ chạy lâu. Ví dụ, xem xét trường hợp khi phát ra từ luồng simple chậm, mất 100 ms để tạo ra một phần tử; và bộ sưu tập cũng chậm, mất 300 ms để xử lý một phần tử. Hãy xem nó mất bao lâu để thu thập một luồng như vậy với ba số:

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

Bạn có thể lấy mã đầy đủ từ đây.


Nó tạo ra một kết quả giống như sau, với toàn bộ quá trình thu thập mất khoảng 1200 ms (ba số, 400 ms cho mỗi số):

1
2
3
Collected in 1220 ms

Chúng ta có thể sử dụng một toán tử [buffer] trên một luồng để chạy mã phát ra của luồng simple đồng thời với mã thu thập, thay vì chạy chúng tuần tự:

val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

Bạn có thể lấy mã đầy đủ từ đây.


Nó tạo ra các số giống nhưng nhanh hơn, vì chúng ta đã hiệu quả tạo ra một đường ống xử lý, chỉ cần chờ 100 ms cho số đầu tiên và sau đó chỉ tốn 300 ms để xử lý mỗi số. Như vậy, mất khoảng 1000 ms để chạy:

1
2
3
Collected in 1071 ms

Lưu ý rằng toán tử [flowOn] sử dụng cơ chế đệm giống nhau khi nó cần thay đổi [CoroutineDispatcher], nhưng ở đây chúng ta yêu cầu đệm một cách rõ ràng mà không thay đổi ngữ cảnh thực thi.

9.1 Hợp nhất (Conflation)

Khi một luồng đại diện cho các kết quả phần của hoạt động hoặc cập nhật trạng thái hoạt động, có thể không cần phải xử lý mỗi giá trị, mà chỉ cần giữ lại những giá trị mới nhất. Trong trường hợp này, toán tử [conflate] có thể được sử dụng để bỏ qua các giá trị trung gian khi một bộ sưu tập chậm quá để xử lý chúng. Dựa trên ví dụ trước:

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")

Bạn có thể lấy mã đầy đủ từ đây.


Chúng ta thấy rằng trong khi số đầu tiên vẫn đang được xử lý, số thứ hai và thứ ba đã được tạo ra, vì vậy số thứ hai đã bị hợp nhất và chỉ số mới nhất (số thứ ba) được gửi đến bộ sưu tập:

1
3
Collected in 758 ms

9.2 Xử lý giá trị mới nhất

Hợp nhất là một cách để tăng tố xử lý khi cả người phát ra và người sưu tập đều chậm. Điều này được thực hiện bằng cách loại bỏ các giá trị được phát ra. Cách khác là hủy bỏ một bộ sưu tập chậm và khởi động lại nó mỗi khi có một giá trị mới được phát ra. Có một loạt các toán tử xxxLatest thực hiện cùng logic cơ bản của một toán tử xxx, nhưng hủy bỏ mã trong khối của chúng khi có một giá trị mới được phát ra. Hãy thử chuyển từ [conflate] sang [collectLatest] trong ví dụ trước:

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")

Bạn có thể lấy mã đầy đủ từ đây.


Vì thân của [collectLatest] mất 300 ms, nhưng các giá trị mới được phát ra mỗi 100 ms, chúng ta thấy rằng khối chạy trên mỗi giá trị, nhưng chỉ hoàn thành cho giá trị cuối cùng:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

10. Kết hợp nhiều luồng

Có nhiều cách để kết hợp nhiều luồng.

10.1 Zip

Giống như hàm mở rộng [Sequence.zip] trong thư viện tiêu chuẩn Kotlin, luồng có một toán tử [zip] kết hợp các giá trị tương ứng của hai luồng:

val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print

Bạn có thể lấy mã đầy đủ từ đây.


Ví dụ này in ra:

1 -> one
2 -> two
3 -> three

10.2 Kết hợp (Combine)

Khi luồng đại diện cho giá trị mới nhất của một biến hoặc hoạt động (xem thêm phần liên quan về hợp nhất), có thể cần thực hiện một phép tính phụ thuộc vào các giá trị mới nhất của các luồng tương ứng và tính toán lại nó mỗi khi bất kỳ luồng nguồn nào phát ra một giá trị. Gia đình tương ứng của các toán tử được gọi là [combine].
Ví dụ, nếu các số trong ví dụ trước cập nhật mỗi 300ms, nhưng chuỗi cập nhật mỗi 400 ms,thì ghép chúng bằng cách sử dụng toán tử [zip] vẫn sẽ tạo ra kết quả giống nhau, mặc dù kết quả được in ra mỗi 400 ms:

Chúng ta sử dụng một toán tử trung gian [onEach] trong ví dụ này để trì hoãn mỗi phần tử và làm cho mã phát ra các luồng mẫu trở nên rõ ràng và ngắn gọn hơn.

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 


Bạn có thể lấy mã đầy đủ từ đây.

Tuy nhiên, khi sử dụng toán tử [combine] ở đây thay vì [zip]:

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
val startTime = System.currentTimeMillis() // remember the start time 
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

Bạn có thể lấy mã đầy đủ từ đây.


Chúng ta nhận được một đầu ra khác biệt, nơi mỗi lần phát từ dòng nums hoặc strs đều in một dòng:

1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start

11. Làm phẳng các luồng

Các luồng đại diện cho các chuỗi giá trị được nhận bất đồng bộ, vì vậy rất dễ rơi vào tình huống mỗi giá trị kích thích một yêu cầu cho một chuỗi giá trị khác. Ví dụ, chúng ta có thể có một hàm sau trả về một luồng của hai chuỗi cách nhau 500 ms:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

Bây giờ nếu chúng ta có một luồng của ba số nguyên và gọi requestFlow trên mỗi số như sau:

(1..3).asFlow().map { requestFlow(it) }

Thì chúng ta sẽ có một luồng của các luồng (Flow<flow></flow) cần được làm phẳng thành một luồng duy nhất để tiếp tục xử lý.
Cho đối với việc làm phẳng, các bộ toán tử [flatten][Sequence.flatten] và [flatMap][Sequence.flatMap] của các bộ sưu tập và chuỗi có sẵn. Tuy nhiên, do tính bất đồng bộ của các luồng, chúng yêu cầu các chế độ làm phẳng khác nhau, và do đó, tồn tại một họ các toán tử làm phẳng trên các luồng.

11.1 flatMapConcat

Việc nối các luồng của các luồng được cung cấp bởi các toán tử [flatMapConcat] và [flattenConcat]. Chúng là những phương thức tương đương trực tiếp nhất của các toán tử tương ứng trên chuỗi. Chúng đợi cho đến khi luồng bên trong hoàn thành trước khi bắt đầu thu thập luồng tiếp theo như ví dụ sau đây thể hiện:

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms 
    .flatMapConcat { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

Bạn có thể lấy mã đầy đủ từ đây.


Tính tuần tự của [flatMapConcat] rõ ràng thấy trong đầu ra.

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

11.2 flatMapMerge

Một hoạt động làm phẳng khác là thu thập đồng thời tất cả các luồng đầu vào và hợp nhất giá trị của chúng thành một luồng duy nhất để giá trị được phát ra càng sớm càng tốt. Điều này được thực hiện bởi các toán tử [flatMapMerge] và [flattenMerge]. Cả hai đều chấp nhận một tham số concurrency tùy chọn giới hạn số lượng luồng đồng thời được thu thập cùng một lúc (mặc định là [DEFAULT_CONCURRENCY]).

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapMerge { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

Bạn có thể lấy mã đầy đủ từ đây.


Tính đồng thời của [flatMapMerge] rõ ràng:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start

Lưu ý rằng [flatMapMerge] gọi khối mã của nó ({ requestFlow(it) } trong ví dụ này) theo tuần tự, nhưng thu thập các luồng kết quả đồng thời, đây là tương đương với việc thực hiện một map { requestFlow(it) } tuần tự trước và sau đó gọi [flattenMerge] trên kết quả.

12.3 flatMapLatest

Tương tự như toán tử [collectLatest], mà đã được mô tả trong phần “Xử lý giá trị mới nhất”, có chế độ làm phẳng tương ứng “Latest” nơi việc thu thập luồng trước đó bị hủy ngay khi luồng mới được phát ra. Điều này được thực hiện bởi toán tử [flatMapLatest].

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapLatest { requestFlow(it) }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

Bạn có thể lấy mã đầy đủ từ đây.


Đầu ra ở đây trong ví dụ này là một minh họa tốt về cách [flatMapLatest] hoạt động:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

Lưu ý rằng [flatMapLatest] hủy toàn bộ mã trong khối của nó ({ requestFlow(it) } trong ví dụ này) khi nhận được một giá trị mới. Điều này không làm thay đổi trong ví dụ cụ thể này, vì cuộc gọi đến requestFlow chính nó nhanh chóng, không treo, và không thể bị hủy bỏ. Tuy nhiên, sự khác biệt trong đầu ra sẽ rõ ràng nếu chúng ta sử dụng các hàm treo như delay trong requestFlow.

12. Các ngoại lệ của Flow

Thu thập Flow có thể hoàn tất với một ngoại lệ khi bộ phát hoặc mã bên trong các toán tử ném ra một ngoại lệ. Có một số cách để xử lý những ngoại lệ này.

12.1 Sử dụng try-catch trong Collector

Một collector có thể sử dụng khối [try/catch][exceptions] của Kotlin để xử lý các ngoại lệ:

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            

Bạn có thể lấy mã đầy đủ từ đây.


Đoạn mã này thành công bắt lấy một ngoại lệ trong toán tử terminal [collect] và, như chúng ta thấy, không còn giá trị nào được phát ra sau đó:

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

12.2 Bắt tất cả mọi thứ

Ví dụ trước thực sự bắt mọi ngoại lệ xảy ra trong bộ phát hoặc trong bất kỳ toán tử trung gian hoặc terminal nào. Ví dụ, hãy thay đổi mã để giá trị được phát ra được [ánh xạ][map] thành chuỗi, nhưng mã tương ứng tạo ra một ngoại lệ:

fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}            

Bạn có thể lấy mã đầy đủ từ đây.


Ngoại lệ này vẫn được bắt và quá trình thu thập bị dừng lại:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

13. Tính minh bạch của ngoại lệ

Nhưng làm thế nào mã của bộ phát có thể đóng gói hành vi xử lý ngoại lệ của nó?


Các luồng phải là minh bạch đối với ngoại lệ và việc [emit] giá trị trong khối flow { ... } từ bên trong khối try/catch là vi phạm của tính minh bạch đối với ngoại lệ. Điều này đảm bảo rằng một collector ném một ngoại lệ có thể luôn bắt nó bằng cách sử dụng try/catch như trong ví dụ trước đó.


Bộ phát có thể sử dụng một toán tử [catch] giữ tính minh bạch này về ngoại lệ và cho phép đóng gói xử lý ngoại lệ của nó. Cơ thể của toán tử catch có thể phân tích một ngoại lệ và phản ứng với nó theo các cách khác nhau tùy thuộc vào ngoại lệ nào đã được bắt:


* Ngoại lệ có thể được ném lại bằng cách sử dụng throw.
* Ngoại lệ có thể được chuyển thành phát ra giá trị bằng cách sử dụng [emit][FlowCollector.emit] từ cơ thể của [catch].
* Ngoại lệ có thể bị bỏ qua, ghi nhật ký, hoặc được xử lý bởi một số mã khác.


Ví dụ, hãy phát ra văn bản khi bắt được một ngoại lệ:

simple()
    .catch { e -> emit("Caught $e") } // emit on exception
    .collect { value -> println(value) }

Bạn có thể lấy mã đầy đủ từ đây.


Đầu ra của ví dụ vẫn giống nhau, mặc dù chúng ta không còn có try/catch xung quanh mã nữa.

13.1 Bắt minh bạch

Toán tử trung gian [catch], tôn trọng tính minh bạch đối với ngoại lệ, chỉ bắt các ngoại lệ từ nguồn cấp (đó là ngoại lệ từ tất cả các toán tử ở phía trên catch, nhưng không phải dưới nó). Nếu khối trong collect { ... } (đặt dưới catch) ném một ngoại lệ, nó sẽ thoát:

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}            

Bạn có thể lấy mã đầy đủ từ đây.


Một thông điệp “Bắt được …” không được in ra mặc dù có một toán tử catch:

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...

13.2 Bắt một cách rõ ràng

Chúng ta có thể kết hợp tính rõ ràng của toán tử [catch] với mong muốn xử lý tất cả các ngoại lệ, bằng cách di chuyển cơ thể của toán tử [collect] vào [onEach] và đặt nó trước toán tử catch. Việc thu thập của luồng này phải được kích hoạt bằng cách gọi collect() mà không có tham số:

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()

Bạn có thể lấy mã đầy đủ từ đây.


Bây giờ chúng ta có thể thấy một thông điệp “Bắt được …” được in ra và vì vậy chúng ta có thể bắt tất cả các ngoại lệ mà không cần sử dụng một khối try/catch một cách tường minh:

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

14. Hoàn thành luồng

Khi việc thu thập luồng hoàn tất (bình thường hoặc ngoại lệ), có thể cần thực hiện một hành động. Như bạn có thể đã thấy, điều này có thể được thực hiện theo hai cách: bằng cách thực hiện hoặc bằng cách rõ ràng.

14.1 Biểu diễn Khối finally

Ngoài try/catch, một bộ thu thập cũng có thể sử dụng một khối finally để thực hiện một hành động khi việc collect hoàn tất.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}  

Bạn có thể lấy mã đầy đủ từ đây.


Mã này in ra ba số được tạo ra bởi luồng simple theo sau bởi chuỗi “Done”:

1
2
3
Done

14.2 Xử lý một cách rõ ràng

Đối với cách tiếp cận rõ ràng, luồng có toán tử trung gian [onCompletion] được gọi khi luồng đã được thu thập hoàn toàn.
Ví dụ trước đó có thể được viết lại bằng cách sử dụng một toán tử [onCompletion] và tạo ra cùng đầu ra:

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

Bạn có thể lấy mã đầy đủ từ đây.

Ưu điểm chính của [onCompletion] là một tham số Throwable có thể là giá trị null của lambda có thể được sử dụng để xác định liệu việc thu thập luồng đã hoàn tất một cách bình thường hay ngoại lệ. Trong ví dụ sau, luồng simple ném một ngoại lệ sau khi phát ra số 1:

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}            

Bạn có thể lấy mã đầy đủ từ đây.


Như bạn có thể mong đợi, nó in ra:

1
Flow completed exceptionally
Caught exception

Toán tử [onCompletion], khác với [catch], không xử lý ngoại lệ. Như chúng ta có thể thấy từ mã ví dụ ở trên, ngoại lệ vẫn chảy xuống dòng. Nó sẽ được chuyển đến các toán tử [onCompletion] tiếp theo và có thể được xử lý với một toán tử catch.

14.3 Hoàn tất thành công

Một sự khác biệt khác với toán tử [catch] là [onCompletion] thấy tất cả các ngoại lệ và nhận một ngoại lệ null chỉ khi hoàn tất thành công của luồng nguồn cấp (mà không bị hủy hoặc thất bại).

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}

Bạn có thể lấy mã đầy đủ từ đây.


Chúng ta có thể thấy rằng nguyên nhân hoàn tất không phải là null, vì luồng đã bị hủy do ngoại lệ ở hạng mục dưới:

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2

15. Chế độ biểu diễn so với chế độ rõ ràng

Bây giờ chúng ta biết cách thu thập luồng và xử lý hoàn thành cũng như ngoại lệ của nó theo cả hai cách biểu diễn và rõ ràng. Câu hỏi tự nhiên ở đây là, phương pháp nào được ưa chuộng và tại sao? Là một thư viện, chúng tôi không ủng hộ bất kỳ phương pháp cụ thể nào và tin rằng cả hai lựa chọn đều hợp lý và nên được chọn dựa trên sở thích và phong cách mã nguồn của bạn.

16. Bắt đầu luồng

Rất dễ dàng sử dụng luồng để biểu diễn các sự kiện không đồng bộ đang đến từ một nguồn nào đó. Trong trường hợp này, chúng ta cần một phương trình của hàm addEventListener đăng ký một đoạn mã với phản ứng cho các sự kiện đang đến và tiếp tục công việc tiếp theo. Toán tử [onEach] có thể đóng vai trò này. Tuy nhiên, onEach là một toán tử trung gian. Chúng ta cũng cần một toán tử terminal để thu thập luồng. Nếu không, việc gọi đơn giản là onEach sẽ không có tác dụng.
Nếu chúng ta sử dụng toán tử terminal [collect] sau onEach, thì mã sau nó sẽ đợi cho đến khi luồng được thu thập:

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}   

Bạn có thể lấy mã đầy đủ từ đây.


Như bạn có thể thấy, nó in ra:

Event: 1
Event: 2
Event: 3
Done

Toán tử terminal [launchIn] rất hữu ích ở đây. Bằng cách thay thế collect bằng launchIn, chúng ta có thể bắt đầu một bộ thu thập của luồng trong một coroutine riêng biệt, để sau đó, việc thực thi của mã tiếp theo sẽ ngay lập tức tiếp tục:

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}  

Bạn có thể lấy mã đầy đủ từ đây.


Nó in ra:

Done
Event: 1
Event: 2
Event: 3

Tham số yêu cầu cho launchIn phải chỉ định một [CoroutineScope] trong đó coroutine để thu thập luồng được bắt đầu. Trong ví dụ trên, phạm vi này đến từ builder coroutine [runBlocking], vì vậy trong khi luồng đang chạy, phạm vi [runBlocking] này đợi cho đến khi coroutine con của nó hoàn tất và giữ cho hàm chính từ việc trả về và kết thúc ví dụ này.


Trong ứng dụng thực tế, một phạm vi sẽ đến từ một thực thể với tuổi thọ có hạn. Ngay khi tuổi thọ của thực thể này kết thúc, phạm vi tương ứng bị hủy bỏ, hủy bỏ thu thập của luồng tương ứng. Như vậy, cặp onEach { ... }.launchIn(scope) hoạt động giống như addEventListener. Tuy nhiên, không cần thiết phải có hàm removeEventListener tương ứng, vì hủy bỏ và công cụ lập trình có cấu trúc phục vụ mục đích này.

Lưu ý rằng [launchIn] cũng trả về một [Job], có thể được sử dụng để [cancel] coroutine thu thập luồng tương ứng chỉ mà không hủy toàn bộ phạm vi hoặc [Job] nó.

16.1 Kiểm tra hủy luồng

Vì sự thuận tiện, builder [flow] thực hiện thêm các kiểm tra [ensureActive] cho việc hủy bỏ trên mỗi giá trị được phát ra. Điều này có nghĩa là một vòng lặp bận rộn phát ra từ flow { ... } có thể bị hủy bỏ:

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

Bạn có thể lấy mã đầy đủ từ đây.


Chúng ta chỉ nhận được các số lên đến 3 và một [CancellationException] sau khi cố gắng phát ra số 4:

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c

Tuy nhiên, hầu hết các toán tử luồng khác không thực hiện thêm các kiểm tra hủy bỏ riêng cho hiệu suất. Ví dụ, nếu bạn sử dụng phần mở rộng [IntRange.asFlow] để viết một vòng lặp bận rộn tương tự và không treo ở bất kỳ nơi nào, thì không có kiểm tra hủy bỏ:

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

Bạn có thể lấy mã đầy đủ từ đây.


Tất cả các số từ 1 đến 5 đều được thu thập và việc hủy bỏ chỉ được phát hiện trước khi trả về từ runBlocking:

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23

16.2 Làm cho luồng bận rộn có thể hủy bỏ

Trong trường hợp bạn có một vòng lặp bận rộn với các coroutine, bạn phải kiểm tra hủy bỏ một cách tường minh. Bạn có thể thêm .onEach { currentCoroutineContext().ensureActive() }, nhưng cũng có một toán tử sẵn có để thực hiện điều đó là [cancellable]:

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

Bạn có thể lấy mã đầy đủ từ đây.


Với toán tử cancellable, chỉ có các số từ 1 đến 3 được thu thập:

1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365

17. Luồng và Reactive Streams

Đối với những người quen thuộc với Reactive Streams hoặc các framework phản ứng như RxJava và project Reactor, thiết kế của Flow có vẻ rất quen thuộc.


Thực sự, thiết kế của nó được lấy cảm hứng từ Reactive Streams và các hiện thực khác của nó. Nhưng mục tiêu chính của Flow là có thiết kế đơn giản nhất có thể, thân thiện với Kotlin và sự treo không và tôn trọng công việc lập trình có cấu trúc. Đạt được mục tiêu này sẽ không thể nếu không có những người tiên phong trong lĩnh vực phản ứng và công việc to lớn của họ. Bạn có thể đọc câu chuyện đầy đủ trong bài viết Reactive Streams and Kotlin Flows.


Mặc dù khác biệt về khái niệm, Flow một luồng phản ứng và có khả năng chuyển đổi nó thành Publisher phản ứng (tuân theo quy định spec và TCK) và ngược lại. Các bộ chuyển đổi như vậy được cung cấp bởi kotlinx.coroutines ngay từ khi cài đặt và có thể được tìm thấy trong các mô-đun phản ứng tương ứng (kotlinx-coroutines-reactive cho Reactive Streams, kotlinx-coroutines-reactor cho Project Reactor và kotlinx-coroutines-rx2/kotlinx-coroutines-rx3 cho RxJava2/RxJava3). Các mô-đun tích hợp bao gồm chuyển đổi từ và sang Flow, tích hợp với Context của Reactor và cách thân thiện với việc tạm dừng để làm việc với các thực thể phản ứng khác nhau.

Tạm biệt tại Cafedev, nơi chúng ta đã cùng nhau khám phá thế giới của Kotlin với Asynchronous Flow. Cảm ơn bạn đã đồng hành và chia sẻ đam mê lập trình tại ngôi nhà trí tuệ này. Cafedev không chỉ là nơi để học hỏi, mà còn là không gian tận hưởng niềm vui từ sự sáng tạo và kết nối. Chúng ta đã đồng lòng xây dựng cộng đồng, để niềm đam mê lập trình không ngừng lan tỏa. Hẹn gặp lại bạn tại Cafedev với những chủ đề mới, những hành trình khám phá mới!”

Các nguồn kiến thức MIỄN PHÍ VÔ GIÁ từ cafedev tại đây

Nếu bạn thấy hay và hữu ích, bạn có thể tham gia các kênh sau của Cafedev để nhận được nhiều hơn nữa:

Chào thân ái và quyết thắng!

Đăng ký kênh youtube để ủng hộ Cafedev nha các bạn, Thanks you!