Bây giờ chúng ta đã biết chi tiết thế nào là Swift Combine trong bài trước, đã đến lúc tìm hiểu về Schedulers là gì?

  1. Những Schedulers nào dùng trong Combine?
  2. Làm cách nào để chuyển đổi giữa các Schedulers?
  3. Làm cách nào để thực hiện công việc không đồng bộ với Combine?
  4. Sử khác biệt giữa receive(on:)subscribe(on:)?

Định nghĩa một Scheduler

Scheduler là cơ chế đồng bộ hoá của Combine framework, trong đó nó sẽ định nghĩa bối cảnh cho nơi(where) và khi(when) công việc được thực hiện.

Combine không làm việc trực tiếp với threads. Thay vào đó, nó cho phép Publishers hoạt động trên các Scheduler cụ thể.

Nơi(where) có nghĩa là vòng lặp chạy hiện tại, hàng đợi gửi hoặc hàng đợi hoạt động.

Khi nào(when) có nghĩa là thời gian ảo, theo đồng hồ của scheduler. Công việc được thực hiện bởi scheduler sẽ chỉ tuân thủ theo đồng hồ của scheduler, và nó có thể không tương ứng với thời gian thực của hệ thống.

Các loại scheduler trong Combine

Combine framework cung cấp các loại scheduler khác nhau, nhưng tất cả đều phải conform với Scheduler protocol:

  • DispatchQueue. Thực hiện công việc trên một dispatch queues cụ thể: serial, concurrent, main và global. Bạn sẽ thường sử dụng hàng đợi serial và global cho công việc background và hàng đợi main cho công việc liên quan đến UI. Kể từ Xcode 11 GM Seed, không nên sử dụng hàng đợi đồng thời(concurrent).
  • OperationQueue. Thực hiện công việc trên một operation queue cụ thể. Tương tự như dispatch queues, hãy sử dụng OperationQueue.main cho công việc UI và các hàng đợi khác cho công việc background. Theo cuộc trò chuyện này trên diễn đàn Swift, nó không khuyến khích sử dụng operation queues với maxConcurrentOperations lớn hơn 1.
  • RunLoop. Thực hiện công việc trên một vòng lặp chạy cụ thể.
  • ImmediateScheduler. Thực hiện các hành động ngay lập tức. Nó sẽ chấm dứt ứng dụng với lỗi nghiêm trọng nếu bạn cố gắng thực hiện công việc bị trì hoãn với scheduler này.

Sử dụng RunLoop.main, DispatchQueue.main hoặc OperationQueue.main để thực hiện các công việc liên quan đến UI.

Ngoài ImmediateScheduler, Combine không giới thiệu bất kỳ loại scheduler mới nào cả. Thay vào đó, nó mở rộng các API đa luồng trong Swift để trở thành scheduler.

Scheduler mặc định

Ngay cả khi bạn không chỉ định bất kỳ scheduler nào, Combine sẽ cung cấp cho bạn một bộ mặc định. Scheduler sử dụng cùng một thread giống với các phần tử được tạo. Giả sử, nếu bạn gửi phần tử từ background thread thì bạn cũng sẽ nhận được nó trên background thread.

let subject = PassthroughSubject<Int, Never>()

let token = subject.sink(receiveValue: { value in
    print(Thread.isMainThread)
})

subject.send(1)

DispatchQueue.global().async {
    subject.send(2)
}

Line 1: In true nếu giá trị được nhận trên luồng chính và false nếu khác.

Line 7: Gửi 1 từ main thread.

Line 9: Gửi 2 từ background thread.

Nó sẽ in:

true
false

Như mong đợi, các giá trị được nhận trên các threads khác nhau.

Chuyển đổi giữa các Schedulers

Các hoạt động tiêu tốn tài nguyên thường được xử lý trong background, để giao diện người dùng không bị treo. Kết quả của nó sau đó được xử lý trên main thread. Combine làm điều này bằng cách chuyển đổi giữa các Schedulers thông qua việc sử dụng hai phương thức: subscribe(on:)receive(on:).

receive(on:)

Phương pháp này thay đổi một scheduler cho tất cả các publishers đi sau nó.

Just(1)
   .map { _ in print(Thread.isMainThread) }
   .receive(on: DispatchQueue.global())
   .map { print(Thread.isMainThread) }
   .sink { print(Thread.isMainThread) }

Nó sẽ in:

true
false
false

Quá trình này được hình dung như sau:

