ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 토비의 봄 TV - Reactive Streams Operators (2)
    Spring/Webflux 2022. 6. 7. 14:42

    Observable 형태를 발전시킨 Publisher 와 Subscriber 를 만드는 방식을 이전 시간에 살펴보았다.

     

    이번엔 이를 조금 더 발전 시켜서 Operator 를 추가하는 방법과 동작방식을 살펴보고 실제 Reactive Strems 의 구현체인 Reactor 까지 사용해보자.

     

     

     

    복습


    ReactiveStreams 는 아래 기능들을 제공한다.

    • Publisher
    • Subscriber
    • Subscription
    • Processor

     

    Publisher 가 가장 중요한데,  데이터스트림을 계속해서 만들어내는 Provider 역할을 한다. 아래 코드를 살펴보자. 

    import org.reactivestreams.Publisher;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    class Pubsub {
      public static void main(String[] args) {
        Publisher<Integer> pub = new Publisher<Integer>() {
          Iterable<Integer> iter = Stream.iterate(1, a->a+1)
              .limit(10)
              .collect(Collectors.toList());
              
          @Override
          public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Subscription() {
              @Override
              public void request(long l) {
                try {
                  iter.forEach(item -> subscriber.onNext(item));
                  subscriber.onComplete();
                } catch(Throwable ex) {
                  subscriber.onError(ex);
                }
              }
    
              @Override
              public void cancel() {
    
              }
            });
          }
        };
    
        Subscriber<Integer> sub = new Subscriber<Integer>() {
          @Override
          public void onSubscribe(Subscription subscription) {
            System.out.println("onSubscribe(subscription) : " + subscription);
            subscription.request(Long.MAX_VALUE);
          }
    
          @Override
          public void onNext(Integer item) {
            System.out.println("onNext() : " + item);
          }
    
          @Override
          public void onError(Throwable ex) {
            System.out.println("onError() : " + ex);
          }
    
          @Override
          public void onComplete() {
            System.out.println("onComplete()");
          }
        };
    
        pub.subscribe(sub);
      }
    }

    Publisher

    1. Publisher 인터페이스에는 구현해야할 메서드가 딱 하나 있다. => subscribe()
    2. 해당 subscribe() 를 호출하여 Subscriber 를 매개변수로 던지면, Publisher 는 해당 Subscriber 구독를 구독 목록에 추가시켜 놓는다.
    3. subscribe() 을 호출하면 그때부터 데이터를 보내야하는데,가장 먼저 해야할것은 Subscriber 의 onSubscribe() 를 가장 먼저 호출해주어야 한다.
    4. onSubscribe() 매개변수로 subscription 객체를 만들어 파라미터로 던져주어야 한다.
    5. subscription 은 Pub, Sub 둘 사이에 구독이 한번 일어나는 액션을 담고있는 것이라 생각하면 된다.

    Subscriber

    1. Subscriber 는 Publisher 로 부터 네가지 메서드 (onSubscribe(), onNext(), onError(), onComplete()) 통해 데이터나 에러를 받고 완료 여부도 알 수 있게 된다.

     

     

     

    Operators


    여기서 핵심 주제는 Operators 이다.

     

    Operators 란?

    • 중간에 데이터가 가공되어지는것 (rxJava, reactor 는 사용하기 편하게 되어있다.)
    • 데이터가 전달될때 operator 를 거치면 가공되어 Subscriber 에게 전달되도록 한다.
    • Pub → [Data1] → operator1 → [Data2] → operator2 → [Data3] → Sub

     

    여기서 우리는 Operator 를 구현해보자.

    • map (mapPub()) 을 만들어보자. [Data1] → func(가공) → [Data2]
    • Pub → [Data1] → mapPub → [Data2] → Sub
                                ← subScribe(sub)
                                → onSubscribe(subscription)
                                → onNext()
                                → onNext() ...
                                → onComplete()
    import org.reactivestreams.Publisher;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import java.util.function.BiFunction;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    public class PubSubOperators {
    
      public static void main(String[] args) {
    
        Iterable<Integer> data = Stream.iterate(1, number -> number + 1)
            //.filter(number -> number != 0)
            .limit(10)
            .collect(Collectors.toList());
    
        Publisher<Integer> pub = iterPub(data);
        //Subscriber<Integer> sub = logSub();
    
        /* pub 을 mapPub 에 연결, Subsriber 는 mapPub 에 다가 Subscribe 한다. */
        /* Subsriber 입장에서는 mapPub 이 Publisher 이다. */
        // Publisher<Integer> mapPub = mapPub(pub, (Function<Integer,Integer>) number -> number * 10);
        // Publisher<String> mapPub = mapPub(pub, s -> "[" + s + "]");
        // Publisher<Integer> map2Pub = mapPub(mapPub, (Function<Integer,Integer>) number -> -number);
    
        /* 어느 Publisher 를 넣어주면 합계를 구해줘서 리턴하는 Publisher 계산을 다 한 후에만 던져야다. */
        // Publisher<Integer> sumPub = sumPub(pub);
    
        // Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer,Integer,Integer>)(a, b)->a +   b);
        // Publisher<String> reducePub = reducePub(pub, "", (a, b)->a + "-" + b);
        Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(),
            (a, b) -> a.append(b + ", "));
        reducePub.subscribe(logSub());
      }
    
      private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
        return new Publisher<R>() {
          @Override
          public void subscribe(Subscriber<? super R> subscriber) {
    
            pub.subscribe(new DelegateSub<T, R>(subscriber) {
              R result = init;
    
              @Override
              public void onNext(T i) {
                result = bf.apply(result, i);
              }
    
              @Override
              public void onComplete() {
                subscriber.onNext(result);
                subscriber.onComplete();
              }
            });
          }
        };
      }
    //    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
    //        return new Publisher<Integer>() {
    //
    //            int sum = 0;
    //            @Override
    //            public void subscribe(Subscriber<? super Integer> sub) {
    //                pub.subscribe(new DelegateSub<Integer>(sub){
    //                    // 결과만 한번 넘겨줘야 하니까 onNext() 로 넘기면 안된다.
    //
    //                    @Override
    //                    public void onNext(Integer i) {
    //                        sum += (Integer)i;
    //                    }
    //
    //                    @Override
    //                    public void onComplete() {
    //                        sub.onNext(sum);// next 로 최종결과 한번만 넘겨준다.
    //                        sub.onComplete();
    //                    }
    //                });
    //            }
    //        };
    //    }
    
      private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
        return new Publisher<R>() {
          @Override
          public void subscribe(Subscriber<? super R> sub) {
            // subscriber 를 새로 만들어 중계 채널로 사용한다,
            // 기존의 sub 에게 전달.
            pub.subscribe(new DelegateSub<T, R>(sub){
    
              // 여기서 단순하게 위임하는 코드 만들고 싶으면 오버라이드 하면된다.
              @Override
              public void onNext(T i) {
                sub.onNext(f.apply(i));
              }
            });
          }
        };
      }
    
      private static Publisher<Integer> iterPub(Iterable<Integer> data) {
    
        return  new Publisher<Integer>() {
          @Override
          public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Subscription() {
              @Override
              public void request(long l) {
                try {
                  data.forEach(item -> subscriber.onNext(item));
                  subscriber.onComplete();
                } catch(Throwable ex) {
                  subscriber.onError(ex);
                }
              }
    
              @Override
              public void cancel() {
    
              }
            });
          }
        };
      }
    
      private static <T> Subscriber<T> logSub() {
        return new Subscriber<T>() {
          @Override
          public void onSubscribe(Subscription subscription) {
            System.out.println("onSubscribe(subscription) : " + subscription);
            subscription.request(10);
          }
    
          @Override
          public void onNext(T integer) {
            System.out.println("onNext() : " + integer);
          }
    
          @Override
          public void onError(Throwable throwable) {
            System.out.println("onError() : " + throwable);
          }
    
          @Override
          public void onComplete() {
            System.out.println("onComplete()");
          }
        };
      }
    }
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    
    public class DelegateSub<T, R> implements Subscriber<T> {
    
      Subscriber sub;
    
      public DelegateSub(Subscriber<? super R> sub) {
        this.sub = sub;
      }
    
      @Override
      public void onSubscribe(Subscription subscription) {
        sub.onSubscribe(subscription);
      }
    
      @Override
      public void onNext(T i) {
        sub.onNext(i);
      }
    
      @Override
      public void onError(Throwable throwable) {
        sub.onError(throwable);
      }
    
      @Override
      public void onComplete() {
        sub.onComplete();
      }
    }
    • 위 실습을 통해 Operators 구조가 어떤식으로 동작하는지 알 수 있어야 한다.
    • Publisher Subscription Subscriber 구조로 ReactiveStreams 구조가 만들어지고 그 사이에 Operators 를 끼워넣을때 어떤식으로 가공되는지를 알 수 있다.

     

     

     

    Reactor 


    간단한 reactor 실습을 진행해보자.

    아래 코드는 위에서 직접구현한 Operator  코드를 reactor 에서 제공해주는 Operator 를 사용하여 구현한것이다.

    import reactor.core.publisher.Flux;
    
    public class ReactorEx {
      public static void main(String[] args) {
        Flux.<Integer>create(e -> {
              e.next(1);
              e.next(2);
              e.next(3);
              e.complete();
            })
            .log() 
            .map(s -> s * 10)
            .reduce(0, (a,b) -> a+b)
            .log()
            .subscribe(System.out::println);
      }
    }
    • create( ) 안에서 기존 subscription 에 구현했었던 코드를 간단하게 만들 수 있다.
    • 내부 동작이 어떻게 호출되고 있는지 확인하고 싶으면 log( ) 를 사용하자.
    • subscribe( ) 가 호출되면 create( ) 를 타고 올라가 내부적으로 onSubscribe( ), request( ), onNext( ), onComplete( ) 가 호출된다.
    • 이 외에도 다양한 reactor 는  Operator 메서드를 지원한다. 

     

     

     

    Spring 에 적용해보기 


    import org.reactivestreams.Publisher;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    public class App {
    
      @RestController
      public static class Controller {
    
        @RequestMapping("/hello")
        public Publisher<Integer> hello(Integer name) {
          return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
              subscriber.onSubscribe(new Subscription() {
                @Override
                public void request(long l) { 
                  subscriber.onNext(name);
                  subscriber.onComplete(); 
                }
    
                @Override
                public void cancel() {
    
                }
              });
            }
          };
        }
      }
      
      public static void main(String[] args) {
        SpringApplication.run(App.class, args);
      }
    }
    • Spring5 부터는 Publisher로 return 이 가능하다.
    • Spring 에 적용할떈 Publisher 만 만들면되고, Subscriber 는 Spring 이 만들어준다.
    • Publisher 만 만들면 Spring 이 원하는 방식과 시점에 Subscriber 를 만들어 Publisher 에게 데이터를 요청한다.
    • Publisher 에 정의된 request 역시 return 시 Spring 이 알아서 request 를 호출한다.

     

     

     

    참고


    https://www.youtube.com/watch?v=DChIxy9g19o 

Designed by Tistory.