Spring/Webflux

토비의 봄 TV - Reactive Streams (1)

개발정리 2022. 6. 4. 12:38

Reactive Programming


데이터가 변경될 때 마다 이벤트를 발생시켜서 데이터를 계속적으로 전달하는 Push 방식에 중점을 둔 프로그램 패러다임

Push 방식 (비동기 방식)
비동기 방식으로 데이터의 변화가 발생했을 때 변경이 발생한 곳에서 데이터를 보내주는 방식을 말한다.

 

 

 

Iterable 과 Observable 


Iterable

  • 데이터 순회를 가능하게 해주는 인터페이스이다.
    • java 에서 제공되고 있는 Iterator 패턴의 인터페이스
  • list 와 같은 collection 들은 Iterable 을 상속 받고 있다.
  • Iterable 를 상속받은 하위 객체들은 for-each loop 를 사용할 수 있다.
  • Iterable 인터페이스 안에는 iterator 라는 메서드가 존재한다.
    • Iterator<T> iterator( );
  • iterator 를 이용하여 순회가 가능하도록 한다.
  • Iterable 를 이용하여 순회를 하기 위해서는 iterator 를 구현해주어야 한다.

Iterable 은 호출될때마다 새로운 iterator 을 리턴하는데, 반복문을 계속 이용해도 새로운 iterator 를 만들어 던져지기 때문에 항상 처음부터 순회를 할 수 있는것이다. 

  • Iterator 는 데이터의 존재 여부를 확인 할 수 있는 hasNext( ), 데이터를 가져올 수 있는 next( ) 를 제공한다.

 

그렇다면, 왜 Iterator 메서드를 실행하도록 추가시켰을까?

Iterable 이 제공해주는 정보를 여러 클라이언트들이 계속 반복적으로 사용할 수 있게 하기 위해서이다. 

(물론, 우리가 Iterator 를 한번 사용하면 더 이상 사용할 수 없는 Iterable 로 설정할 수 도 있다.)

 

 

 

아래 코드는  Iterator 을 구현하여 foreach 를 사용하는 코드이다.

public class App {
    public static void main(String[] args) {
        // Iterator 구현
        Iterable<Integer> iter = () ->
                new Iterator<Integer>() {
                    int i=0;
                    final static intMAX= 10;
                    
                    @Override
                    public boolean hasNext() {
                        return i < MAX;
                    }

                    @Override
                    public Integer next() {
                        return ++i;
                    }
                };

        // for-each
        for (Integer item : iter) {
            System.out.println(item);
        }

        System.out.println("java 5 이전");
        for (Iterator<Integer> it = iter.iterator(); it.hasNext();) {
            System.out.println(it.next());
        }
    }
}

 

Observable

  • 다수의 객체(Observer) 가 특정 객체(Observable) 상태 변화를 감지하고 알림을 받을 수 있는 클래스이다. 
    • Java 에서 제공하는 Observer 패턴의 구현체
  • Observable 은 1..N 개의 Observer 를 가질 수 있다.
  • 동작 방식은 아래와 같다.
    • Observable (source) → event / data → Observer (target)
  • Observer (target) 을 Observable (source) 에 등록하여 데이터를 받는다.
  • Observable (source) 은 새로운 정보가 발생할 때 마다 Observer (target) 에게 전달하게 된다. (notify)
    • Observable 은 notifyObservers( ) 를 호출하여 변화를 Observer 들에게 알려준다.
  • 결국 Observable 은 계속해서 이벤트를 만들어내야 한다.
@SuppressWarnings("deprecation")
public class ObservableApp {

  static class IntObservable extends Observable implements Runnable {

    // publisher (데이터를 만드는 쪽)
    @Override
    public void run() {
      for (int i = 0; i < 10; i++) {
        // 1. 새로운 변화가 생겼다라는 것을 알려주는 메서드
        setChanged();
        // 2. Observer 에게 파라미터에 있는 데이터를 던져준다.
        notifyObservers(i);
      }
    }
  }

