Study Programming/RxJava

RxJava_2. Observable

네모메모 2020. 5. 13. 23:58
반응형

 

RxJava의 구성요소 중 하나인 Observable.


Observable이란?

 : 이벤트를 만들어 '이벤트 스트림(=Stream)'을 통해 이벤트를 내보내는(=push =emit) 주체.

 ↪ 이벤트란? 리액티브에서는 '데이터'와 '이벤트'를 동일하게 취급하므로 '일종의 데이터'를 보낸다고 봐도됨

>> 관련 포스팅

더보기

 

  #Stream 이란?

 : 시간에 따른 데이터 값들의 일련의 시퀀스 (=일종의 시간순으로 전달되지는 값들의 collection )

  • 'Observable이 만든 시간순으로 전달되는 데이터들의 강'으로 비유된다. (Subscriber가 이 강에서 데이터를 건짐)

  • Reactive Programing은 이벤트, ajax call, 등 모든 "데이터의 흐름"을 시간순으로 전달되어지는 Stream이라는 구조로 받아 처리함

  • 그러므로 Stream에서 데이터를 얻어와 처리한다.


Observable은 무슨 일을 하나?

  • 기본적으로 "이벤트를 Stream으로 내보내는(=push =emit) 작업"을 수행. 

    cf) Java8 'stream'과 주 차이점

  • '(정의된 Observable<T>를 구독(=subscribe)하여 T 아이템을 소비하는) Observer'에 도달을 목적으로 함

  • 원한다면 일련의 Operator를 추가하여 연산 처리된 T 아이템을 Stream으로 내보냄(=push=emit)

  • 이러한 동작으로 인해 Observable을 "push 작업 기반의 여러 조합 가능한 일종의 iterator" 라고도 정의함

 


push(=emit)작업을 어떻게 하나?

  • 작업 목적인 'Observer에게 순차적으로 아이템을 전달'을 위해 아래 3가지 함수를 호출한다.
    이 함수들은 Observer에 정의되어 있는 함수들

  • 반드시 최종 Observer로 전달되지는 않으며 chain의 다음 단계에서 서빙하는 연산자에게 push될 수도 있다.
    ex) Observable에 '연산자1'을 적용 후 Observer에게 전달되는 경우,
         ① Observable 아이템은 바로 다음 chain의 '연산자1'에게 push.
         ② '연산자1'은 연산된 아이템을 전달하는 새로운 Observable을 생성하여 다음 chain인 Observer에게 push
         ③ Observer는 2번 과정 push로 전달된 연산 처리된 아이템을 구독(=subscribe)함.

 

   1. onNext()

: Observale에서 Observer까지 한 번에 하나씩 각 아이템을 전달


   2. onComplete()

: 완료 이벤트를 Observer에 전달한다. 즉, onNext() 호출이 더 이상 발생하지 않을 것을 알림

   3. onError()

: 오류를 Observer에게 전달. (일반적으로 Observer는 이를 처리하는 방법을 정의함)

 


Observable은 어떤 타입?

 - 우리가 생성하는 Observable은 아래와 같은 타입으로 선언됨

Observable<T>

 


Observable 생성

- Observable을 생성하는 다양한 팩토리 메소드가 존재하는데 대표적인 3개만 일단 봐보자.

  1. Observable.create( emitter -> {emitter.onNext(); emitter.onComplete(); emitter.onError();})

    - 매개변수에 "ObservableEmitter를 인자로 받는 람다(=ObservableOnSubscribe인터페이스 구현체)"를 넣어준다.
    - 그리고 그 람다에 전달된 ObservableEmitter인자 'emitter'에서 Observer로 아이템을 전하는 Observable 작업  

      (=push=emit)

    을 수행할 함수 emitter.onNext(), emitter.onComplete(), emitter.onError()를 호출함.

    - Observable.create() 함수 정의 (from RxJava3 io.reactivex.rxjava3.core.Observable)

    - ObservableOnSubscribe인터페이스 (=ObservableEmitter를 인자로 받는 람다의 타입) (from RxJava3)
      즉, 우리가 create() 함수 내 썼던 람다식은 이 인터페이스의 subscribe() 함수!

  2. Observable.just( item1, ... item 10 )

    - 매개변수에 (push=emit)할 아이템을 최대 10개까지 넣을 수 있다.
    - 전달된 아이템들을 알아서 onNext() 호출 후, 또 알아서 모든 아이템 작업 완료 시 onComplete()호출

     

  3. Observable.fromIterable( Iterable )

    - 매개변수에 Iterable타입의 아이템 모음 전달
    - 전달된 아이템들을 알아서 onNext() 호출 후, 또 알아서 모든 아이템 작업 완료 시 onComplete()호출


 


그 외 특징

  • Observable은 여러 이벤트를 만들어내거나 하나도 만들어내지 않을 수도 있다.

  • Stream은 특정 조건 만족시 종료되어 이벤트 생성 중지하거나 계속 유지시킬 수도 있다.

  • unsubscribe를 감지하여 subscribe하고 있는 Observer가 하나도 없으면 이벤트 생성을 위해 유지했던 리소스를 알아서 해제한다.

 


 

 

 

Observable에 대해서는 할 것들이 아직 많지만 개념정도까지만 일단 정리!

 

END!


스터디 도움 참조 블로그 (References)

- Emitter vs Observer
github.com/ReactiveX/RxJava/issues/4787#issuecomment-257270353

- [도서] RxJava 시작하기, 원제 Learning Rxjava, 저자 토마스 닐드| 역자 조승진 |에이콘출판 |2019.07.24

http://book.naver.com/bookdb/book_detail.nhn?bid=12801360

 
- RecativeX 공식사이트

http://reactivex.io/intro.htm

 

- Reactive Programming 개념

http://blog.skby.net/%EB%B0%98%EC%9D%91%ED%98%95-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-reactive-programming/

 

- Reactive Programming 필요성 및 전반적 이해 

https://dev-daddy.tistory.com/25

 

- [도서] 커니의 코틀린, 김태호 저 | 인사이트 | 2017.12.08

http://book.naver.com/bookdb/book_detail.nhn?bid=12801360

 

- Reactive Programming : Stream 정의적 의미

https://codecraft.tv/courses/angular/reactive-programming-with-rxjs/streams-and-reactive-programming/

 

- Reactive Programming : Stream 쉬운 의미
http://sculove.github.io/blog/2016/06/22/Reactive-Programming/

 

반응형