본문 바로가기

ReactiveX

2. [RxSwift] Subject

이 글은 아래의 블로그를 많이 참고했습니다. 항상 좋은 글 많이 써주셔서 감사합니다.

https://sujinnaljin.medium.com/rxswift-subject-99b401e5d2e5

 

[RxSwift] Subject

Observable이랑 Subject 나만 헷갈려..?

sujinnaljin.medium.com

 

Subject와 Observable의 차이를 알아보자

Observable과 Subject는 하나의 매우 중요한 차이를 가진다. Observable은 단지 하나의 함수이기 때문에 어떤 상태도 가지지 않으므로 모든 새로운 Observer에 대해 관찰가능한 create 코드를 반복해서 실행한다. 코드는 각 관찰자에 대해 실행되므로 HTTP호출인 경우 각 관찰자에 대해 호출된다. 이로 인해 주요 버그와 비효율이 발생한다.
반면 subject는 관찰자 세부 정보를 저장하고 코드를 한 번만 실행하고 모든 관찰자에게 결과를 제공한다.
subject는 multicast 방식이고 observable은 unicast 방식이다. 

 

observable 이란?

  • Observable의 방출
    • Observable은 구독이 시작되기 전까지는 어떠한 데이터도 방출하지 않습니다.
    • 즉, 데이터의 방출은 subscribe 메서드가 호출될 때 시작됩니다.
  • 구독과 데이터 수신
    • subscribe 메서드를 호출하면 해당 Observable이 방출하는 데이터를 구독자가 수신할 수 있습니다.
    • 각 구독자는 Observable이 방출하는 데이터 스트림을 개별적으로 수신한다.
  • 구독의 독립성
    • 한 Observable에 여러 구독자가 구독할 수 있으며, 각 구독자는 독립적으로 데이터를 수신한다.
    • 이는 각 구독자가 Observable로 부터 동일한 데이터 시퀸스를 받지만 각 구독자의 상태가 서로 다를 수 있다. 
  • 한 번의 구독에서 데이터 수신
    • 한 번의 subscribe 호출로 구독자는 Observable이 완료되거나 구독이 해제될때까지 데이터를 계속 수신할 수 있다.
    • 구독 동안 Observable이 방출하는 모든 데이터는 해당 구독자에게 전달된다.
  • 데이터 스트림의 종료
    • Observable이 onCompleted나 onError 이벤트를 방출하면, 데이터 스트림은 종료되고 해당 구독자에게 더 이상의 데이터는 전달되지 않습니다.

observable의 unicast

  • unicast란 각각 subscribed된 observer가 observable에 대해 독립적인 실행을 갖는 것
  • 코드
// --- Observable ---
    let randomNumGenerator1 = Observable<Int>.create{ observer in
        observer.onNext(Int.random(in: 0 ..< 100))
        return Disposables.create()
    }
    
    randomNumGenerator1.subscribe(onNext: { (element) in
        print("observer 1 : \(element)")
    })
    randomNumGenerator1.subscribe(onNext: { (element) in
        print("observer 2 : \(element)")
    })
    
    --------------------print------------------
    
observer 1 : 54
observer 2 : 69

 

  • 다른 숫자가 출력된다. 
  • observer가 observable에 대해 독자적인 실행을 갖기 때문에, 동일한 observable 구독을 통해 생성된 두개의 observer라고 해도 observable이 각각 실행되면서 observer에게 서로 다른 값이 가는 것이다.

 

subject의 multicast

  • multicast란 하나의 observable 실행이 여러 subcriber에게 공유되는 걸 뜻한다.
  • 코드
// ------ BehaviorSubject/ Subject
    let randomNumGenerator2 = BehaviorSubject(value: 0)
    randomNumGenerator2.onNext(Int.random(in: 0..<100))
    
    randomNumGenerator2.subscribe(onNext: { (element) in
        print("observer subject 1 : \(element)")
    })
    randomNumGenerator2.subscribe(onNext: { (element) in
        print("observer subject 2 : \(element)")
    })

    --------------------print------------------