  public static void main(String[] args) {

    // 3. Observer 구현
    // subscribe (데이터를 받는 쪽)
    Observer ob = new Observer() {
      @Override
      public void update(Observable o, Object arg) {
        //System.out.println(arg); // 5. observable 이 던지는 데이터는 observer 가 다 받는다.
        System.out.println(Thread.currentThread().getName() + " " + arg);
      }
    };

    IntObservable io = new IntObservable();
    // 4. observable 에 observer 등록
    io.addObserver(ob); 

    ExecutorService es = Executors.newSingleThreadExecutor();
    es.execute(io); // observable 이 데이터를 만드는 행위를 해야한다.

    System.out.println(Thread.currentThread().getName() + " EXIT");
    es.shutdown();
  }
}

 

es.execute(io); 에 의해 setChanged( ),notifyObservers( ) 가 실행되면, ExecutorService 를 통해 할당은 받은 pool 에 있는 쓰레드 하나가 동작한다. 쓰레드 안에서 setChanged( ),notifyObservers( ) 이벤트 발생을 시켜서 update( ) 에서 처리되도록 하는 코드이다. 이 처럼 Observable 을 이용하면, 별개 쓰레드에서 동작하게 하는것을 쉽게 만들수 있지만 Iterable 로 만들려면 복잡해진다.

Reactive Extension 을 만든 MS 엔지니어인 에릭 마이너는 Iterable 의 상대성이  Observable 이라 발표하였다.

 

상대성(duality) 이란?

A 와 B 가 있을때 A 에서 성립하는 정리를 뒤집어서 B에도 적용할 수 있는 경우를 말하는데, 결국 둘의 본질은 같다라는 뜻이다.

 

Iterable 과 Observable 은 기능은 똑같지만 이 둘은 반대방향으로 표현되어 있다. 

https://cozzin.tistory.com/13

 

Iterable 은 받는 쪽에서 호출하 한 후 결과가 리턴값으로 오는 반면, (pull 방식)

observable 에서는 데이터를 넘겨주는 함수 인 notifyObservers(data) 을 통해 넘겨준다. (push 방식)

 

next( ) 는 데이터를 리턴값으로 받을 수 있는 반면 notifyObservers( ) 는 파라미터를 통해 데이터를 받을 수 있다.

궁극적으로 기능은 똑같은데 다르게 표현한 것이다. → 그래서 상대성 이라는 표현을 쓴 것이다.

  • next( ) - pull 방식
  • notifyObserver( ) - push 방식

 

Iterable 과 똑같은 기능을 구현했음에도 Observable 이 주는 장점이 훨씬 많다. Observable 을 관심있어하는 Oberservers 에게 한번에 브로드캐스트 할 수 있으며,  멀티 쓰레드를 이용하는 방식에 대해선 Observable 을 이용하면 손 쉽게 별개 쓰레드에서 동작하게 하는 것을 쉽게 만들 수 있지만 Iterable 로 만들려면 복잡해진다.

 

하지만, Observer 패턴만으로는 여전히 해결할 수 없는 문제들이 남아있었다. 

Complete 과 Error 를 표현하는 개념이 존재하지 않음.

  • '끝' 이라는 개념이 없다.
    • complete 를 언제/어떻게 시킬건지에 대한 개념이 없다. (데이터의 끝을 알 수 없음.)
  • 에러 처리에 대한 방식이 없다.
    • 복구 가능한 예외, 복구 불가능한 예외 등 여러 예외상황에 대한 처리 방식을 제공하지 않는다.

이 두가지를 추가해서 확장된 Observer 패턴을 새로 만들어 Reactive Programming 개념에 적용하였다.

 

 

 

Reactive Streams 


- JVM 진영에서 정의한 리액티브 프로그래밍의 표준 스펙이다. 

