In this article we will dig a bit more depth in some core concepts on 2 hours to master RxSwift - Part 1.

Section 1: Observable

Like mentioned in the last article, an Observable in RxSwift will emit events (values and errors) over time, and notifies its observers, and nearly everything can be convert into Observable. There are few sub-types in the Observable or Observable Sequence: Single, Driver and Signal. They are used in different scenarios.

Single

In RxSwift, Single is a type of observable sequence that represents a stream of only one element, or an error if no element was emitted. With that in mind, we can use either .success(element) or .error(error) to handle the event.

enum MyError: Error {
  case someError
}

let single = Single<String>.create { single in
  // Do some operation that produces a single element or an error
  let success = Bool.random()
  if success {
    single(.success("Success!"))
  } else {
    single(.failure(MyError.someError))
  }
  return Disposables.create {}
}

single.subscribe(onSuccess: { element in
  print("Received element: \(element)")
}, onFailure: { error in
  print("Received error: \(error)")
}).disposed(by: disposeBag)

When should use Single?

Here are some scenarios where you might consider using Single:

  • Expect to return a single response, such as fetching a user profile or getting a configuration file.
  • When you want to chain together a series of operators and ensure that the final result is a single element, such as filtering and mapping an array to extract a specific object.

Driver

In RxSwift, a Driver is a special type of Observable that it observes on the main thread and will produce no errors.

import UIKit
import RxSwift
import RxCocoa

enum MyError: Error {
  case someError
}

class ViewController: UIViewController {
  
  @IBOutlet weak var emailTextField: UITextField!
  @IBOutlet weak var emailErrorInfo: UILabel!
  
  let disposeBag = DisposeBag()
  
  override func viewDidLoad() {
    super.viewDidLoad()
    // Do any additional setup after loading the view.
        
    let emailErrorInfoDriver = emailTextField
      .rx.text.orEmpty.asDriver()
      .map {
        $0.contains("!") ? "Invalid character" : ""
      }
    
    emailErrorInfoDriver
      .drive(emailErrorInfo.rx.text)
      .disposed(by: disposeBag)
  }  
}

We create a Driver from the text property of a UITextField, and then we transform it into another Driver that maps the input string to with validation rules. Finally, we bind the error check message Driver to a UILabel using the drive method.

Signal

Similar to the Driver, a Signal is another special type of Observable which observes on the main thread and will produce no errors, but one key difference is that Signal will not emit previous value on subscription while Driver will.

let buttonTap = okButton.rx.tap.asSignal()
buttonTap.emit(onNext: {
  print("Button tapped")
}).disposed(by: disposeBag)

In the example of Driver, which will generate a value even when started, we can monitor the values at line 4:

let emailErrorInfoDriver = emailTextField
  .rx.text.orEmpty.asDriver()
  .map {
    print(">>>> \($0)")	// monitoring the value change here
    return $0.contains("!") ? "Invalid character" : ""
  }

emailErrorInfoDriver
  .drive(emailErrorInfo.rx.text)
  .disposed(by: disposeBag)

Section 2: Subscriber

In RxSwift, a Subscriber is used to listen to the observable sequence (last post) and reacts to any events emitted by that sequence. It is also named Observer in other reactive programming paradigm. There are two main types.

AnyObserver

As its name indicates, AnyObserver is a type-erasing observer that can be used to wrap any kind of observer.

One example is:

let observable = Observable.just("Hello, world!")

let observer: AnyObserver<String> = AnyObserver { event in
  switch event {
    case .next(let value):
	  print("Received value: \(value)")
	case .error(let error):
	  print("Error occurred: \(error.localizedDescription)")
	case .completed:
	  print("Sequence completed")
	}
}

_ = observable.subscribe(observer)

We have an Observer which wrap a string type value. When the value changes, observable emit the next event, which will post to our observer. The observer will first receive the next and then completed event.

Binder

Binder is a type of observer that can be used to bind a stream of events to a property or view. Which means that whenever a value is emitted, the binder will automatically update the target with the new values. This is quite often seen in the view data binding.

Note that Binder will not handle the error like the other Observers.

One example as following:

let textObservable = Observable.just("Hello, world!")

let emailTextField = Binder<String>(self.emailTextField) { label, text in
  label.text = text
}

textObservable
  .bind(to: emailTextField.rx.text)
  .disposed(by: disposeBag)
output on iPhone

Section 3: Scheduler

Scheduler is the core module in RxSwift for implementing multi-threading. They are used to decide which thread or queue that tasks run on.