observer subject 1 : 92
observer subject 2 : 92

 

  • observer들에게 동일한 observable 실행이 도착한다.
  • 정리
    • observable
      • observable의 create는 항상 새롭게 실행된 결과
    • subject
      • subcribe 전에 수행된 onNext등의 하나의 공통된 것
Observable Subject
함수일 뿐, state 존재하지 않음 state를 가진다. data를 메모리에 저장
각각의 옵저버에 대해 코드가 실행 같은 코드 실행, 모든 옵저버에 대해 오직 한번만
오직 옵저버블만 생성 옵저버블 생성 및 그것을 관찰할 수 있음
용도 : 하나의 옵저버에 대한 간단한 observable 필요할 때 용도
1. 자주 데이터를 저장하고 수정할 때
2. 여러개의 옵저버가 데이터를 관찰해야할 때
3. 옵저버와 옵저버블 사이의 중개자 역할
  • data를 메모리에 저장하는 이유
    • PublishSubject : 구독 이후에 발생하는 이벤트만 구독자에게 전달한다. 이전에 발생한 데이터는 저장하지 않는다.
    • BehaviorSubject :  구독 시점에 가장 최근의 데이터(또는 초기값)를 구독자에게 전달한다. 가장 최근의 데이터는 내부적으로 메모리에 저장된다.
    • ReplaySubject : 구독하기 전에 발생한 모든 데이터를 저장하고, 새로운 구독자가 구독을 시작하면 저장된 모든 데이터를 전달한다. 이 데이터는 일정 크기의 버퍼에 저장된다.
    • AsyncSubject : Observable이 완료되었을 때(onCompleted 이벤트 발생) 마지막으로 발행된 데이터만 구독자에게 전달합니다. 마지막 데이터는 내부적으로 저장된다.

subject와 observable의 차이점

  • BehaviorSubject
public final class BehaviorSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType
    , Disposable {
    //blah blah~~
}

 

  • BehaviorSubject는 Observable과 ObserverType을 따르고 있다.
  • subject가 observable이자 observer이라고 하는 이유이다.
  • ObserverType을 따르고 있기에 단순 Observable과는 차이가 있다.
public protocol ObserverType {
    /// The type of elements in sequence that observer can observe.
    associatedtype E

    /// Notify observer about sequence event.
    ///
    /// - parameter event: Event that occurred.
    func on(_ event: Event<E>)
}

 

  • ObserverType은 프로토콜인데 안에 on이라는 메소드가 정의되어 있다. 
  • ObserverType을 따르면 on() 을 구현해줘야 한다는 의미
  • Subject도 on()메서드가 구현되어 있다.
let subject = BehaviorSubject(value: "")
subject.onNext("개구리")

 

  • subject는 observerType으로 내부에 on 함수를 구현하고 있기 때문에 onNext()를 바로 호출할 수 있다.
/// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<E>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        dispatch(_synchronized_on(event), event)
    }

 

  • Notifies all subcribed observers about next event : 모든 subscribe된 observer들에게 nextEvent를 전달한다.
  • subject는 하나의 observer를 가지고 있는게 아니라 다수의 observers들을 가지고 있다.
  • subject 안에는 _observers라는 변수가 있어서 여기에 다수의 관찰자들에 대한 정보를 저장해 놓는다.
private var _observers = Observers()

 

  • on()은 어떤 식으로 이 모든 _observers에게 nextEvent를 전달하는 걸까?
  • Subject의 on 함수 하단에서는 dispatch(_synchronized_on(event),event)를 통해 _synchronized_on 함수를 호출하고 있다.
//BehaviorSubject
func _synchronized_on(_ event: Event<E>) -> Observers {
        _lock.lock(); defer { _lock.unlock() }
        if _stoppedEvent != nil || _isDisposed {
            return Observers()
        }
        
        switch event {
        case .next(let element):
            _element = element
        case .error, .completed:
            _stoppedEvent = event
        }
        
        return _observers
    }

 

  • _synchronized_on 함수의 구현부는 subject마다 조금씩 다르지만 정상적인 상황일 때 공통적으로 _observers를 반환한다.
  • 따라서 첫번째 인자로 해당 subject의 모든 observers을 담고 있고, 두번째 인자는 nextEvent를 갖고 있는 dispatch(_synchronized_on(event), event)는 모든 observers들이 nextEvent를 받는 것을 가능케한다. 

 

