-
토비의 봄 TV - Reactive Streams Operators (2)Spring/Webflux 2022. 6. 7. 14:42
Observable 형태를 발전시킨 Publisher 와 Subscriber 를 만드는 방식을 이전 시간에 살펴보았다.
이번엔 이를 조금 더 발전 시켜서 Operator 를 추가하는 방법과 동작방식을 살펴보고 실제 Reactive Strems 의 구현체인 Reactor 까지 사용해보자.
복습
ReactiveStreams 는 아래 기능들을 제공한다.
- Publisher
- Subscriber
- Subscription
- Processor
Publisher 가 가장 중요한데, 데이터스트림을 계속해서 만들어내는 Provider 역할을 한다. 아래 코드를 살펴보자.
import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import java.util.stream.Collectors; import java.util.stream.Stream; class Pubsub { public static void main(String[] args) { Publisher<Integer> pub = new Publisher<Integer>() { Iterable<Integer> iter = Stream.iterate(1, a->a+1) .limit(10) .collect(Collectors.toList()); @Override public void subscribe(Subscriber<? super Integer> subscriber) { subscriber.onSubscribe(new Subscription() { @Override public void request(long l) { try { iter.forEach(item -> subscriber.onNext(item)); subscriber.onComplete(); } catch(Throwable ex) { subscriber.onError(ex); } } @Override public void cancel() { } }); } }; Subscriber<Integer> sub = new Subscriber<Integer>() { @Override public void onSubscribe(Subscription subscription) { System.out.println("onSubscribe(subscription) : " + subscription); subscription.request(Long.MAX_VALUE); } @Override public void onNext(Integer item) { System.out.println("onNext() : " + item); } @Override public void onError(Throwable ex) { System.out.println("onError() : " + ex); } @Override public void onComplete() { System.out.println("onComplete()"); } }; pub.subscribe(sub); } }
Publisher
- Publisher 인터페이스에는 구현해야할 메서드가 딱 하나 있다. => subscribe()
- 해당 subscribe() 를 호출하여 Subscriber 를 매개변수로 던지면, Publisher 는 해당 Subscriber 구독를 구독 목록에 추가시켜 놓는다.
- subscribe() 을 호출하면 그때부터 데이터를 보내야하는데,가장 먼저 해야할것은 Subscriber 의 onSubscribe() 를 가장 먼저 호출해주어야 한다.
- onSubscribe() 매개변수로 subscription 객체를 만들어 파라미터로 던져주어야 한다.
- subscription 은 Pub, Sub 둘 사이에 구독이 한번 일어나는 액션을 담고있는 것이라 생각하면 된다.
Subscriber
- Subscriber 는 Publisher 로 부터 네가지 메서드 (onSubscribe(), onNext(), onError(), onComplete()) 통해 데이터나 에러를 받고 완료 여부도 알 수 있게 된다.
Operators
여기서 핵심 주제는 Operators 이다.
Operators 란?
- 중간에 데이터가 가공되어지는것 (rxJava, reactor 는 사용하기 편하게 되어있다.)
- 데이터가 전달될때 operator 를 거치면 가공되어 Subscriber 에게 전달되도록 한다.
- Pub → [Data1] → operator1 → [Data2] → operator2 → [Data3] → Sub
여기서 우리는 Operator 를 구현해보자.
- map (mapPub()) 을 만들어보자. [Data1] → func(가공) → [Data2]
- Pub → [Data1] → mapPub → [Data2] → Sub
← subScribe(sub)
→ onSubscribe(subscription)
→ onNext()
→ onNext() ...
→ onComplete()
import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; public class PubSubOperators { public static void main(String[] args) { Iterable<Integer> data = Stream.iterate(1, number -> number + 1) //.filter(number -> number != 0) .limit(10) .collect(Collectors.toList()); Publisher<Integer> pub = iterPub(data); //Subscriber<Integer> sub = logSub(); /* pub 을 mapPub 에 연결, Subsriber 는 mapPub 에 다가 Subscribe 한다. */ /* Subsriber 입장에서는 mapPub 이 Publisher 이다. */ // Publisher<Integer> mapPub = mapPub(pub, (Function<Integer,Integer>) number -> number * 10); // Publisher<String> mapPub = mapPub(pub, s -> "[" + s + "]"); // Publisher<Integer> map2Pub = mapPub(mapPub, (Function<Integer,Integer>) number -> -number); /* 어느 Publisher 를 넣어주면 합계를 구해줘서 리턴하는 Publisher 계산을 다 한 후에만 던져야다. */ // Publisher<Integer> sumPub = sumPub(pub); // Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer,Integer,Integer>)(a, b)->a + b); // Publisher<String> reducePub = reducePub(pub, "", (a, b)->a + "-" + b); Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(), (a, b) -> a.append(b + ", ")); reducePub.subscribe(logSub()); } private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) { return new Publisher<R>() { @Override public void subscribe(Subscriber<? super R> subscriber) { pub.subscribe(new DelegateSub<T, R>(subscriber) { R result = init; @Override public void onNext(T i) { result = bf.apply(result, i); } @Override public void onComplete() { subscriber.onNext(result); subscriber.onComplete(); } }); } }; } // private static Publisher<Integer> sumPub(Publisher<Integer> pub) { // return new Publisher<Integer>() { // // int sum = 0; // @Override // public void subscribe(Subscriber<? super Integer> sub) { // pub.subscribe(new DelegateSub<Integer>(sub){ // // 결과만 한번 넘겨줘야 하니까 onNext() 로 넘기면 안된다. // // @Override // public void onNext(Integer i) { // sum += (Integer)i; // } // // @Override // public void onComplete() { // sub.onNext(sum);// next 로 최종결과 한번만 넘겨준다. // sub.onComplete(); // } // }); // } // }; // } private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) { return new Publisher<R>() { @Override public void subscribe(Subscriber<? super R> sub) { // subscriber 를 새로 만들어 중계 채널로 사용한다, // 기존의 sub 에게 전달. pub.subscribe(new DelegateSub<T, R>(sub){ // 여기서 단순하게 위임하는 코드 만들고 싶으면 오버라이드 하면된다. @Override public void onNext(T i) { sub.onNext(f.apply(i)); } }); } }; } private static Publisher<Integer> iterPub(Iterable<Integer> data) { return new Publisher<Integer>() { @Override public void subscribe(Subscriber<? super Integer> subscriber) { subscriber.onSubscribe(new Subscription() { @Override public void request(long l) { try { data.forEach(item -> subscriber.onNext(item)); subscriber.onComplete(); } catch(Throwable ex) { subscriber.onError(ex); } } @Override public void cancel() { } }); } }; } private static <T> Subscriber<T> logSub() { return new Subscriber<T>() { @Override public void onSubscribe(Subscription subscription) { System.out.println("onSubscribe(subscription) : " + subscription); subscription.request(10); } @Override public void onNext(T integer) { System.out.println("onNext() : " + integer); } @Override public void onError(Throwable throwable) { System.out.println("onError() : " + throwable); } @Override public void onComplete() { System.out.println("onComplete()"); } }; } }
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; public class DelegateSub<T, R> implements Subscriber<T> { Subscriber sub; public DelegateSub(Subscriber<? super R> sub) { this.sub = sub; } @Override public void onSubscribe(Subscription subscription) { sub.onSubscribe(subscription); } @Override public void onNext(T i) { sub.onNext(i); } @Override public void onError(Throwable throwable) { sub.onError(throwable); } @Override public void onComplete() { sub.onComplete(); } }
- 위 실습을 통해 Operators 구조가 어떤식으로 동작하는지 알 수 있어야 한다.
- Publisher Subscription Subscriber 구조로 ReactiveStreams 구조가 만들어지고 그 사이에 Operators 를 끼워넣을때 어떤식으로 가공되는지를 알 수 있다.
Reactor
간단한 reactor 실습을 진행해보자.
아래 코드는 위에서 직접구현한 Operator 코드를 reactor 에서 제공해주는 Operator 를 사용하여 구현한것이다.
import reactor.core.publisher.Flux; public class ReactorEx { public static void main(String[] args) { Flux.<Integer>create(e -> { e.next(1); e.next(2); e.next(3); e.complete(); }) .log() .map(s -> s * 10) .reduce(0, (a,b) -> a+b) .log() .subscribe(System.out::println); } }
- create( ) 안에서 기존 subscription 에 구현했었던 코드를 간단하게 만들 수 있다.
- 내부 동작이 어떻게 호출되고 있는지 확인하고 싶으면 log( ) 를 사용하자.
- subscribe( ) 가 호출되면 create( ) 를 타고 올라가 내부적으로 onSubscribe( ), request( ), onNext( ), onComplete( ) 가 호출된다.
- 이 외에도 다양한 reactor 는 Operator 메서드를 지원한다.
Spring 에 적용해보기
import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication public class App { @RestController public static class Controller { @RequestMapping("/hello") public Publisher<Integer> hello(Integer name) { return new Publisher<Integer>() { @Override public void subscribe(Subscriber<? super Integer> subscriber) { subscriber.onSubscribe(new Subscription() { @Override public void request(long l) { subscriber.onNext(name); subscriber.onComplete(); } @Override public void cancel() { } }); } }; } } public static void main(String[] args) { SpringApplication.run(App.class, args); } }
- Spring5 부터는 Publisher로 return 이 가능하다.
- Spring 에 적용할떈 Publisher 만 만들면되고, Subscriber 는 Spring 이 만들어준다.
- Publisher 만 만들면 Spring 이 원하는 방식과 시점에 Subscriber 를 만들어 Publisher 에게 데이터를 요청한다.
- Publisher 에 정의된 request 역시 return 시 Spring 이 알아서 request 를 호출한다.
참고
'Spring > Webflux' 카테고리의 다른 글
토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6) (0) 2022.06.24 토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) 2022.06.17 토비의 봄 TV - 자바의 비동기 기술 (4) 1/2 (0) 2022.06.17 토비의 봄 TV - Reactive Streams Scheduler (3) (0) 2022.06.14 토비의 봄 TV - Reactive Streams (1) (0) 2022.06.04