Chào mừng độc giả Cafedev đến với mảng kiến thức mới trên hành trình lập trình – Kotlin với Channels. Được biết đến với tính linh hoạt và hiệu suất, Kotlin Channels mở ra một thế giới mới của lập trình bất đồng bộ. Cùng chia sẻ kiến thức và trải nghiệm với Cafedev về cách Channels làm cho việc xử lý đa nhiệm trở nên đơn giản và mạnh mẽ. Hãy đồng hành cùng chúng tôi, khám phá sức mạnh của Kotlin Channels và ứng dụng chúng trong thế giới lập trình hiện đại.

Giá trị Deferred cung cấp một cách thuận tiện để truyền một giá trị duy nhất giữa các coroutine. Channels cung cấp một cách để truyền một luồng giá trị.

1. Cơ bản về Channel

Một Channel về mặt khá tương tự như BlockingQueue. Một sự khác biệt chính là thay vì có một hoạt động put chặn, nó có một send tạm dừng và thay vì có một hoạt động take chặn, nó có một receive tạm dừng.

val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")

Bạn có thể lấy mã đầy đủ tại đây.


Đầu ra của đoạn mã này là:

1
4
9
16
25
Done!

2. Đóng và vòng lặp trong channels

Khác với một hàng đợi, một channel có thể được đóng để chỉ định không có thêm phần tử nào sẽ đến. Trên phía bên nhận, việc sử dụng một vòng lặp for thông thường để nhận các phần tử từ channel là tiện lợi.


Về mặt khái niệm, một close giống như việc gửi một mã đặc biệt để đóng kênh. Việc lặp sẽ dừng ngay lập tức khi token đóng này được nhận, vì vậy có một đảm bảo rằng tất cả các phần tử đã được gửi trước khi đóng sẽ được nhận:

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")

Bạn có thể lấy mã đầy đủ tại đây.

3. Xây dựng nhà sản xuất channel

Mô hình mà một coroutine đang sản xuất một chuỗi các phần tử khá phổ biến. Điều này là một phần của mô hình producer-consumer thường được tìm thấy trong mã nguồn đồng thời. Bạn có thể trừu tượng hóa một nhà sản xuất như vậy thành một hàm nhận kênh(channel) làm tham số của nó, nhưng điều này đi ngược lại ý thức thông thường là kết quả phải được trả về từ hàm.


Có một builder coroutine thuận tiện có tên là produce giúp việc thực hiện từ phía nhà sản xuất và một hàm mở rộng consumeEach, thay thế một vòng lặp for ở phía người tiêu thụ:

val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")

Bạn có thể lấy mã đầy đủ tại đây.

4. Các đường ống(Pipelines)

Một đường ống là một mô hình trong đó một coroutine đang sản xuất một luồng giá trị, có thể là vô hạn:

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}


Và một coroutine hoặc nhiều coroutine khác đang tiêu thụ luồng đó, thực hiện một số xử lý và sản xuất một số kết quả khác. Trong ví dụ dưới đây, các số chỉ được bình phương:

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}


Mã chính khởi động và kết nối toàn bộ đường ống:

val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
repeat(5) {
    println(squares.receive()) // print first five
}
println("Done!") // we are done
coroutineContext.cancelChildren() // 

Bạn có thể lấy mã đầy đủ tại đây.

Tất cả các hàm tạo coroutine đều được định nghĩa như là các phần mở rộng trên CoroutineScope, để chúng ta có thể tin cậy vào concurrency có cấu trúc để đảm bảo rằng chúng ta không có coroutine toàn cầu lơ lửng trong ứng dụng của mình.

5. Số nguyên tố với đường ống

Hãy đưa đường ống lên cao với một ví dụ tạo số nguyên tố bằng một đường ống của coroutines. Chúng ta bắt đầu với một chuỗi số vô hạn.

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}


Các giai đoạn đường ống sau lọc một luồng số đang đến, loại bỏ tất cả các số chia hết cho số nguyên tố cụ thể:

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}


Bây giờ chúng ta xây dựng đường ống của mình bằng cách bắt đầu một luồng số từ 2, lấy một số nguyên tố từ kênh hiện tại và khởi chạy giai đoạn đường ống mới cho mỗi số nguyên tố được tìm thấy:
Plain TextnumbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...


