저번시간엔 비동기 논블록킹 작업의 자원을 최소화하는 방법을 다뤘다.
하지만 문제는 중첩된 외부 서비스들을 호출하게되면 콜백의 구조가 복잡해지는 문제 즉, 콜백 헬에 빠지게 되었고 이를 어떻게 개선할 수 있는지 살펴보자.
다시한번 정리해보자면,
- 명령형 스타일의 콜백을 이용했고, 이를 함수형 스타일의 코드로 가독성을 높여보자.
- 콜백 안에 인자는 완료가 된 후 한번 실행되고 끝나 작업이 완료 후 어떤 액션을 하겠다 라는 것들을 구조적으로 제공해줄수 없다.
- 비동기 처리를 할 때 마다 에러를 처리하는 코드가 중복된다.
- 콜백 헬이 일어난다.
Completion 클래스 추가
비동기 작업을 수행해서 ListenableFuture 와 같은 결과를 가져오고, 콜백에 지정한 작업이 완료나 에러가 발생했을때 이후의 처리를 다시 재정의 해주기 위한 클래스
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {
@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andAccept(s -> dr.setResult(s.getBody()));
/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/
return dr;
}
}
public static class Completion {
Consumer<ResponseEntity<String>> con;
Completion next;
public Completion() {
}
public Completion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}
public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion c = new Completion();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}
public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion c = new Completion(con);
this.next = c;
}
void complete(ResponseEntity<String> s) {
if (next != null) next.run(s);
}
private void run(ResponseEntity<String> value) {
if (con != null) con.accept(value);
}
private void error(Throwable e) {
}
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}
public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}
andApply 메서드 추가
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {
@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andAccept(s -> dr.setResult(s.getBody()));
/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/
return dr;
}
}
public static class Completion {
Consumer<ResponseEntity<String>> con;
Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
Completion next;
public Completion() {
}
public Completion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}
public Completion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
this.fn = fn;
}
public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion c = new Completion();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}
public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
Completion c = new Completion(fn);
this.next = c;
return c;
}
public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion c = new Completion(con);
this.next = c;
}
void complete(ResponseEntity<String> s) {
if (next != null) next.run(s);
}
private void run(ResponseEntity<String> value) {
if (con != null) con.accept(value);
else if (fn != null) {
ListenableFuture<ResponseEntity<String>> lf = fn.apply(value);
lf.addCallback(s -> complete(s), e -> error(e));
}
}
private void error(Throwable e) {
}
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}
public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}
AcceptCompletion, AsyncCompletion 클래스 추가
AcceptCompletion
- 결과를 받아서 사용만 하고 끝난다. (Accept 처리)
AsyncCompletion
- 결과를 받아서 또 다른 비동기 작업을 수행하고 그 결과를 반환
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {
@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andAccept(s -> dr.setResult(s.getBody()));
/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/
return dr;
}
}
public static class AcceptCompletion extends Completion {
Consumer<ResponseEntity<String>> con;
public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}
@Override
public void run(ResponseEntity<String> value) {
con.accept(value);
}
}
public static class AsyncCompletion extends Completion {
Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
public AsyncCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
this.fn = fn;
}
@Override
public void run(ResponseEntity<String> value) {
ListenableFuture<ResponseEntity<String>> lf = fn.apply(value);
lf.addCallback(s -> complete(s), e -> error(e));
}
}
public static class Completion {
Completion next;
public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion c = new Completion();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}
public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
Completion c = new AsyncCompletion(fn);
this.next = c;
return c;
}
public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion c = new AcceptCompletion(con);
this.next = c;
}
public void complete(ResponseEntity<String> s) {
if (next != null) next.run(s);
}
public void run(ResponseEntity<String> value) {
}
public void error(Throwable e) {
}
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}
public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}
ErrorCompletion 클래스 추가
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {
@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andError(e -> dr.setErrorResult(e))
.andAccept(s -> dr.setResult(s.getBody()));
/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/
return dr;
}
}
public static class AcceptCompletion extends Completion {
Consumer<ResponseEntity<String>> con;
public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
this.con = con;
}
@Override
public void run(ResponseEntity<String> value) {
con.accept(value);
}
}
public static class ErrorCompletion extends Completion {
Consumer<Throwable> econ;
public ErrorCompletion(Consumer<Throwable> econ) {
this.econ = econ;
}
@Override
public void run(ResponseEntity<String> value) {
if (next != null) {
next.run(value);
}
}
@Override
public void error(Throwable e) {
econ.accept(e);
}
}
public static class AsyncCompletion extends Completion {
Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
public AsyncCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
this.fn = fn;
}
@Override
public void run(ResponseEntity<String> value) {
ListenableFuture<ResponseEntity<String>> lf = fn.apply(value);
lf.addCallback(s -> complete(s), e -> error(e));
}
}
public static class Completion {
Completion next;
public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
Completion c = new Completion();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}
public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
Completion c = new AsyncCompletion(fn);
this.next = c;
return c;
}
public Completion andError(Consumer<Throwable> econ) {
Completion c = new ErrorCompletion(econ);
this.next = c;
return c;
}
public void andAccept(Consumer<ResponseEntity<String>> con) {
Completion c = new AcceptCompletion(con);
this.next = c;
}
public void complete(ResponseEntity<String> s) {
if (next != null) next.run(s);
}
public void run(ResponseEntity<String> value) {
}
public void error(Throwable e) {
if (next != null) next.error(e);
}
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}
public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}
Generic 적용
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {
@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
Completion
.from(rt.getForEntity(URL1, String.class, "hello" + idx))
.andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
.andApply(s -> myService.work(s.getBody()))
.andError(e -> dr.setErrorResult(e.toString()))
.andAccept(s -> dr.setResult(s));
/*
ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
f1.addCallback(s -> {
ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
f2.addCallback(s2 -> {
ListenableFuture<String> f3 = myService.work(s2.getBody());
f3.addCallback(s3 -> {
dr.setResult(s3);
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
}, e -> {
dr.setErrorResult(e.getMessage());
});
*/
return dr;
}
}
public static class AcceptCompletion<S> extends Completion<S, Void> {
Consumer<S> con;
public AcceptCompletion(Consumer<S> con) {
this.con = con;
}
@Override
public void run(S value) {
con.accept(value);
}
}
public static class ErrorCompletion<T> extends Completion<T, T> {
Consumer<Throwable> econ;
public ErrorCompletion(Consumer<Throwable> econ) {
this.econ = econ;
}
@Override
public void run(T value) {
if (next != null) {
next.run(value);
}
}
@Override
public void error(Throwable e) {
econ.accept(e);
}
}
public static class AsyncCompletion<S, T> extends Completion<S, T> {
Function<S, ListenableFuture<T>> fn;
public AsyncCompletion(Function<S, ListenableFuture<T>> fn) {
this.fn = fn;
}
@Override
public void run(S value) {
ListenableFuture<T> lf = fn.apply(value);
lf.addCallback(s -> complete(s), e -> error(e));
}
}
// S는 넘어온 파라미터, T는 결과
public static class Completion<S, T> {
Completion next;
public static <S, T> Completion<S, T> from(ListenableFuture<T> lf) {
Completion<S, T> c = new Completion<>();
lf.addCallback(s -> {
c.complete(s);
}, e -> {
c.error(e);
});
return c;
}
public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> fn) {
Completion<T, V> c = new AsyncCompletion<>(fn);
this.next = c;
return c;
}
public Completion<T, T> andError(Consumer<Throwable> econ) {
Completion<T, T> c = new ErrorCompletion<>(econ);
this.next = c;
return c;
}
public void andAccept(Consumer<T> con) {
Completion<T, Void> c = new AcceptCompletion<>(con);
this.next = c;
}
public void complete(T s) {
if (next != null) next.run(s);
}
public void run(S value) {
}
public void error(Throwable e) {
if (next != null) next.error(e);
}
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}
public static void main(String[] args) {
SpringApplication.run(StudyApplication.class, args);
}
}
참고
Jongmin92 님 블로그 참고
Toby 님 강의 참고
'Spring > Webflux' 카테고리의 다른 글
Thread, Future, CompletableFuture 란 (1) | 2024.01.27 |
---|---|
토비의 봄 TV - CompletableFuture (7) (0) | 2022.06.28 |
토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) | 2022.06.17 |
토비의 봄 TV - 자바의 비동기 기술 (4) 1/2 (0) | 2022.06.17 |
토비의 봄 TV - Reactive Streams Scheduler (3) (0) | 2022.06.14 |