-
토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6)Spring/Webflux 2022. 6. 24. 21:17
저번시간엔 비동기 논블록킹 작업의 자원을 최소화하는 방법을 다뤘다.
하지만 외부 서비스 호출이 중첩되면 콜백 구조가 복잡해지는 문제, 즉 콜백 헬에 빠지게 되었다. 이번 시간에는 이를 어떻게 개선할 수 있는지 살펴보자.
다시한번 정리해보자면,
- 지금까지는 명령형 스타일의 콜백을 이용했는데, 이를 함수형 스타일의 코드로 바꿔 가독성을 높여보자.
- 콜백은 작업이 완료된 뒤 한 번 실행되고 끝나기 때문에, "작업이 완료된 후 어떤 액션을 하겠다"는 흐름을 구조적으로 표현해 줄 수 없다.
- 비동기 처리를 할 때 마다 에러를 처리하는 코드가 중복된다.
- 콜백 헬이 일어난다.
Completion 클래스 추가
비동기 작업의 결과(ListenableFuture 등)를 받아서, 그 작업이 완료되거나 에러가 발생했을 때 이어서 수행할 처리를 정의할 수 있게 해 주는 클래스
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

    /** Controller whose single endpoint answers through a {@link DeferredResult}. */
    @RestController
    public static class MyController {
        AsyncRestTemplate rt = new AsyncRestTemplate(
                new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

        @Autowired
        MyService myService;

        static final String URL1 = "http://localhost:8081/service?req={req}";
        static final String URL2 = "http://localhost:8081/service2?req={req}";

        /**
         * Calls URL1 with {@code "hello" + idx} and publishes the response body.
         *
         * <p>This stage only converts the first remote call; the earlier nested
         * {@code addCallback} version additionally called URL2 and
         * {@code myService.work} and repeated the same error handler at each level.
         *
         * @param idx request index appended to the "hello" prefix
         * @return the deferred result that is set once the call succeeds
         */
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {
            DeferredResult<String> dr = new DeferredResult<>();

            ListenableFuture<ResponseEntity<String>> firstCall =
                    rt.getForEntity(URL1, String.class, "hello" + idx);
            Completion.from(firstCall)
                    .andAccept(response -> dr.setResult(response.getBody()));

            return dr;
        }
    }

    /**
     * One link in a chain of callbacks: the head link listens to a future and
     * hands a successful result to the link stored in {@code next}.
     */
    public static class Completion {
        Consumer<ResponseEntity<String>> con;
        Completion next;

        public Completion() {
        }

        public Completion(Consumer<ResponseEntity<String>> con) {
            this.con = con;
        }

        /**
         * Creates the head link and subscribes it to {@code lf}.
         *
         * @param lf the future whose outcome drives the chain
         * @return the head link, to which a follow-up step can be attached
         */
        public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
            Completion head = new Completion();
            lf.addCallback(head::complete, head::error);
            return head;
        }

        /**
         * Attaches a terminal consumer that receives the successful result.
         *
         * @param con consumer invoked with the response
         */
        public void andAccept(Consumer<ResponseEntity<String>> con) {
            this.next = new Completion(con);
        }

        /** Forwards a successful result to the next link, if one is attached. */
        void complete(ResponseEntity<String> s) {
            if (next == null) {
                return;
            }
            next.run(s);
        }

        /** Runs this link's consumer, if it has one. */
        private void run(ResponseEntity<String> value) {
            if (con == null) {
                return;
            }
            con.accept(value);
        }

        /** Failure hook; intentionally a no-op at this stage, so failures are dropped. */
        private void error(Throwable e) {
        }
    }

    /** Service whose {@code work} method is marked {@code @Async}. */
    @Service
    public static class MyService {
        /**
         * @param req incoming text
         * @return an {@link AsyncResult} holding {@code req} with "/asyncwork" appended
         */
        @Async
        public ListenableFuture<String> work(String req) {
            String worked = req + "/asyncwork";
            return new AsyncResult<>(worked);
        }
    }

    /** Task executor bean configured with core and max pool size of 1. */
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.initialize();
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(StudyApplication.class, args);
    }
}
andApply 메서드 추가
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

    /** Controller whose single endpoint answers through a {@link DeferredResult}. */
    @RestController
    public static class MyController {
        AsyncRestTemplate rt = new AsyncRestTemplate(
                new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

        @Autowired
        MyService myService;

        static final String URL1 = "http://localhost:8081/service?req={req}";
        static final String URL2 = "http://localhost:8081/service2?req={req}";

        /**
         * Calls URL1, feeds its body into a call to URL2, and publishes the
         * second response body. Replaces the earlier nested {@code addCallback}
         * chain for these two calls.
         *
         * @param idx request index appended to the "hello" prefix
         * @return the deferred result that is set once both calls succeed
         */
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {
            DeferredResult<String> dr = new DeferredResult<>();

            ListenableFuture<ResponseEntity<String>> firstCall =
                    rt.getForEntity(URL1, String.class, "hello" + idx);
            Completion.from(firstCall)
                    .andApply(first -> rt.getForEntity(URL2, String.class, first.getBody()))
                    .andAccept(second -> dr.setResult(second.getBody()));

            return dr;
        }
    }

    /**
     * One link in a chain of callbacks. A link holds either a terminal consumer
     * ({@code con}), a function that starts another asynchronous call
     * ({@code fn}), or neither (the head link created by {@link #from}).
     */
    public static class Completion {
        Consumer<ResponseEntity<String>> con;
        Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;
        Completion next;

        public Completion() {
        }

        public Completion(Consumer<ResponseEntity<String>> con) {
            this.con = con;
        }

        public Completion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
            this.fn = fn;
        }

        /**
         * Creates the head link and subscribes it to {@code lf}.
         *
         * @param lf the future whose outcome drives the chain
         * @return the head link
         */
        public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
            Completion head = new Completion();
            lf.addCallback(head::complete, head::error);
            return head;
        }

        /**
         * Appends a link that starts another asynchronous call from the previous result.
         *
         * @param fn maps the previous response to the next future
         * @return the newly appended link, so further steps can be chained onto it
         */
        public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
            Completion appended = new Completion(fn);
            this.next = appended;
            return appended;
        }

        /**
         * Appends a terminal link that consumes the previous result.
         *
         * @param con consumer invoked with the response
         */
        public void andAccept(Consumer<ResponseEntity<String>> con) {
            this.next = new Completion(con);
        }

        /** Forwards a successful result to the next link, if one is attached. */
        void complete(ResponseEntity<String> s) {
            if (next == null) {
                return;
            }
            next.run(s);
        }

        /**
         * Executes this link: the consumer wins if present; otherwise the
         * function is applied and this link subscribes to the future it returns.
         */
        private void run(ResponseEntity<String> value) {
            if (con != null) {
                con.accept(value);
                return;
            }
            if (fn == null) {
                return;
            }
            ListenableFuture<ResponseEntity<String>> following = fn.apply(value);
            following.addCallback(this::complete, this::error);
        }

        /** Failure hook; intentionally a no-op at this stage, so failures are dropped. */
        private void error(Throwable e) {
        }
    }

    /** Service whose {@code work} method is marked {@code @Async}. */
    @Service
    public static class MyService {
        /**
         * @param req incoming text
         * @return an {@link AsyncResult} holding {@code req} with "/asyncwork" appended
         */
        @Async
        public ListenableFuture<String> work(String req) {
            String worked = req + "/asyncwork";
            return new AsyncResult<>(worked);
        }
    }

    /** Task executor bean configured with core and max pool size of 1. */
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.initialize();
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(StudyApplication.class, args);
    }
}
AcceptCompletion, AsyncCompletion 클래스 추가
AcceptCompletion
- 결과를 받아서 사용만 하고 끝난다. (Accept 처리)
AsyncCompletion
- 결과를 받아서 또 다른 비동기 작업을 수행하고 그 결과를 반환
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

    /** Controller whose single endpoint answers through a {@link DeferredResult}. */
    @RestController
    public static class MyController {
        AsyncRestTemplate rt = new AsyncRestTemplate(
                new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

        @Autowired
        MyService myService;

        static final String URL1 = "http://localhost:8081/service?req={req}";
        static final String URL2 = "http://localhost:8081/service2?req={req}";

        /**
         * Calls URL1, feeds its body into a call to URL2, and publishes the
         * second response body. Replaces the earlier nested {@code addCallback}
         * chain for these two calls.
         *
         * @param idx request index appended to the "hello" prefix
         * @return the deferred result that is set once both calls succeed
         */
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {
            DeferredResult<String> dr = new DeferredResult<>();

            ListenableFuture<ResponseEntity<String>> firstCall =
                    rt.getForEntity(URL1, String.class, "hello" + idx);
            Completion.from(firstCall)
                    .andApply(first -> rt.getForEntity(URL2, String.class, first.getBody()))
                    .andAccept(second -> dr.setResult(second.getBody()));

            return dr;
        }
    }

    /** Terminal link: consumes the result and does not forward anything. */
    public static class AcceptCompletion extends Completion {
        Consumer<ResponseEntity<String>> con;

        public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
            this.con = con;
        }

        @Override
        public void run(ResponseEntity<String> value) {
            con.accept(value);
        }
    }

    /**
     * Link that starts another asynchronous call from the incoming result and
     * forwards that call's outcome down the chain.
     */
    public static class AsyncCompletion extends Completion {
        Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

        public AsyncCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
            this.fn = fn;
        }

        @Override
        public void run(ResponseEntity<String> value) {
            ListenableFuture<ResponseEntity<String>> following = fn.apply(value);
            following.addCallback(this::complete, this::error);
        }
    }

    /**
     * Base link of the callback chain. On its own it only forwards a successful
     * result to {@code next}; subclasses override {@link #run} to do work.
     */
    public static class Completion {
        Completion next;

        /**
         * Creates the head link and subscribes it to {@code lf}.
         *
         * @param lf the future whose outcome drives the chain
         * @return the head link
         */
        public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
            Completion head = new Completion();
            lf.addCallback(head::complete, head::error);
            return head;
        }

        /**
         * Appends an {@link AsyncCompletion}.
         *
         * @param fn maps the previous response to the next future
         * @return the appended link, so further steps can be chained onto it
         */
        public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
            Completion appended = new AsyncCompletion(fn);
            this.next = appended;
            return appended;
        }

        /**
         * Appends a terminal {@link AcceptCompletion}.
         *
         * @param con consumer invoked with the response
         */
        public void andAccept(Consumer<ResponseEntity<String>> con) {
            this.next = new AcceptCompletion(con);
        }

        /** Forwards a successful result to the next link, if one is attached. */
        public void complete(ResponseEntity<String> s) {
            if (next == null) {
                return;
            }
            next.run(s);
        }

        /** Does nothing in the base class; overridden by the concrete links. */
        public void run(ResponseEntity<String> value) {
        }

        /** Failure hook; intentionally a no-op at this stage, so failures are dropped. */
        public void error(Throwable e) {
        }
    }

    /** Service whose {@code work} method is marked {@code @Async}. */
    @Service
    public static class MyService {
        /**
         * @param req incoming text
         * @return an {@link AsyncResult} holding {@code req} with "/asyncwork" appended
         */
        @Async
        public ListenableFuture<String> work(String req) {
            String worked = req + "/asyncwork";
            return new AsyncResult<>(worked);
        }
    }

    /** Task executor bean configured with core and max pool size of 1. */
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.initialize();
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(StudyApplication.class, args);
    }
}
ErrorCompletion 클래스 추가
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

    /** Controller whose single endpoint answers through a {@link DeferredResult}. */
    @RestController
    public static class MyController {
        AsyncRestTemplate rt = new AsyncRestTemplate(
                new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

        @Autowired
        MyService myService;

        static final String URL1 = "http://localhost:8081/service?req={req}";
        static final String URL2 = "http://localhost:8081/service2?req={req}";

        /**
         * Calls URL1, feeds its body into a call to URL2, and publishes either
         * the second response body or the failure. Replaces the earlier nested
         * {@code addCallback} chain, which repeated the error handler at every level.
         *
         * @param idx request index appended to the "hello" prefix
         * @return the deferred result that receives the body or the error
         */
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {
            DeferredResult<String> dr = new DeferredResult<>();

            ListenableFuture<ResponseEntity<String>> firstCall =
                    rt.getForEntity(URL1, String.class, "hello" + idx);
            Completion.from(firstCall)
                    .andApply(first -> rt.getForEntity(URL2, String.class, first.getBody()))
                    .andError(dr::setErrorResult)
                    .andAccept(second -> dr.setResult(second.getBody()));

            return dr;
        }
    }

    /** Terminal link: consumes the result and does not forward anything. */
    public static class AcceptCompletion extends Completion {
        Consumer<ResponseEntity<String>> con;

        public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
            this.con = con;
        }

        @Override
        public void run(ResponseEntity<String> value) {
            con.accept(value);
        }
    }

    /**
     * Link that passes successful results straight through to the next link
     * and hands failures to its error consumer instead of propagating them.
     */
    public static class ErrorCompletion extends Completion {
        Consumer<Throwable> econ;

        public ErrorCompletion(Consumer<Throwable> econ) {
            this.econ = econ;
        }

        @Override
        public void run(ResponseEntity<String> value) {
            if (next == null) {
                return;
            }
            next.run(value);
        }

        @Override
        public void error(Throwable e) {
            econ.accept(e);
        }
    }

    /**
     * Link that starts another asynchronous call from the incoming result and
     * forwards that call's outcome (success or failure) down the chain.
     */
    public static class AsyncCompletion extends Completion {
        Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

        public AsyncCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
            this.fn = fn;
        }

        @Override
        public void run(ResponseEntity<String> value) {
            ListenableFuture<ResponseEntity<String>> following = fn.apply(value);
            following.addCallback(this::complete, this::error);
        }
    }

    /**
     * Base link of the callback chain. On its own it forwards successes and
     * failures to {@code next}; subclasses override {@link #run} or
     * {@link #error} to do work.
     */
    public static class Completion {
        Completion next;

        /**
         * Creates the head link and subscribes it to {@code lf}.
         *
         * @param lf the future whose outcome drives the chain
         * @return the head link
         */
        public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
            Completion head = new Completion();
            lf.addCallback(head::complete, head::error);
            return head;
        }

        /**
         * Appends an {@link AsyncCompletion}.
         *
         * @param fn maps the previous response to the next future
         * @return the appended link
         */
        public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
            Completion appended = new AsyncCompletion(fn);
            this.next = appended;
            return appended;
        }

        /**
         * Appends an {@link ErrorCompletion}.
         *
         * @param econ consumer invoked with a failure that reaches this point in the chain
         * @return the appended link
         */
        public Completion andError(Consumer<Throwable> econ) {
            Completion appended = new ErrorCompletion(econ);
            this.next = appended;
            return appended;
        }

        /**
         * Appends a terminal {@link AcceptCompletion}.
         *
         * @param con consumer invoked with the response
         */
        public void andAccept(Consumer<ResponseEntity<String>> con) {
            this.next = new AcceptCompletion(con);
        }

        /** Forwards a successful result to the next link, if one is attached. */
        public void complete(ResponseEntity<String> s) {
            if (next == null) {
                return;
            }
            next.run(s);
        }

        /** Does nothing in the base class; overridden by the concrete links. */
        public void run(ResponseEntity<String> value) {
        }

        /** Forwards a failure to the next link, if one is attached. */
        public void error(Throwable e) {
            if (next == null) {
                return;
            }
            next.error(e);
        }
    }

    /** Service whose {@code work} method is marked {@code @Async}. */
    @Service
    public static class MyService {
        /**
         * @param req incoming text
         * @return an {@link AsyncResult} holding {@code req} with "/asyncwork" appended
         */
        @Async
        public ListenableFuture<String> work(String req) {
            String worked = req + "/asyncwork";
            return new AsyncResult<>(worked);
        }
    }

    /** Task executor bean configured with core and max pool size of 1. */
    @Bean
    public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.initialize();
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(StudyApplication.class, args);
    }
}
Generic 적용
@SpringBootApplication @EnableAsync @Slf4j public class StudyApplication { @RestController public static class MyController { AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))); @Autowired MyService myService; static final String URL1 = "http://localhost:8081/service?req={req}"; static final String URL2 = "http://localhost:8081/service2?req={req}"; @GetMapping("/rest") public DeferredResult<String> rest(int idx) { DeferredResult<String> dr = new DeferredResult<>(); Completion .from(rt.getForEntity(URL1, String.class, "hello" + idx)) .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody())) .andApply(s -> myService.work(s.getBody())) .andError(e -> dr.setErrorResult(e.toString())) .andAccept(s -> dr.setResult(s)); /* ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx); f1.addCallback(s -> { ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody()); f2.addCallback(s2 -> { ListenableFuture<String> f3 = myService.work(s2.getBody()); f3.addCallback(s3 -> { dr.setResult(s3); }, e -> { dr.setErrorResult(e.getMessage()); }); }, e -> { dr.setErrorResult(e.getMessage()); }); }, e -> { dr.setErrorResult(e.getMessage()); }); */ return dr; } } public static class AcceptCompletion<S> extends Completion<S, Void> { Consumer<S> con; public AcceptCompletion(Consumer<S> con) { this.con = con; } @Override public void run(S value) { con.accept(value); } } public static class ErrorCompletion<T> extends Completion<T, T> { Consumer<Throwable> econ; public ErrorCompletion(Consumer<Throwable> econ) { this.econ = econ; } @Override public void run(T value) { if (next != null) { next.run(value); } } @Override public void error(Throwable e) { econ.accept(e); } } public static class AsyncCompletion<S, T> extends Completion<S, T> { Function<S, ListenableFuture<T>> fn; public 
AsyncCompletion(Function<S, ListenableFuture<T>> fn) { this.fn = fn; } @Override public void run(S value) { ListenableFuture<T> lf = fn.apply(value); lf.addCallback(s -> complete(s), e -> error(e)); } } // S는 넘어온 파라미터, T는 결과 public static class Completion<S, T> { Completion next; public static <S, T> Completion<S, T> from(ListenableFuture<T> lf) { Completion<S, T> c = new Completion<>(); lf.addCallback(s -> { c.complete(s); }, e -> { c.error(e); }); return c; } public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> fn) { Completion<T, V> c = new AsyncCompletion<>(fn); this.next = c; return c; } public Completion<T, T> andError(Consumer<Throwable> econ) { Completion<T, T> c = new ErrorCompletion<>(econ); this.next = c; return c; } public void andAccept(Consumer<T> con) { Completion<T, Void> c = new AcceptCompletion<>(con); this.next = c; } public void complete(T s) { if (next != null) next.run(s); } public void run(S value) { } public void error(Throwable e) { if (next != null) next.error(e); } } @Service public static class MyService { @Async public ListenableFuture<String> work(String req) { return new AsyncResult<>(req + "/asyncwork"); } } @Bean public ThreadPoolTaskExecutor myThreadPool() { ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor(); te.setCorePoolSize(1); te.setMaxPoolSize(1); te.initialize(); return te; } public static void main(String[] args) { SpringApplication.run(StudyApplication.class, args); } }
참고
Jongmin92 님 블로그 참고
Toby 님 강의 참고
'Spring > Webflux' 카테고리의 다른 글
Thread, Future, CompletableFuture 란 (1) 2024.01.27 토비의 봄 TV - CompletableFuture (7) (0) 2022.06.28 토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) 2022.06.17 토비의 봄 TV - 자바의 비동기 기술 (4) 1/2 (0) 2022.06.17 토비의 봄 TV - Reactive Streams Scheduler (3) (0) 2022.06.14