Ví dụ sau in ra mười số nguyên tố đầu tiên, chạy toàn bộ đường ống trong ngữ cảnh của luồng chính. Vì tất cả các coroutines được khởi chạy trong phạm vi của coroutine [runBlocking] chính, chúng ta không cần phải duy trì một danh sách rõ ràng của tất cả các coroutines chúng ta đã khởi chạy. Chúng ta sử dụng hàm mở rộng [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren] để hủy bỏ tất cả các coroutine con sau khi đã in ra mười số nguyên tố đầu tiên.

var cur = numbersFrom(2)
repeat(10) {
    val prime = cur.receive()
    println(prime)
    cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish

Bạn có thể lấy mã đầy đủ tại đây.


Đầu ra của đoạn mã này là:

2
3
5
7
11
13
17
19
23
29

Lưu ý rằng bạn có thể xây dựng cùng một đường ống bằng cách sử dụng iterator builder coroutine từ thư viện chuẩn. Thay thế produce bằng iterator, send bằng yield, receive bằng next, ReceiveChannel bằng Iterator, và loại bỏ coroutine scope. Bạn sẽ không cần runBlocking hoặc. Tuy nhiên, ưu điểm của một đường ống sử dụng channels như đã hiển thị ở trên là nó thực sự có thể sử dụng nhiều lõi CPU nếu bạn chạy nó trong ngữ cảnh Dispatchers.Default.


Dù sao, đây là một cách vô cùng không hiệu quả để tìm số nguyên tố. Trong thực tế, đường ống thường liên quan đến một số cuộc gọi tạm dừng khác (như cuộc gọi không đồng bộ đến các dịch vụ từ xa) và những đường ống này không thể được xây dựng bằng sequence/iterator, vì chúng không cho phép tạm dừng tùy ý, khác với produce, mà là hoàn toàn không đồng bộ.

6. Fan-out

Nhiều coroutine có thể nhận từ cùng một channel, phân phối công việc giữa chúng. Hãy bắt đầu với một coroutine sản xuất mà đều đặn sản xuất các số nguyên (mười số mỗi giây):

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}


Sau đó, chúng ta có thể có nhiều coroutine xử lý. Trong ví dụ này, chúng chỉ in ra id và số nhận được:

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}


Bây giờ hãy khởi chạy năm bộ xử lý và để họ làm việc gần một giây. Xem điều gì sẽ xảy ra:

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all

Bạn có thể lấy mã đầy đủ tại đây.


Kết quả sẽ tương tự như sau, mặc dù các id của bộ xử lý nhận mỗi số nguyên cụ thể có thể khác nhau:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

Lưu ý rằng việc hủy bỏ một coroutine sản xuất đóng kênh của nó, do đó cuối cùng làm chấm dứt vòng lặp qua channel mà các coroutine xử lý đang thực hiện.


Cũng chú ý cách chúng ta rõ ràng lặp qua kênh với vòng lặp for để thực hiện fan-out trong mã launchProcessor. Khác với consumeEach, mẫu vòng lặp for này hoàn toàn an toàn để sử dụng từ nhiều coroutine. Nếu một trong những coroutine xử lý thất bại, thì các coroutine khác vẫn sẽ tiếp tục xử lý kênh, trong khi một processor được viết thông qua consumeEach luôn tiêu thụ (hủy) kênh cơ sở khi hoàn thành bình thường hoặc bất thường.

7. Fan-in

Nhiều coroutine có thể gửi đến cùng một kênh. Ví dụ, hãy có một kênh chứa chuỗi và một hàm treo có thể lặp đi lặp lại gửi một chuỗi cụ thể đến kênh này với độ trễ cụ thể:

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}


Bây giờ, hãy xem điều gì sẽ xảy ra nếu chúng ta khởi chạy một số coroutine gửi chuỗi (trong ví dụ này, chúng ta khởi chạy chúng trong ngữ cảnh của luồng chính như là con của coroutine chính):

val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
    println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish

Bạn có thể xem toàn bộ mã tại đây.


Kết quả là:

foo
foo
BAR!
foo
foo
BAR!

8. Buffered channels

Các kênh hiển thị cho đến nay không có bộ đệm. Kênh không đệm chuyển các phần tử khi người gửi và người nhận gặp nhau (gọi là hẹn hò). Nếu gọi send trước, thì nó bị tạm ngưng cho đến khi receive được gọi, nếu gọi receive trước, nó sẽ bị tạm ngưng cho đến khi send được gọi.
Cả hàm tạo Channel() và bộ xây dựng produce đều nhận một tham số tùy chọn là capacity để chỉ định kích thước bộ đệm. Bộ đệm cho phép người gửi gửi nhiều phần tử trước khi tạm ngưng, tương tự như BlockingQueue với một dung lượng đã chỉ định, nó sẽ chặn khi bộ đệm đầy.
Hãy xem hành vi của đoạn mã sau:

