Study Programming/RxJava

RxJava_3. Observer

네모메모 2020. 5. 14. 22:49
반응형

 

RxJava의 구성요소 중 하나인 Observer


Observer이란?

 : Observable에서 전달되는 이벤트를 받는 주체.

- Observable이 데이터나 이벤트를 배출(=push=emit)하기 위해서는 이를 수신할 '구독(=subscribe) Observer'가 필요!!

- ObserverObservable체인의 맨 마지막 방출물(=emission)을 수신하며 여기서 emission을 소비한다.

  cf) Operator가 연산 처리 후 반환한 ObservableObserver이다, Operator도 내부적으로는 Observer를 구현하는데

      보통, Observer라 할 때는 'Observable 체인의 제일 끝에서 내보낸(=push=emit)을 소비하는 최종 Observer'를 지칭한다.

  +) '소비한다' 의미?  마지막 전달된 emission 즉 일종의 Observable 전달 아이템들을 사용한다는 의미이며,
                             다양한 방식으로 소비가능하다. ex) DB, 텍스트파일 서버 응답, UI 출력, 콘솔 출력 등등

 


Observer의 구조

package io.reactivex.rxjava3.core;
public interface Observer<@NonNull T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}

 

 - Observer interface는 Observerable에서 전달된 이벤트를 받기 위해 onNext(), onComplete(), onError(), onSubscribe()
   함수를 가짐

   1. onNext() 

: Observale에서 Observer까지 한 번에 하나씩 각 아이템을 전달


   2. onComplete() 

: 완료 이벤트를 Observer에 전달한다.
                             즉, onNext() 호출이 더 이상 발생하지 않을 것을 알림 


   3. onError() 

: 오류를 Observer에게 전달. (일반적으로 Observer는 이를 처리하는 방법을 정의함)


   4. onSubscribe() :

 


 

cf) 위 함수들을 Observable.create()에서 쓴다고 Observable 함수라고 착각하면 안됨!

Observable.create(ObservableOnSubscribe인터페이스 구현한 람다식)꼴로 데이터를 넘겨주는데

람다식으로 표현할 ObservableOnSubscribe인터페이스의 단일 함수가 subscribe(ObservableEmitter인터페이스 매개변수)이므로 

매개변수 'emitter'는 ObservableEmitter이며, 우리는 emitter.onNext(), emitter.onComplete(), emitter.onError()를 호출한다.

 

 

 ▽ Observable create() 함수 정의

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
view raw Observable.java hosted with ❤ by GitHub

 

 

그렇다면, Emitter란 무엇인가??

 

- Emitter는 취소 처리를 위한 Observer의 추상화 인터페이스

 

- 그러므로 Observable.create()에서 emitter.onNext(), emitter.onComplete(), emitter.onError()호출은 Observer의 함수를 호출하는 것

 


Observer 구독하기

구독(=subscribe)이란?

  - Observer에서 Observable의 이벤트를 받겠다고 등록?하는 행위.
  - 특정 Observable구독(=subscribe)해야 Observable의 이벤트를 받을 수 있다.

 

 

구독(=subscribe)하기

- Observable에서 subscribe() 메서드를 호출하면 끝!

- subscribe() 함수 호출하면 Observer가 구현한 메서드를 사용해 3가지 이벤트를 소비한다.

 


Observer 구현

 - Observer은 아래와 같은 타입으로 선언됨 

Observer<T> 

 

- 람다식을 이용하여 간결히 표현하여 subscribe 함수에 매개변수로 넘길 수 있다.
  이렇게 응용된? subscribe() 함수들은 모두 Disposable을 리턴 (Observer로부터 Observable의 연결을 끊을 수 있게 해줘
  장기간 지속되는 Observable에서 유용)

public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                                             Action onComplete)

 

 

 

Ex1) 람다식 없이 Observer 구현 및 등록 예

public static void main(String[] args) {
// create Observable
Observable<String> supportedVersionNameObservable = Observable.create(emitter -> {
try {
emitter.onNext("Nougat");
emitter.onNext("Oreo");
emitter.onNext("Pie");
emitter.onNext("Android10");
emitter.onComplete();
} catch (Exception e) {
e.printStackTrace();
emitter.onError(e);
}
});
// create Observer
Observer<Integer> supportedVersionNameLengthObserver = new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// Do nothing with Disposable, disregard for now
}
@Override
public void onNext(@NonNull Integer value) {
System.out.println("ReceivedData Length : " + value);
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("DONE!");
}
};
// subscribe Observer to received data from Observable
supportedVersionNameObservable.map(String::length) // Add Operator 'map()' to get a string length
.subscribe(supportedVersionNameLengthObserver);
}

 

 

Ex2) 람다식 사용하여 Observer 구현 및 등록 예

 

public static void main(String[] args) {
// create Observable
Observable<String> supportedVersionNameObservable = Observable.create(emitter -> {
try {
emitter.onNext("Nougat");
emitter.onNext("Oreo");
emitter.onNext("Pie");
emitter.onNext("Android10");
emitter.onComplete();
} catch (Exception e) {
e.printStackTrace();
emitter.onError(e);
}
});
// subscribe Observer to received data from Observable
supportedVersionNameObservable.map(String::length) // Add Operator 'map()' to get a string length
// create Observer with Lambda expression
.subscribe(
i -> System.out.println("ReceivedData Length : " + i), // onNext(Integer value)
Throwable::printStackTrace, // onError(Throwable e)
() -> System.out.println("DONE!") // onComplete()
);
}

 

 

 

 


 

 

END!

 


스터디 도움 참조 블로그 (References)

- Emitter vs Observer
github.com/ReactiveX/RxJava/issues/4787#issuecomment-257270353


- [도서] RxJava 시작하기, 원제 Learning Rxjava, 저자 토마스 닐드| 역자 조승진 |에이콘출판 |2019.07.24

http://book.naver.com/bookdb/book_detail.nhn?bid=12801360

 
- RecativeX 공식사이트

http://reactivex.io/intro.htm

 

반응형