Observable 형태를 발전시킨 Publisher 와 Subscriber 를 만드는 방식을 이전 시간에 살펴보았다.
이번엔 이를 조금 더 발전 시켜서 Operator 를 추가하는 방법과 동작방식을 살펴보고 실제 Reactive Strems 의 구현체인 Reactor 까지 사용해보자.
복습
ReactiveStreams 는 아래 기능들을 제공한다.
- Publisher
- Subscriber
- Subscription
- Processor
Publisher 가 가장 중요한데, 데이터스트림을 계속해서 만들어내는 Provider 역할을 한다. 아래 코드를 살펴보자.
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class Pubsub {
public static void main(String[] args) {
Publisher<Integer> pub = new Publisher<Integer>() {
Iterable<Integer> iter = Stream.iterate(1, a->a+1)
.limit(10)
.collect(Collectors.toList());
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long l) {
try {
iter.forEach(item -> subscriber.onNext(item));
subscriber.onComplete();
} catch(Throwable ex) {
subscriber.onError(ex);
}
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> sub = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe(subscription) : " + subscription);
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer item) {
System.out.println("onNext() : " + item);
}
@Override
public void onError(Throwable ex) {
System.out.println("onError() : " + ex);
}
@Override
public void onComplete() {
System.out.println("onComplete()");
}
};
pub.subscribe(sub);
}
}
Publisher
- Publisher 인터페이스에는 구현해야할 메서드가 딱 하나 있다. => subscribe()
- 해당 subscribe() 를 호출하여 Subscriber 를 매개변수로 던지면, Publisher 는 해당 Subscriber 구독를 구독 목록에 추가시켜 놓는다.
- subscribe() 을 호출하면 그때부터 데이터를 보내야하는데,가장 먼저 해야할것은 Subscriber 의 onSubscribe() 를 가장 먼저 호출해주어야 한다.
- onSubscribe() 매개변수로 subscription 객체를 만들어 파라미터로 던져주어야 한다.
- subscription 은 Pub, Sub 둘 사이에 구독이 한번 일어나는 액션을 담고있는 것이라 생각하면 된다.
Subscriber
- Subscriber 는 Publisher 로 부터 네가지 메서드 (onSubscribe(), onNext(), onError(), onComplete()) 통해 데이터나 에러를 받고 완료 여부도 알 수 있게 된다.
Operators
여기서 핵심 주제는 Operators 이다.
Operators 란?
- 중간에 데이터가 가공되어지는것 (rxJava, reactor 는 사용하기 편하게 되어있다.)
- 데이터가 전달될때 operator 를 거치면 가공되어 Subscriber 에게 전달되도록 한다.
- Pub → [Data1] → operator1 → [Data2] → operator2 → [Data3] → Sub
여기서 우리는 Operator 를 구현해보자.
- map (mapPub()) 을 만들어보자. [Data1] → func(가공) → [Data2]
- Pub → [Data1] → mapPub → [Data2] → Sub
← subScribe(sub)
→ onSubscribe(subscription)
→ onNext()
→ onNext() ...
→ onComplete()
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class PubSubOperators {
public static void main(String[] args) {
Iterable<Integer> data = Stream.iterate(1, number -> number + 1)
//.filter(number -> number != 0)
.limit(10)
.collect(Collectors.toList());
Publisher<Integer> pub = iterPub(data);
//Subscriber<Integer> sub = logSub();
/* pub 을 mapPub 에 연결, Subsriber 는 mapPub 에 다가 Subscribe 한다. */
/* Subsriber 입장에서는 mapPub 이 Publisher 이다. */
// Publisher<Integer> mapPub = mapPub(pub, (Function<Integer,Integer>) number -> number * 10);
// Publisher<String> mapPub = mapPub(pub, s -> "[" + s + "]");
// Publisher<Integer> map2Pub = mapPub(mapPub, (Function<Integer,Integer>) number -> -number);
/* 어느 Publisher 를 넣어주면 합계를 구해줘서 리턴하는 Publisher 계산을 다 한 후에만 던져야다. */
// Publisher<Integer> sumPub = sumPub(pub);
// Publisher<Integer> reducePub = reducePub(pub, 0, (BiFunction<Integer,Integer,Integer>)(a, b)->a + b);
// Publisher<String> reducePub = reducePub(pub, "", (a, b)->a + "-" + b);
Publisher<StringBuilder> reducePub = reducePub(pub, new StringBuilder(),
(a, b) -> a.append(b + ", "));
reducePub.subscribe(logSub());
}
private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> subscriber) {
pub.subscribe(new DelegateSub<T, R>(subscriber) {
R result = init;
@Override
public void onNext(T i) {
result = bf.apply(result, i);
}
@Override
public void onComplete() {
subscriber.onNext(result);
subscriber.onComplete();
}
});
}
};
}
// private static Publisher<Integer> sumPub(Publisher<Integer> pub) {
// return new Publisher<Integer>() {
//
// int sum = 0;
// @Override
// public void subscribe(Subscriber<? super Integer> sub) {
// pub.subscribe(new DelegateSub<Integer>(sub){
// // 결과만 한번 넘겨줘야 하니까 onNext() 로 넘기면 안된다.
//
// @Override
// public void onNext(Integer i) {
// sum += (Integer)i;
// }
//
// @Override
// public void onComplete() {
// sub.onNext(sum);// next 로 최종결과 한번만 넘겨준다.
// sub.onComplete();
// }
// });
// }
// };
// }
private static <T, R> Publisher<R> mapPub(Publisher<T> pub, Function<T, R> f) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
// subscriber 를 새로 만들어 중계 채널로 사용한다,
// 기존의 sub 에게 전달.
pub.subscribe(new DelegateSub<T, R>(sub){
// 여기서 단순하게 위임하는 코드 만들고 싶으면 오버라이드 하면된다.
@Override
public void onNext(T i) {
sub.onNext(f.apply(i));
}
});
}
};
}
private static Publisher<Integer> iterPub(Iterable<Integer> data) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long l) {
try {
data.forEach(item -> subscriber.onNext(item));
subscriber.onComplete();
} catch(Throwable ex) {
subscriber.onError(ex);
}
}
@Override
public void cancel() {
}
});
}
};
}
private static <T> Subscriber<T> logSub() {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe(subscription) : " + subscription);
subscription.request(10);
}
@Override
public void onNext(T integer) {
System.out.println("onNext() : " + integer);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError() : " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete()");
}
};
}
}
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class DelegateSub<T, R> implements Subscriber<T> {
Subscriber sub;
public DelegateSub(Subscriber<? super R> sub) {
this.sub = sub;
}
@Override
public void onSubscribe(Subscription subscription) {
sub.onSubscribe(subscription);
}
@Override
public void onNext(T i) {
sub.onNext(i);
}
@Override
public void onError(Throwable throwable) {
sub.onError(throwable);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
- 위 실습을 통해 Operators 구조가 어떤식으로 동작하는지 알 수 있어야 한다.
- Publisher Subscription Subscriber 구조로 ReactiveStreams 구조가 만들어지고 그 사이에 Operators 를 끼워넣을때 어떤식으로 가공되는지를 알 수 있다.
Reactor
간단한 reactor 실습을 진행해보자.
아래 코드는 위에서 직접구현한 Operator 코드를 reactor 에서 제공해주는 Operator 를 사용하여 구현한것이다.
import reactor.core.publisher.Flux;
public class ReactorEx {
public static void main(String[] args) {
Flux.<Integer>create(e -> {
e.next(1);
e.next(2);
e.next(3);
e.complete();
})
.log()
.map(s -> s * 10)
.reduce(0, (a,b) -> a+b)
.log()
.subscribe(System.out::println);
}
}
- create( ) 안에서 기존 subscription 에 구현했었던 코드를 간단하게 만들 수 있다.
- 내부 동작이 어떻게 호출되고 있는지 확인하고 싶으면 log( ) 를 사용하자.
- subscribe( ) 가 호출되면 create( ) 를 타고 올라가 내부적으로 onSubscribe( ), request( ), onNext( ), onComplete( ) 가 호출된다.
- 이 외에도 다양한 reactor 는 Operator 메서드를 지원한다.
Spring 에 적용해보기
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class App {
@RestController
public static class Controller {
@RequestMapping("/hello")
public Publisher<Integer> hello(Integer name) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long l) {
subscriber.onNext(name);
subscriber.onComplete();
}
@Override
public void cancel() {
}
});
}
};
}
}
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
- Spring5 부터는 Publisher로 return 이 가능하다.
- Spring 에 적용할떈 Publisher 만 만들면되고, Subscriber 는 Spring 이 만들어준다.
- Publisher 만 만들면 Spring 이 원하는 방식과 시점에 Subscriber 를 만들어 Publisher 에게 데이터를 요청한다.
- Publisher 에 정의된 request 역시 return 시 Spring 이 알아서 request 를 호출한다.
참고
'Spring > Webflux' 카테고리의 다른 글
토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6) (0) | 2022.06.24 |
---|---|
토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) | 2022.06.17 |
토비의 봄 TV - 자바의 비동기 기술 (4) 1/2 (0) | 2022.06.17 |
토비의 봄 TV - Reactive Streams Scheduler (3) (0) | 2022.06.14 |
토비의 봄 TV - Reactive Streams (1) (0) | 2022.06.04 |