val channel = Channel<Int>(4) // create buffered channel
val sender = launch { // launch sender coroutine
    repeat(10) {
        println("Sending $it") // print before sending each element
        channel.send(it) // will suspend when buffer is full
    }
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine

Bạn có thể xem toàn bộ mã tại đây.


Nó in “sending” năm lần sử dụng một kênh có bộ đệm với dung lượng là bốn:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

Bốn phần tử đầu tiên được thêm vào bộ đệm và người gửi bị tạm ngưng khi cố gắng gửi phần tử thứ năm.

9 Các Kênh được xử lý công bằng

Các thao tác gửi và nhận đối với các kênh là công bằng đối với thứ tự của chúng được gọi từ nhiều coroutine. Chúng được phục vụ theo thứ tự đầu tiên vào đầu tiên ra, ví dụ: coroutine đầu tiên gọi receive sẽ nhận được phần tử. Trong ví dụ sau, hai coroutine “ping” và “pong” đang nhận đối tượng “ball” từ kênh “table” chung.

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

Bạn có thể xem toàn bộ mã tại đây.


Coroutine “ping” được khởi chạy trước, vì vậy nó là coroutine đầu tiên nhận được bóng. Mặc dù coroutine “ping” ngay lập tức bắt đầu nhận bóng ngay sau khi gửi nó trở lại bàn, bóng lại được coroutine “pong” nhận được, vì nó đã đợi nó từ trước đó:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

Lưu ý rằng đôi khi các kênh có thể tạo ra các thực hiện có vẻ không công bằng do tính chất của bộ thực thi đang được sử dụng. Xem vấn đề này để biết chi tiết.

19. Kênh Ticker

Kênh Ticker là một kênh hẹn hò đặc biệt tạo ra Unit mỗi lần giãn cách được cho phép kể từ lần tiêu thụ cuối cùng từ kênh này. Mặc dù có vẻ là vô ích khi đứng một mình, nó là một khối xây dựng hữu ích để tạo ra các [produce] pipelines và toán tử phức tạp khác thực hiện xử lý phụ thuộc vào thời gian. Kênh Ticker có thể được sử dụng trong select để thực hiện hành động on tick.


Để tạo một kênh như vậy, sử dụng một phương thức nhà máy là [ticker]. Để chỉ định rằng không cần thêm phần tử, hãy sử dụng phương thức ReceiveChannel.cancel trên nó.
Bây giờ hãy xem nó hoạt động trong thực tế:

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}

Bạn có thể xem toàn bộ mã tại đây.


Nó in ra các dòng sau:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

Lưu ý rằng ticker nhận thức được các tạm ngưng có thể xảy ra và, theo mặc định, điều chỉnh độ trễ của phần tử sản xuất tiếp theo nếu có tạm ngưng, cố gắng duy trì một tỷ lệ cố định của các phần tử được sản xuất.
Theo tùy chọn, một tham số mode bằng TickerMode.FIXED_DELAY có thể được chỉ định để duy trì một độ trễ cố định giữa các phần tử.

Cảm ơn độc giả Cafedev đã dành thời gian theo dõi cùng chúng tôi hành trình khám phá về Kotlin with Channels. Đừng quên đăng ký nhận thông báo để không bỏ lỡ những bài viết mới và cập nhật từ Cafedev. Chúng tôi hy vọng rằng thông qua những chia sẻ này, bạn đã có cái nhìn sâu sắc hơn về cách Kotlin Channels làm cho lập trình trở nên hiệu quả và sáng tạo hơn. Hãy tiếp tục đồng hành và chia sẻ kiến thức cùng Cafedev, nơi mà lập trình không chỉ là công việc, mà còn là niềm đam mê và sự sáng tạo.”

Các nguồn kiến thức MIỄN PHÍ VÔ GIÁ từ cafedev tại đây

Nếu bạn thấy hay và hữu ích, bạn có thể tham gia các kênh sau của Cafedev để nhận được nhiều hơn nữa:

Chào thân ái và quyết thắng!

Đăng ký kênh youtube để ủng hộ Cafedev nha các bạn, Thanks you!