Reactive Programming
데이터가 변경될 때 마다 이벤트를 발생시켜서 데이터를 계속적으로 전달하는 Push 방식에 중점을 둔 프로그램 패러다임
Push 방식 (비동기 방식)
비동기 방식으로 데이터의 변화가 발생했을 때 변경이 발생한 곳에서 데이터를 보내주는 방식을 말한다.
Iterable 과 Observable
Iterable
- 데이터 순회를 가능하게 해주는 인터페이스이다.
- java 에서 제공되고 있는 Iterator 패턴의 인터페이스
- list 와 같은 collection 들은 Iterable 을 상속 받고 있다.
- Iterable 를 상속받은 하위 객체들은 for-each loop 를 사용할 수 있다.
- Iterable 인터페이스 안에는 iterator 라는 메서드가 존재한다.
- Iterator<T> iterator( );
- iterator 를 이용하여 순회가 가능하도록 한다.
- Iterable 를 이용하여 순회를 하기 위해서는 iterator 를 구현해주어야 한다.
Iterable 은 호출될때마다 새로운 iterator 을 리턴하는데, 반복문을 계속 이용해도 새로운 iterator 를 만들어 던져지기 때문에 항상 처음부터 순회를 할 수 있는것이다.
- Iterator 는 데이터의 존재 여부를 확인 할 수 있는 hasNext( ), 데이터를 가져올 수 있는 next( ) 를 제공한다.
그렇다면, 왜 Iterator 메서드를 실행하도록 추가시켰을까?
Iterable 이 제공해주는 정보를 여러 클라이언트들이 계속 반복적으로 사용할 수 있게 하기 위해서이다.
(물론, 우리가 Iterator 를 한번 사용하면 더 이상 사용할 수 없는 Iterable 로 설정할 수 도 있다.)
아래 코드는 Iterator 을 구현하여 foreach 를 사용하는 코드이다.
public class App {
public static void main(String[] args) {
// Iterator 구현
Iterable<Integer> iter = () ->
new Iterator<Integer>() {
int i=0;
final static intMAX= 10;
@Override
public boolean hasNext() {
return i < MAX;
}
@Override
public Integer next() {
return ++i;
}
};
// for-each
for (Integer item : iter) {
System.out.println(item);
}
System.out.println("java 5 이전");
for (Iterator<Integer> it = iter.iterator(); it.hasNext();) {
System.out.println(it.next());
}
}
}
Observable
- 다수의 객체(Observer) 가 특정 객체(Observable) 상태 변화를 감지하고 알림을 받을 수 있는 클래스이다.
- Java 에서 제공하는 Observer 패턴의 구현체
- Observable 은 1..N 개의 Observer 를 가질 수 있다.
- 동작 방식은 아래와 같다.
- Observable (source) → event / data → Observer (target)
- Observer (target) 을 Observable (source) 에 등록하여 데이터를 받는다.
- Observable (source) 은 새로운 정보가 발생할 때 마다 Observer (target) 에게 전달하게 된다. (notify)
- Observable 은 notifyObservers( ) 를 호출하여 변화를 Observer 들에게 알려준다.
- 결국 Observable 은 계속해서 이벤트를 만들어내야 한다.
@SuppressWarnings("deprecation")
public class ObservableApp {
static class IntObservable extends Observable implements Runnable {
// publisher (데이터를 만드는 쪽)
@Override
public void run() {
for (int i = 0; i < 10; i++) {
// 1. 새로운 변화가 생겼다라는 것을 알려주는 메서드
setChanged();
// 2. Observer 에게 파라미터에 있는 데이터를 던져준다.
notifyObservers(i);
}
}
}
public static void main(String[] args) {
// 3. Observer 구현
// subscribe (데이터를 받는 쪽)
Observer ob = new Observer() {
@Override
public void update(Observable o, Object arg) {
//System.out.println(arg); // 5. observable 이 던지는 데이터는 observer 가 다 받는다.
System.out.println(Thread.currentThread().getName() + " " + arg);
}
};
IntObservable io = new IntObservable();
// 4. observable 에 observer 등록
io.addObserver(ob);
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(io); // observable 이 데이터를 만드는 행위를 해야한다.
System.out.println(Thread.currentThread().getName() + " EXIT");
es.shutdown();
}
}
es.execute(io); 에 의해 setChanged( ),notifyObservers( ) 가 실행되면, ExecutorService 를 통해 할당은 받은 pool 에 있는 쓰레드 하나가 동작한다. 쓰레드 안에서 setChanged( ),notifyObservers( ) 이벤트 발생을 시켜서 update( ) 에서 처리되도록 하는 코드이다. 이 처럼 Observable 을 이용하면, 별개 쓰레드에서 동작하게 하는것을 쉽게 만들수 있지만 Iterable 로 만들려면 복잡해진다.
Reactive Extension 을 만든 MS 엔지니어인 에릭 마이너는 Iterable 의 상대성이 Observable 이라 발표하였다.
상대성(duality) 이란?
A 와 B 가 있을때 A 에서 성립하는 정리를 뒤집어서 B에도 적용할 수 있는 경우를 말하는데, 결국 둘의 본질은 같다라는 뜻이다.
Iterable 과 Observable 은 기능은 똑같지만 이 둘은 반대방향으로 표현되어 있다.
Iterable 은 받는 쪽에서 호출하 한 후 결과가 리턴값으로 오는 반면, (pull 방식)
observable 에서는 데이터를 넘겨주는 함수 인 notifyObservers(data) 을 통해 넘겨준다. (push 방식)
next( ) 는 데이터를 리턴값으로 받을 수 있는 반면 notifyObservers( ) 는 파라미터를 통해 데이터를 받을 수 있다.
궁극적으로 기능은 똑같은데 다르게 표현한 것이다. → 그래서 상대성 이라는 표현을 쓴 것이다.
- next( ) - pull 방식
- notifyObserver( ) - push 방식
Iterable 과 똑같은 기능을 구현했음에도 Observable 이 주는 장점이 훨씬 많다. Observable 을 관심있어하는 Oberservers 에게 한번에 브로드캐스트 할 수 있으며, 멀티 쓰레드를 이용하는 방식에 대해선 Observable 을 이용하면 손 쉽게 별개 쓰레드에서 동작하게 하는 것을 쉽게 만들 수 있지만 Iterable 로 만들려면 복잡해진다.
하지만, Observer 패턴만으로는 여전히 해결할 수 없는 문제들이 남아있었다.
Complete 과 Error 를 표현하는 개념이 존재하지 않음.
- '끝' 이라는 개념이 없다.
- complete 를 언제/어떻게 시킬건지에 대한 개념이 없다. (데이터의 끝을 알 수 없음.)
- 에러 처리에 대한 방식이 없다.
- 복구 가능한 예외, 복구 불가능한 예외 등 여러 예외상황에 대한 처리 방식을 제공하지 않는다.
이 두가지를 추가해서 확장된 Observer 패턴을 새로 만들어 Reactive Programming 개념에 적용하였다.
Reactive Streams
- JVM 진영에서 정의한 리액티브 프로그래밍의 표준 스펙이다.
- 기존 옵저버 패턴을 확장한 패턴을 가지고 있으며, Non-blocking 과 Back Pressure 방식을 사용해 비동기 서비스를 할 때 사용된다.
- 이 전의 Reactive extension 을 만든 MS 엔지니어들은 Observer 패턴으로는 부족하다고 생각했다.
- Complete 와 Error 는 언제, 어떻게 표현할 것인지 이 두가지를 추가하여 확장시켰다.
- 현재 자바에서는 옵저버 패턴을 쉽게 사용할 수 있도록 Observable 클래스와 Observer 인터페이스를 제공하는데 Complete 와 Error 처리에 대한 부분은 제공하고 있지 않다.
(java 9 이후에서는 사용성이 떨어지는 등의 이유로 deprecated 처리 되었다. 자세한건 디자인패턴 > 옵저버 패턴 편 참고)
- Publiser, Subscriber, Subscription, Processor 라는 인터페이스를 제공한다.
- 대표적인 구현체 라이브러리는 RxJava, Reactor 등이 있다.
- Java 9 로 공식 채택되어 이를 구현한 Flow 클래스를 사용할 수 있다.
Back Pressure
Observer 패턴에서 Subscriber 가 Publisher 에게 자신이 처리할 수 있는 만큼의 데이터를 Request 하는 것이다.
ReactiveStreams 구현체 중 하나인 Flow 클래스를 통해 간단한 코드로 구현해보자.
대략적인 흐름은 아래 그림과 같다.
public class PubsubAsync {
public static void main(String[] args) {
Iterable<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
ExecutorService executorService = Executors.newCachedThreadPool();
Flow.Publisher<Integer> pub = new Flow.Publisher<>() {
Iterator<Integer> its = list.iterator();
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
executorService.execute(() -> {
int i = 0;
try {
while (i++ < n) {
if (its.hasNext()) {
subscriber.onNext(its.next());
} else {
subscriber.onComplete();
break;
}
}
} catch (Exception e) {
subscriber.onError(e);
}
});
}
@Override
public void cancel() {
}
});
}
};
// [2] Subscriber
Flow.Subscriber<Integer> sub = new Flow.Subscriber<Integer>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread().getName() + " - onSubscribe");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println(Thread.currentThread().getName() + " - onNext : " + item);
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError : " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " - onComplete");
}
};
pub.subscribe(sub);
try {
executorService.awaitTermination(10, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
// 결과
main - onSubscribe
pool-1-thread-1 - onNext : 1
pool-1-thread-2 - onNext : 2
pool-1-thread-3 - onNext : 3
pool-1-thread-2 - onNext : 4
pool-1-thread-3 - onNext : 5
pool-1-thread-2 - onNext : 6
pool-1-thread-3 - onNext : 7
pool-1-thread-2 - onNext : 8
pool-1-thread-3 - onNext : 9
pool-1-thread-2 - onComplete
참고
https://www.youtube.com/watch?v=8fenTR3KOJo
https://ahea.wordpress.com/2017/02/02/iterable%EC%99%80-observable%EC%9D%98-%EA%B0%9C%EB%85%90/
https://do-study.tistory.com/115
https://phantasmicmeans.tistory.com/entry/Observer-Iterator-Reactive-Stream?category=900172
'Spring > Webflux' 카테고리의 다른 글
토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6) (0) | 2022.06.24 |
---|---|
토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) | 2022.06.17 |
토비의 봄 TV - 자바의 비동기 기술 (4) 1/2 (0) | 2022.06.17 |
토비의 봄 TV - Reactive Streams Scheduler (3) (0) | 2022.06.14 |
토비의 봄 TV - Reactive Streams Operators (2) (0) | 2022.06.07 |