Tất cả các toán tử ở bên phải receive(on:)cung cấp các phần tử trên DispatchQueue.global() scheduler.

subscribe(on:)

Phương thức subscribe(on:) giúp thay đổi các scheduler và nó được sử dụng để thực hiện subscribe, cancel và request các hoạt động. Một chuỗi các combine sẽ ở trên cùng một scheduler trong suốt quá trình xử lý, trừ khi trong quá trình đó có gọi phương thức receive(on:) để chỉ định một scheduler khác.

Just(1)
   .subscribe(on: DispatchQueue.global())
   .map { _ in print(Thread.isMainThread) }
   .sink { print(Thread.isMainThread) }

Nó sẽ in:

false
false

Quá trình này được hình dung như sau:

Tất cả các hoạt động đều xảy ra trên scheduler DispatchQueue.global()

Vị trí subscribe(on:) không quan trọng, vì nó sẽ ảnh hưởng tới toàn bộ quá trình. Đoạn code này tương đương với đoạn trước:

Just(1) 
   .map { _ in print(Thread.isMainThread) }
   .subscribe(on: DispatchQueue.global()) // Position of subscribe(on:) has changed
   .sink { print(Thread.isMainThread) }

Mặc dù vị trí của subscribe(on:) đặt ở giữa nhưng tất cả các sự kiện và giá trị vẫn được nhận trên scheduler DispatchQueue.global().

Bạn có thể nhận thấy rằng khi gọi subscribe(on:) và không nói gì về scheduler mà chúng ta nhận được các giá trị. Trong trường hợp publisher phát ra các giá trị trên một thread khác, nó sẽ được nhận giá trị trên thread đó. Ví dụ điển hình là một publisher tác vụ dữ liệu:

URLSession.shared.dataTaskPublisher(for: URL(string: "https://www.vadimbulavin.com")!)
   .subscribe(on: DispatchQueue.main) // Subscribe on the main thread
   .sink(receiveCompletion: { _ in },
         receiveValue: { _ in
           print(Thread.isMainThread) // Are we on the main thread?
   })

Code sẽ in ra false, chỉ ra rằng publisher phát ra các giá trị trên background thread. Trong những trường hợp như vậy, chúng ta phải sử dụng receive(on:) để chỉ định một scheduler nào khác

Thực hiện công việc không đồng bộ với Combine

Cuối cùng, hãy xem cách chúng ta có thể chuyển đổi schedulers bằng cách kết hợp subscribe(on:)receive(on:).

Giả sử, chúng ta có một publisher với một nhiệm vụ dài hạn:

struct BusyPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        sleep(10)
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(1)
        subscriber.receive(completion: .finished)
    }
}

Khi được gọi từ UI thread, nó sẽ đóng treo ứng dụng trong 10 giây. Hãy nhớ rằng, Combine mặc định sẽ trên cùng một scheduler từ nơi phần tử được bắn:

BusyPublisher()
   .sink { _ in print("Received value") }

print("Hello")

Như mong đợi, Hello được in sau khi nhận được giá trị:

Received value
Hello

Mẫu phổ biến để thực hiện công việc không đồng bộ với Combine là đăng ký vào scheduler background và nhận các sự kiện trên UI scheduler:

BusyPublisher()
   .subscribe(on: DispatchQueue.global())
   .receive(on: DispatchQueue.main)
   .sink { _ in print("Received value") }

print("Hello")

Nó sẽ in:

Hello
Received value

Lần này Hello được in trước khi nhận được giá trị. Điều này có nghĩa là publisher không treo ứng dụng bằng cách chặn main thread.

Tổng kết

Tóm tắt những điểm chính cần nhớ:

  • subscribe(on:)receive(on:) là các phương thức đa luồng của Combine Swift framework.
  • Scheduler mặc định sử dụng cùng một thread từ nơi phần tử được tạo.
  • receive(on:) đặt scheduler cho tất cả các operators sau đó.
  • subscribe(on:) đặt scheduler cho toàn bộ luồng, bắt đầu từ thời điểm Publisher được đăng ký. Luồng vẫn ở trên cùng một lịch trình, cho đến khi gọi receive(on:) chỉ định một scheduler khác.
  • Vị trí gọi subscribe(on:) không quan trọng.
  • Công việc không đồng bộ thường được thực hiện bằng cách đăng ký vào background scheduler và nhận các giá trị trên UI scheduler.

Đăng ký kênh youtube để ủng hộ Cafedev nha các bạn, Thanks you!