Spring/Webflux

토비의 봄 TV - Reactive Streams Operators (2)

개발정리 2022. 6. 7. 14:42

Observable 형태를 발전시킨 Publisher 와 Subscriber 를 만드는 방식을 이전 시간에 살펴보았다.

 

이번엔 이를 조금 더 발전 시켜서 Operator 를 추가하는 방법과 동작방식을 살펴보고 실제 Reactive Strems 의 구현체인 Reactor 까지 사용해보자.

 

 

 

복습


ReactiveStreams 는 아래 기능들을 제공한다.

  • Publisher
  • Subscriber
  • Subscription
  • Processor

 

Publisher 가 가장 중요한데,  데이터스트림을 계속해서 만들어내는 Provider 역할을 한다. 아래 코드를 살펴보자. 

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Pubsub {
  public static void main(String[] args) {
    Publisher<Integer> pub = new Publisher<Integer>() {
      Iterable<Integer> iter = Stream.iterate(1, a->a+1)
          .limit(10)
          .collect(Collectors.toList());
          
      @Override
      public void subscribe(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {
          @Override
          public void request(long l) {
            try {
              iter.forEach(item -> subscriber.onNext(item));
              subscriber.onComplete();
            } catch(Throwable ex) {
              subscriber.onError(ex);
            }
          }

          @Override
          public void cancel() {

          }
        });
      }
    };

    Subscriber<Integer> sub = new Subscriber<Integer>() {
      @Override
      public void onSubscribe(Subscription subscription) {
        System.out.println("onSubscribe(subscription) : " + subscription);
        subscription.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(Integer item) {
        System.out.println("onNext() : " + item);
      }

      @Override
      public void onError(Throwable ex) {
        System.out.println("onError() : " + ex);
      }

      @Override
      public void onComplete() {
        System.out.println("onComplete()");
      }
    };

    pub.subscribe(sub);
  }
}

Publisher

  1. Publisher 인터페이스에는 구현해야할 메서드가 딱 하나 있다. => subscribe()
  2. 해당 subscribe() 를 호출하여 Subscriber 를 매개변수로 던지면, Publisher 는 해당 Subscriber 구독를 구독 목록에 추가시켜 놓는다.
  3. subscribe() 을 호출하면 그때부터 데이터를 보내야하는데,가장 먼저 해야할것은 Subscriber 의 onSubscribe() 를 가장 먼저 호출해주어야 한다.
  4. onSubscribe() 매개변수로 subscription 객체를 만들어 파라미터로 던져주어야 한다.
  5. subscription 은 Pub, Sub 둘 사이에 구독이 한번 일어나는 액션을 담고있는 것이라 생각하면 된다.

Subscriber

  1. Subscriber 는 Publisher 로 부터 네가지 메서드 (onSubscribe(), onNext(), onError(), onComplete()) 통해 데이터나 에러를 받고 완료 여부도 알 수 있게 된다.

 

 

 

Operators


여기서 핵심 주제는 Operators 이다.

 

Operators 란?

  • 중간에 데이터가 가공되어지는것 (rxJava, reactor 는 사용하기 편하게 되어있다.)
  • 데이터가 전달될때 operator 를 거치면 가공되어 Subscriber 에게 전달되도록 한다.
  • Pub → [Data1] → operator1 → [Data2] → operator2 → [Data3] → Sub

 

여기서 우리는 Operator 를 구현해보자.

  • map (mapPub()) 을 만들어보자. [Data1] → func(가공) → [Data2]
  • Pub → [Data1] → mapPub → [Data2] → Sub
                              ← subScribe(sub)
                              → onSubscribe(subscription)
                              → onNext()
                              → onNext() ...
                              → onComplete()
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PubSubOperators {

  public static void main(String[] args) {

    Iterable<Integer> data = Stream.iterate(1, number -> number + 1)
        //.filter(number -> number != 0)
        .limit(10)
        .collect(Collectors.toList());

    Publisher<Integer> pub = iterPub(data);
    //Subscriber<Integer> sub = logSub();

    /* pub 을 mapPub 에 연결, Subsriber 는 mapPub 에 다가 Subscribe 한다. */
    /* Subsriber 입장에서는 mapPub 이 Publisher 이다. */
    // Publisher<Integer> mapPub = mapPub(pub, (Function<Integer,Integer>) number -> number * 10);
    // Publisher<String> mapPub = mapPub(pub, s -> "[" + s + "]");
    // Publisher<Integer> map2Pub = mapPub(mapPub, (Function<Integer,Integer>) number -> -number);

    /* 어느 Publisher 를 넣어주면 합계를 구해줘서 리턴하는 Publisher 계산을 다 한 후에만 던져야다. */
    // Publisher<Integer> sumPub = sumPub(pub);

    // Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer,Integer,Integer>)(a, b)->a +   b);
    // Publisher<String> reducePub = reducePub(pub, "", (a, b)->a + "-" + b);
    Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(),
        (a, b) -> a.append(b + ", "));
    reducePub.subscribe(logSub());
  }

  private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
    return new Publisher<R>() {
      @Override
      public void subscribe(Subscriber<? super R> subscriber) {

        pub.subscribe(new DelegateSub<T, R>(subscriber) {
          R result = init;

          @Override
          public void onNext(T i) {
            result = bf.apply(result, i);
          }

          @Override
          public void onComplete() {
            subscriber.onNext(result);
            subscriber.onComplete();
          }
        });
      }
    };
  }