_observers에 observer들이 모인 과정을 알아보자

  • _observers는 subject가 subscribe될때 마다 생기는 observer들이 모인 것일 텐데, 어떤 과정을 거쳐 만들어 진걸까?
  • Observable과 Subject 기본적으로 둘 다 ObservableType을 상속 받기 때문에 subscribe 함수를 호출할 때 같은 코드가 실행된다.
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            //1) 안에서 자체적으로 observer 생성
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                //2) 'observer'에 대한 subscription. 이렇게 해서 해당 옵저버블에 대해 옵저버가 관찰을 할 수 있다.
                self.asObservable().subscribe(observer),
                disposable
            )
    }

 

  • 여기서 주목해야할 부분은 Disposable을 create하고 return 하는 과정에서 불리는 self.asObservable().subcribe(observer)함수다.
  • 이전 포스팅에서도 설명했듯 ObservableType은 protocol이고 subscribe라는 메소드가 있다.
  • 그리고 해당 프로토콜을 따르는 각 Subject를 살펴보면 그 안에 override를 통해 .subcribe 함수를 구현해 놓고 있음을 확인할 수 있다.
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        _lock.lock()
        let subscription = _synchronized_subscribe(observer)
        _lock.unlock()
        return subscription
    }

 

  • 이 함수가 불리면서 각기 다른 구현부를 가진 _synchronized_subscribe(observer) 호출한다.
//ReplaySubject
func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if _isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
     
        let anyObserver = observer.asObserver()
        
        replayBuffer(anyObserver)
        if let stoppedEvent = _stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        else {
            //_observers 에 observe insert 한 것
            let key = _observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    }

 

  • 구현은 조금씩 다르지만 정상적인 상황일 때 모두 let key = _observers.insert(observer.on)를 통해 _observers에 observe를 추가한 key라는 변수를 만들고 subcription으로 만들어 리턴한다.
 return Disposables.create(
       self.asObservable().subscribe(observer),
       disposable
  )

 

  • 결국 이러한 과정을 통해 self.asObservable().subscribe(observer)에는 해당 subject의 모든 observers에 대한 subsription이 담기게 되고 (observable이었을 땐 단일 observer에 대한 subscription)이므로 .on이 실행될때 모든 observer가 nextEvent를 받을 수 있는 환경을 마련한다.

 

 

Cold Observable VS Hot Observable

Cold Observable

  • 특징
    • Cold Observable은 구독자가 구독을 시작할 때 까지 데이터를 발생하지 않음
    • 각 구독자에게 독립적으로 작동한다. 
    • 즉 새로운 구독자가 구독을 시작하면 데이터 스트림이 처음부터 다시 시작됩니다.
  • 사용 사례
    • 데이터 파일 또는 HTTP 요청과 같이 요청에 의해 시작되는 데이터 스트림에 주로 사용된다.
    • 예: 파일에서 데이터를 읽을 때 , 새 구독자가 구독을 시작하면 파일읽기가 처음부터 다시 시작됩니다.
    • 예: HTTP 요청을 통해 서버에서 데이터를 가져오는 Observable은 Cold Observable의 좋은 예이다. 
      각 구독자가 구독을 시작할 때 마다 요청이 서버로 보내지고, 개별적인 응답을 받는다.

 

Hot Observable

  • 특징
    • Hot Observable은 구독자의 구독 여부와 상관없이 데이터를 발행합니다.
    • 모든 구독자에게 동일한 데이터 스트림을 공유합니다. 
    • 즉, 구독자가 구독을 시작한 시점부터 데이터를 수신하며, 이전에 발행된 데이터는 수신하지 못할 수 있습니다.
    • 모든 구독자에게 동일한 데이터 스트림 공유
    • 사용 사례
      • 실시간 주식가격, 센서 데이터, 마우스 이벤트등과 같이 지속적으로 데이터가 생성되고 여러 구독자에게 공유되어야하는 경우에 사용됩니다.

'ReactiveX' 카테고리의 다른 글

3. RXJava ObservableOnSubscribe란 무엇일까?  (0) 2023.12.05