- 기존 옵저버 패턴을 확장한 패턴을 가지고 있으며, Non-blocking 과 Back Pressure 방식을 사용해 비동기 서비스를 할 때 사용된다.

  • 이 전의 Reactive extension 을 만든 MS 엔지니어들은 Observer 패턴으로는 부족하다고 생각했다.
  • Complete 와 Error 는 언제, 어떻게 표현할 것인지 이 두가지를 추가하여 확장시켰다. 
  • 현재 자바에서는 옵저버 패턴을 쉽게 사용할 수 있도록 Observable 클래스와 Observer 인터페이스를 제공하는데 Complete 와 Error 처리에 대한 부분은 제공하고 있지 않다.
    (java 9 이후에서는 사용성이 떨어지는 등의 이유로 deprecated 처리 되었다. 자세한건 디자인패턴 > 옵저버 패턴 편 참고)

- Publiser, Subscriber, Subscription, Processor 라는 인터페이스를 제공한다.

- 대표적인 구현체 라이브러리는 RxJava, Reactor 등이 있다.

- Java 9 로 공식 채택되어 이를 구현한 Flow 클래스를 사용할 수 있다.

Back Pressure
Observer 패턴에서  Subscriber 가 Publisher 에게 자신이 처리할 수 있는 만큼의 데이터를 Request 하는 것이다.

 

ReactiveStreams 구현체 중 하나인 Flow 클래스를 통해 간단한 코드로 구현해보자.

대략적인 흐름은 아래 그림과 같다.

 

public class PubsubAsync {
  public static void main(String[] args) {

    Iterable<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    ExecutorService executorService = Executors.newCachedThreadPool();

    Flow.Publisher<Integer> pub = new Flow.Publisher<>() {
      Iterator<Integer> its = list.iterator();

      @Override
      public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
          @Override
          public void request(long n) {

            executorService.execute(() -> {
              int i = 0;
              try {
                while (i++ < n) {
                  if (its.hasNext()) {
                    subscriber.onNext(its.next());
                  } else {
                    subscriber.onComplete();
                    break;
                  }
                }
              } catch (Exception e) {
                subscriber.onError(e);
              }
            });

          }

          @Override
          public void cancel() {

          }
        });
      }
    };

    // [2] Subscriber
    Flow.Subscriber<Integer> sub = new Flow.Subscriber<Integer>() {
      Flow.Subscription subscription;
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(Thread.currentThread().getName() + " - onSubscribe");
        this.subscription = subscription;
        this.subscription.request(1);
      }

      @Override
      public void onNext(Integer item) {
        System.out.println(Thread.currentThread().getName() + " - onNext : " + item);
        this.subscription.request(1);
      }

      @Override
      public void onError(Throwable throwable) {
        System.out.println("onError : " + throwable.getMessage());
      }

      @Override
      public void onComplete() {
        System.out.println(Thread.currentThread().getName() + " - onComplete");
      }
    };
    
    pub.subscribe(sub);
    
    try {
      executorService.awaitTermination(10, TimeUnit.HOURS);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    
    executorService.shutdown();
  }
}
// 결과
main - onSubscribe
pool-1-thread-1 - onNext : 1
pool-1-thread-2 - onNext : 2
pool-1-thread-3 - onNext : 3
pool-1-thread-2 - onNext : 4
pool-1-thread-3 - onNext : 5
pool-1-thread-2 - onNext : 6
pool-1-thread-3 - onNext : 7
pool-1-thread-2 - onNext : 8
pool-1-thread-3 - onNext : 9
pool-1-thread-2 - onComplete

 

 

 

참고

https://www.youtube.com/watch?v=8fenTR3KOJo 

https://cozzin.tistory.com/13

https://ahea.wordpress.com/2017/02/02/iterable%EC%99%80-observable%EC%9D%98-%EA%B0%9C%EB%85%90/

https://do-study.tistory.com/115

https://phantasmicmeans.tistory.com/entry/Observer-Iterator-Reactive-Stream?category=900172