//    private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
//        return new Publisher<Integer>() {
//
//            int sum = 0;
//            @Override
//            public void subscribe(Subscriber<? super Integer> sub) {
//                pub.subscribe(new DelegateSub<Integer>(sub){
//                    // 결과만 한번 넘겨줘야 하니까 onNext() 로 넘기면 안된다.
//
//                    @Override
//                    public void onNext(Integer i) {
//                        sum += (Integer)i;
//                    }
//
//                    @Override
//                    public void onComplete() {
//                        sub.onNext(sum);// next 로 최종결과 한번만 넘겨준다.
//                        sub.onComplete();
//                    }
//                });
//            }
//        };
//    }

  private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
    return new Publisher<R>() {
      @Override
      public void subscribe(Subscriber<? super R> sub) {
        // subscriber 를 새로 만들어 중계 채널로 사용한다,
        // 기존의 sub 에게 전달.
        pub.subscribe(new DelegateSub<T, R>(sub){

          // 여기서 단순하게 위임하는 코드 만들고 싶으면 오버라이드 하면된다.
          @Override
          public void onNext(T i) {
            sub.onNext(f.apply(i));
          }
        });
      }
    };
  }

  private static Publisher<Integer> iterPub(Iterable<Integer> data) {

    return  new Publisher<Integer>() {
      @Override
      public void subscribe(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {
          @Override
          public void request(long l) {
            try {
              data.forEach(item -> subscriber.onNext(item));
              subscriber.onComplete();
            } catch(Throwable ex) {
              subscriber.onError(ex);
            }
          }

          @Override
          public void cancel() {

          }
        });
      }
    };
  }

  private static <T> Subscriber<T> logSub() {
    return new Subscriber<T>() {
      @Override
      public void onSubscribe(Subscription subscription) {
        System.out.println("onSubscribe(subscription) : " + subscription);
        subscription.request(10);
      }

      @Override
      public void onNext(T integer) {
        System.out.println("onNext() : " + integer);
      }

      @Override
      public void onError(Throwable throwable) {
        System.out.println("onError() : " + throwable);
      }

      @Override
      public void onComplete() {
        System.out.println("onComplete()");
      }
    };
  }
}
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class DelegateSub<T, R> implements Subscriber<T> {

  Subscriber sub;

  public DelegateSub(Subscriber<? super R> sub) {
    this.sub = sub;
  }

  @Override
  public void onSubscribe(Subscription subscription) {
    sub.onSubscribe(subscription);
  }

  @Override
  public void onNext(T i) {
    sub.onNext(i);
  }

  @Override
  public void onError(Throwable throwable) {
    sub.onError(throwable);
  }

  @Override
  public void onComplete() {
    sub.onComplete();
  }
}
  • 위 실습을 통해 Operators 구조가 어떤식으로 동작하는지 알 수 있어야 한다.
  • Publisher Subscription Subscriber 구조로 ReactiveStreams 구조가 만들어지고 그 사이에 Operators 를 끼워넣을때 어떤식으로 가공되는지를 알 수 있다.

 

 

 

Reactor 


간단한 reactor 실습을 진행해보자.

아래 코드는 위에서 직접구현한 Operator  코드를 reactor 에서 제공해주는 Operator 를 사용하여 구현한것이다.

import reactor.core.publisher.Flux;

public class ReactorEx {
  public static void main(String[] args) {
    Flux.<Integer>create(e -> {
          e.next(1);
          e.next(2);
          e.next(3);
          e.complete();
        })
        .log() 
        .map(s -> s * 10)
        .reduce(0, (a,b) -> a+b)
        .log()
        .subscribe(System.out::println);
  }
}
  • create( ) 안에서 기존 subscription 에 구현했었던 코드를 간단하게 만들 수 있다.
  • 내부 동작이 어떻게 호출되고 있는지 확인하고 싶으면 log( ) 를 사용하자.
  • subscribe( ) 가 호출되면 create( ) 를 타고 올라가 내부적으로 onSubscribe( ), request( ), onNext( ), onComplete( ) 가 호출된다.
  • 이 외에도 다양한 reactor 는  Operator 메서드를 지원한다. 

 

 

 

Spring 에 적용해보기 


import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
public class App {

  @RestController
  public static class Controller {

    @RequestMapping("/hello")
    public Publisher<Integer> hello(Integer name) {
      return new Publisher<Integer>() {
        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
          subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long l) { 
              subscriber.onNext(name);
              subscriber.onComplete(); 
            }

            @Override
            public void cancel() {

            }
          });
        }
      };
    }
  }
  
  public static void main(String[] args) {
    SpringApplication.run(App.class, args);
  }
}
  • Spring5 부터는 Publisher로 return 이 가능하다.
  • Spring 에 적용할떈 Publisher 만 만들면되고, Subscriber 는 Spring 이 만들어준다.
  • Publisher 만 만들면 Spring 이 원하는 방식과 시점에 Subscriber 를 만들어 Publisher 에게 데이터를 요청한다.
  • Publisher 에 정의된 request 역시 return 시 Spring 이 알아서 request 를 호출한다.

 

 

 

참고


https://www.youtube.com/watch?v=DChIxy9g19o