ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 토비의 봄 TV - Reactive Streams (1)
    Spring/Webflux 2022. 6. 4. 12:38

    Reactive Programming


    데이터가 변경될 때 마다 이벤트를 발생시켜서 데이터를 계속적으로 전달하는 Push 방식에 중점을 둔 프로그램 패러다임

    Push 방식 (비동기 방식)
    비동기 방식으로 데이터의 변화가 발생했을 때 변경이 발생한 곳에서 데이터를 보내주는 방식을 말한다.

     

     

     

    Iterable 과 Observable 


    Iterable

    • 데이터 순회를 가능하게 해주는 인터페이스이다.
      • java 에서 제공되고 있는 Iterator 패턴의 인터페이스
    • list 와 같은 collection 들은 Iterable 을 상속 받고 있다.
    • Iterable 를 상속받은 하위 객체들은 for-each loop 를 사용할 수 있다.
    • Iterable 인터페이스 안에는 iterator 라는 메서드가 존재한다.
      • Iterator<T> iterator( );
    • iterator 를 이용하여 순회가 가능하도록 한다.
    • Iterable 를 이용하여 순회를 하기 위해서는 iterator 를 구현해주어야 한다.

    Iterable 은 호출될때마다 새로운 iterator 을 리턴하는데, 반복문을 계속 이용해도 새로운 iterator 를 만들어 던져지기 때문에 항상 처음부터 순회를 할 수 있는것이다. 

    • Iterator 는 데이터의 존재 여부를 확인 할 수 있는 hasNext( ), 데이터를 가져올 수 있는 next( ) 를 제공한다.

     

    그렇다면, 왜 Iterator 메서드를 실행하도록 추가시켰을까?

    Iterable 이 제공해주는 정보를 여러 클라이언트들이 계속 반복적으로 사용할 수 있게 하기 위해서이다. 

    (물론, 우리가 Iterator 를 한번 사용하면 더 이상 사용할 수 없는 Iterable 로 설정할 수 도 있다.)

     

     

     

    아래 코드는  Iterator 을 구현하여 foreach 를 사용하는 코드이다.

    public class App {
        public static void main(String[] args) {
            // Iterator 구현
            Iterable<Integer> iter = () ->
                    new Iterator<Integer>() {
                        int i=0;
                        final static intMAX= 10;
                        
                        @Override
                        public boolean hasNext() {
                            return i < MAX;
                        }
    
                        @Override
                        public Integer next() {
                            return ++i;
                        }
                    };
    
            // for-each
            for (Integer item : iter) {
                System.out.println(item);
            }
    
            System.out.println("java 5 이전");
            for (Iterator<Integer> it = iter.iterator(); it.hasNext();) {
                System.out.println(it.next());
            }
        }
    }

     

    Observable

    • 다수의 객체(Observer) 가 특정 객체(Observable) 상태 변화를 감지하고 알림을 받을 수 있는 클래스이다. 
      • Java 에서 제공하는 Observer 패턴의 구현체
    • Observable 은 1..N 개의 Observer 를 가질 수 있다.
    • 동작 방식은 아래와 같다.
      • Observable (source) → event / data → Observer (target)
    • Observer (target) 을 Observable (source) 에 등록하여 데이터를 받는다.
    • Observable (source) 은 새로운 정보가 발생할 때 마다 Observer (target) 에게 전달하게 된다. (notify)
      • Observable 은 notifyObservers( ) 를 호출하여 변화를 Observer 들에게 알려준다.
    • 결국 Observable 은 계속해서 이벤트를 만들어내야 한다.
    @SuppressWarnings("deprecation")
    public class ObservableApp {
    
      static class IntObservable extends Observable implements Runnable {
    
        // publisher (데이터를 만드는 쪽)
        @Override
        public void run() {
          for (int i = 0; i < 10; i++) {
            // 1. 새로운 변화가 생겼다라는 것을 알려주는 메서드
            setChanged();
            // 2. Observer 에게 파라미터에 있는 데이터를 던져준다.
            notifyObservers(i);
          }
        }
      }
    
      public static void main(String[] args) {
    
        // 3. Observer 구현
        // subscribe (데이터를 받는 쪽)
        Observer ob = new Observer() {
          @Override
          public void update(Observable o, Object arg) {
            //System.out.println(arg); // 5. observable 이 던지는 데이터는 observer 가 다 받는다.
            System.out.println(Thread.currentThread().getName() + " " + arg);
          }
        };
    
        IntObservable io = new IntObservable();
        // 4. observable 에 observer 등록
        io.addObserver(ob); 
    
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(io); // observable 이 데이터를 만드는 행위를 해야한다.
    
        System.out.println(Thread.currentThread().getName() + " EXIT");
        es.shutdown();
      }
    }

     

    es.execute(io); 에 의해 setChanged( ),notifyObservers( ) 가 실행되면, ExecutorService 를 통해 할당은 받은 pool 에 있는 쓰레드 하나가 동작한다. 쓰레드 안에서 setChanged( ),notifyObservers( ) 이벤트 발생을 시켜서 update( ) 에서 처리되도록 하는 코드이다. 이 처럼 Observable 을 이용하면, 별개 쓰레드에서 동작하게 하는것을 쉽게 만들수 있지만 Iterable 로 만들려면 복잡해진다.

    Reactive Extension 을 만든 MS 엔지니어인 에릭 마이너는 Iterable 의 상대성이  Observable 이라 발표하였다.

     

    상대성(duality) 이란?

    A 와 B 가 있을때 A 에서 성립하는 정리를 뒤집어서 B에도 적용할 수 있는 경우를 말하는데, 결국 둘의 본질은 같다라는 뜻이다.

     

    Iterable 과 Observable 은 기능은 똑같지만 이 둘은 반대방향으로 표현되어 있다. 

    https://cozzin.tistory.com/13

     

    Iterable 은 받는 쪽에서 호출하 한 후 결과가 리턴값으로 오는 반면, (pull 방식)

    observable 에서는 데이터를 넘겨주는 함수 인 notifyObservers(data) 을 통해 넘겨준다. (push 방식)

     

    next( ) 는 데이터를 리턴값으로 받을 수 있는 반면 notifyObservers( ) 는 파라미터를 통해 데이터를 받을 수 있다.

    궁극적으로 기능은 똑같은데 다르게 표현한 것이다. → 그래서 상대성 이라는 표현을 쓴 것이다.

    • next( ) - pull 방식
    • notifyObserver( ) - push 방식

     

    Iterable 과 똑같은 기능을 구현했음에도 Observable 이 주는 장점이 훨씬 많다. Observable 을 관심있어하는 Oberservers 에게 한번에 브로드캐스트 할 수 있으며,  멀티 쓰레드를 이용하는 방식에 대해선 Observable 을 이용하면 손 쉽게 별개 쓰레드에서 동작하게 하는 것을 쉽게 만들 수 있지만 Iterable 로 만들려면 복잡해진다.

     

    하지만, Observer 패턴만으로는 여전히 해결할 수 없는 문제들이 남아있었다. 

    Complete 과 Error 를 표현하는 개념이 존재하지 않음.

    • '끝' 이라는 개념이 없다.
      • complete 를 언제/어떻게 시킬건지에 대한 개념이 없다. (데이터의 끝을 알 수 없음.)
    • 에러 처리에 대한 방식이 없다.
      • 복구 가능한 예외, 복구 불가능한 예외 등 여러 예외상황에 대한 처리 방식을 제공하지 않는다.

    이 두가지를 추가해서 확장된 Observer 패턴을 새로 만들어 Reactive Programming 개념에 적용하였다.

     

     

     

    Reactive Streams 


    - JVM 진영에서 정의한 리액티브 프로그래밍의 표준 스펙이다. 

    - 기존 옵저버 패턴을 확장한 패턴을 가지고 있으며, Non-blocking 과 Back Pressure 방식을 사용해 비동기 서비스를 할 때 사용된다.

    • 이 전의 Reactive extension 을 만든 MS 엔지니어들은 Observer 패턴으로는 부족하다고 생각했다.
    • Complete 와 Error 는 언제, 어떻게 표현할 것인지 이 두가지를 추가하여 확장시켰다. 
    • 현재 자바에서는 옵저버 패턴을 쉽게 사용할 수 있도록 Observable 클래스와 Observer 인터페이스를 제공하는데 Complete 와 Error 처리에 대한 부분은 제공하고 있지 않다.
      (java 9 이후에서는 사용성이 떨어지는 등의 이유로 deprecated 처리 되었다. 자세한건 디자인패턴 > 옵저버 패턴 편 참고)

    - Publiser, Subscriber, Subscription, Processor 라는 인터페이스를 제공한다.

    - 대표적인 구현체 라이브러리는 RxJava, Reactor 등이 있다.

    - Java 9 로 공식 채택되어 이를 구현한 Flow 클래스를 사용할 수 있다.

    Back Pressure
    Observer 패턴에서  Subscriber 가 Publisher 에게 자신이 처리할 수 있는 만큼의 데이터를 Request 하는 것이다.

     

    ReactiveStreams 구현체 중 하나인 Flow 클래스를 통해 간단한 코드로 구현해보자.

    대략적인 흐름은 아래 그림과 같다.

     

    public class PubsubAsync {
      public static void main(String[] args) {
    
        Iterable<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        ExecutorService executorService = Executors.newCachedThreadPool();
    
        Flow.Publisher<Integer> pub = new Flow.Publisher<>() {
          Iterator<Integer> its = list.iterator();
    
          @Override
          public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
              @Override
              public void request(long n) {
    
                executorService.execute(() -> {
                  int i = 0;
                  try {
                    while (i++ < n) {
                      if (its.hasNext()) {
                        subscriber.onNext(its.next());
                      } else {
                        subscriber.onComplete();
                        break;
                      }
                    }
                  } catch (Exception e) {
                    subscriber.onError(e);
                  }
                });
    
              }
    
              @Override
              public void cancel() {
    
              }
            });
          }
        };
    
        // [2] Subscriber
        Flow.Subscriber<Integer> sub = new Flow.Subscriber<Integer>() {
          Flow.Subscription subscription;
          @Override
          public void onSubscribe(Flow.Subscription subscription) {
            System.out.println(Thread.currentThread().getName() + " - onSubscribe");
            this.subscription = subscription;
            this.subscription.request(1);
          }
    
          @Override
          public void onNext(Integer item) {
            System.out.println(Thread.currentThread().getName() + " - onNext : " + item);
            this.subscription.request(1);
          }
    
          @Override
          public void onError(Throwable throwable) {
            System.out.println("onError : " + throwable.getMessage());
          }
    
          @Override
          public void onComplete() {
            System.out.println(Thread.currentThread().getName() + " - onComplete");
          }
        };
        
        pub.subscribe(sub);
        
        try {
          executorService.awaitTermination(10, TimeUnit.HOURS);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        
        executorService.shutdown();
      }
    }
    // 결과
    main - onSubscribe
    pool-1-thread-1 - onNext : 1
    pool-1-thread-2 - onNext : 2
    pool-1-thread-3 - onNext : 3
    pool-1-thread-2 - onNext : 4
    pool-1-thread-3 - onNext : 5
    pool-1-thread-2 - onNext : 6
    pool-1-thread-3 - onNext : 7
    pool-1-thread-2 - onNext : 8
    pool-1-thread-3 - onNext : 9
    pool-1-thread-2 - onComplete

     

     

     

    참고

    https://www.youtube.com/watch?v=8fenTR3KOJo 

    https://cozzin.tistory.com/13

    https://ahea.wordpress.com/2017/02/02/iterable%EC%99%80-observable%EC%9D%98-%EA%B0%9C%EB%85%90/

    https://do-study.tistory.com/115

    https://phantasmicmeans.tistory.com/entry/Observer-Iterator-Reactive-Stream?category=900172 

     

Designed by Tistory.