Subscribe(on:)

subscribe(on:) decides which Scheduler that a Observable sequence will be subscribed to.

Observe(on:)

Similarly, observe(on:) decides which Scheduler that subscriber will listen to.

let observable = Observable<Int>.create { observer in
  print("Observable on thread: \(Thread.current)")

  observer.onNext(1)
  observer.onNext(2)
  observer.onNext(3)
  observer.onCompleted()

  return Disposables.create()
}

_ = observable
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.observe(on: MainScheduler.instance)
.subscribe { event in
  print("Subscriber on thread: \(Thread.current)")
  print(event)
}


// output
Observable on thread: <NSThread: 0x6000000f6700>{number = 4, name = (null)}
Subscriber on thread: <_NSMainThread: 0x6000000b0580>{number = 1, name = main}
next(1)
Subscriber on thread: <_NSMainThread: 0x6000000b0580>{number = 1, name = main}
next(2)
Subscriber on thread: <_NSMainThread: 0x6000000b0580>{number = 1, name = main}
next(3)
Subscriber on thread: <_NSMainThread: 0x6000000b0580>{number = 1, name = main}
completed

With the above example, subscribe to specify that the sequence will be subscribed to on a background scheduler. We then use observe to switch to the main thread to receive events. For example we can get the data through api call via network in the background when subscribe an observer, and use the main thread to update the UI.

Section 4: Operators

In face, we have seen the operator a couple of times in the above tutorial already. Knowing that Operator is a function which transform Observable to another Observable. Take a look at the example again:

let observable = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

// Subscribe to the observable to receive its emitted values
_ = observable
.filter { $0 % 2 == 0 } // filter to even numbers
.subscribe(onNext: { value in
  print(value)
}, onCompleted: {
  print("Observable completed")
})

This example will filter an array giving the output a list of even numbers.

Not just that, Operators can be used to create, transform, combine, filter, or update Observables.

Just

The just operator in RxSwift creates an Observable that emits a single element and then completes immediately. Here's an example:

let observable = Observable.just("Hello, world!")

observable.subscribe(
  onNext: { element in
    print(element)
  },
  onError: { error in
    print(error)
  },
  onCompleted: {
    print("completed")
  }
).disposed(by: disposeBag)

In this example, observable emits a single element "Hello, world!" and then completes. Just is often used in the unit test.

From

The from operator will create an Observable which emits each element of an array one at a time. Here's an example:

let observable = Observable.from([1, 2, 3, 4, 5])

observable.subscribe(
  onNext: { element in
    print(element)
  },
  onError: { error in
    print(error)
  },
  onCompleted: {
    print("completed")
  }
).disposed(by: disposeBag)

In the above example, observable is created with 5 events. The output will be:

1
2
3
4
5

Create

create operator is used to create a custom Observable in RxSwift. Here is an example:

let customObservable = Observable<String>.create { observer in
  observer.onNext("One")
  observer.onNext("Two")
  observer.onNext("Three")
  observer.onError(CustomError.myError)
  observer.onNext("Four") // This value will not be emitted since an error has occurred
  observer.onCompleted()
  return Disposables.create()
}

customObservable.subscribe(
  onNext: { print($0) },
  onError: { print($0) },
  onCompleted: { print("completed") }
).disposed(by: disposeBag)

The output will be:

One
Two
Three
myError

Map

The map operator in RxSwift transforms the items emitted by an observable sequence into a new sequence of items. For example, we can convert one integer array to a decimal array. One typical example:

let observable = Observable.from([1, 2, 3, 4, 5])
observable.map {
  Decimal($0)
}.subscribe(
  onNext: { element in
    print(element)
  },
  onError: { error in
    print(error)
  },
  onCompleted: {
    print("completed")
  }
).disposed(by: disposeBag)

// output
1
2
3
4
5
completed

CombineLatest

The combineLatest operator in RxSwift combines the latest elements from multiple observables and returns a new observable that emits a combined observable, this is especially useful when we need to call multiple APIs and get the results together. For example, we call user api to get the user's basic information, call avatar api to get the user's image, however we will need to render the screen once two api finished.

Zip

zip operator will zip two operator sequence together and ignore the redundant part.

let firstObservable = Observable.from([1, 2, 3])
let secondObservable = Observable.from(["A", "B", "C", "D", "E"])

Observable.zip(firstObservable, secondObservable)
.subscribe(onNext: { first, second in
  print("\(first) \(second)")
})
.disposed(by: disposeBag)


// output
1 A
2 B
3 C