ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6)
    Spring/Webflux 2022. 6. 24. 21:17

    저번시간엔 비동기 논블록킹 작업의 자원을 최소화하는 방법을 다뤘다. 

    하지만 외부 서비스를 중첩해서 호출하면 콜백 구조가 복잡해지는 문제, 즉 콜백 헬에 빠지게 되었다. 이번에는 이를 어떻게 개선할 수 있는지 살펴보자.

     

    다시 한번 정리해보자면,

    • 지금까지는 명령형 스타일의 콜백을 이용했는데, 이를 함수형 스타일의 코드로 바꿔 가독성을 높여보자.
    • 콜백은 작업이 완료된 후 한 번 실행되고 끝나기 때문에, "작업 완료 후 어떤 액션을 이어서 하겠다"는 것을 구조적으로 표현할 수 없다.
    • 비동기 처리를 할 때 마다 에러를 처리하는 코드가 중복된다.
    • 콜백 헬이 일어난다.

     

     

     

    Completion 클래스 추가


    비동기 작업을 수행해 ListenableFuture와 같은 결과를 받아 오고, 작업이 완료되거나 에러가 발생했을 때 이후의 처리를 정의할 수 있게 해 주는 클래스

    /**
     * Step 1 of the refactoring: introduces {@code Completion}, a small wrapper that lets a
     * single success handler be attached fluently to a {@code ListenableFuture}.
     *
     * <p>Error handling is not implemented at this step: {@code Completion.error} is empty.
     */
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class StudyApplication {

        /** Exposes {@code /rest}, which calls one remote service and returns its body. */
        @RestController
        public static class MyController {
            AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

            @Autowired
            MyService myService;

            static final String URL1 = "http://localhost:8081/service?req={req}";
            static final String URL2 = "http://localhost:8081/service2?req={req}";

            /**
             * Requests {@code URL1} with {@code "hello" + idx} and publishes the response body
             * through the returned {@code DeferredResult}.
             *
             * @param idx request index appended to the {@code req} value
             * @return the deferred result that receives the response body
             */
            @GetMapping("/rest")
            public DeferredResult<String> rest(int idx) {
                DeferredResult<String> dr = new DeferredResult<>();

                ListenableFuture<ResponseEntity<String>> firstCall =
                        rt.getForEntity(URL1, String.class, "hello" + idx);
                Completion chain = Completion.from(firstCall);
                chain.andAccept(response -> dr.setResult(response.getBody()));

                // Previous nested-callback version, kept for comparison.
                /*
                ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
                f1.addCallback(s -> {
                    ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
                    f2.addCallback(s2 -> {
                        ListenableFuture<String> f3 = myService.work(s2.getBody());
                        f3.addCallback(s3 -> {
                            dr.setResult(s3);
                        }, e -> {
                            dr.setErrorResult(e.getMessage());
                        });
                    }, e -> {
                        dr.setErrorResult(e.getMessage());
                    });
                }, e -> {
                    dr.setErrorResult(e.getMessage());
                });
                */

                return dr;
            }
        }

        /**
         * One link of a callback chain. A link created by {@link #from} receives the outcome of
         * the future; a link created by {@link #andAccept} holds the consumer to run.
         */
        public static class Completion {

            /** Consumer run by this link; {@code null} for the head link created by {@code from}. */
            Consumer<ResponseEntity<String>> con;

            /** Link that receives the value once this one completes; {@code null} until attached. */
            Completion next;

            public Completion() {
            }

            public Completion(Consumer<ResponseEntity<String>> con) {
                this.con = con;
            }

            /**
             * Creates the head link and subscribes it to the given future.
             *
             * @param lf future whose outcome drives the chain
             * @return the head link, to which a handler can be attached
             */
            public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
                Completion head = new Completion();
                lf.addCallback(head::complete, head::error);
                return head;
            }

            /**
             * Attaches the consumer that receives the successful result.
             *
             * @param con consumer invoked with the result
             */
            public void andAccept(Consumer<ResponseEntity<String>> con) {
                this.next = new Completion(con);
            }

            /** Forwards a successful result to the next link, if one is attached. */
            void complete(ResponseEntity<String> s) {
                if (next == null) {
                    return;
                }
                next.run(s);
            }

            /** Runs this link's consumer, if it has one. */
            private void run(ResponseEntity<String> value) {
                if (con == null) {
                    return;
                }
                con.accept(value);
            }

            /** Failure hook; does nothing at this step. */
            private void error(Throwable e) {
            }
        }

        /** Service whose {@code work} method is annotated {@code @Async}. */
        @Service
        public static class MyService {
            /**
             * Appends {@code "/asyncwork"} to the request.
             *
             * @param req incoming value
             * @return a future holding {@code req + "/asyncwork"}
             */
            @Async
            public ListenableFuture<String> work(String req) {
                String result = req + "/asyncwork";
                return new AsyncResult<>(result);
            }
        }

        /** Task executor bean with core and max pool size both set to 1. */
        @Bean
        public ThreadPoolTaskExecutor myThreadPool() {
            ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
            pool.setCorePoolSize(1);
            pool.setMaxPoolSize(1);
            pool.initialize();
            return pool;
        }

        /** Application entry point. */
        public static void main(String[] args) {
            SpringApplication.run(StudyApplication.class, args);
        }
    }

     

     

     

    andApply 메서드 추가


    /**
     * Step 2 of the refactoring: {@code Completion} gains {@code andApply}, so a second
     * asynchronous call can be chained on the result of the first.
     *
     * <p>Error handling is still not implemented: {@code Completion.error} is empty.
     */
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class StudyApplication {

        /** Exposes {@code /rest}, which calls two remote services in sequence. */
        @RestController
        public static class MyController {
            AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

            @Autowired
            MyService myService;

            static final String URL1 = "http://localhost:8081/service?req={req}";
            static final String URL2 = "http://localhost:8081/service2?req={req}";

            /**
             * Requests {@code URL1}, feeds its body to {@code URL2}, and publishes the second
             * response body through the returned {@code DeferredResult}.
             *
             * @param idx request index appended to the {@code req} value
             * @return the deferred result that receives the final body
             */
            @GetMapping("/rest")
            public DeferredResult<String> rest(int idx) {
                DeferredResult<String> dr = new DeferredResult<>();

                ListenableFuture<ResponseEntity<String>> firstCall =
                        rt.getForEntity(URL1, String.class, "hello" + idx);
                Completion
                        .from(firstCall)
                        .andApply(first -> rt.getForEntity(URL2, String.class, first.getBody()))
                        .andAccept(second -> dr.setResult(second.getBody()));

                // Previous nested-callback version, kept for comparison.
                /*
                ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
                f1.addCallback(s -> {
                    ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
                    f2.addCallback(s2 -> {
                        ListenableFuture<String> f3 = myService.work(s2.getBody());
                        f3.addCallback(s3 -> {
                            dr.setResult(s3);
                        }, e -> {
                            dr.setErrorResult(e.getMessage());
                        });
                    }, e -> {
                        dr.setErrorResult(e.getMessage());
                    });
                }, e -> {
                    dr.setErrorResult(e.getMessage());
                });
                */

                return dr;
            }
        }

        /**
         * One link of a callback chain. A link holds either a terminal consumer ({@code con}),
         * an asynchronous step ({@code fn}), or neither (the head link created by {@link #from}).
         */
        public static class Completion {

            /** Terminal consumer; checked first by {@code run}. */
            Consumer<ResponseEntity<String>> con;

            /** Asynchronous step; used by {@code run} only when {@code con} is {@code null}. */
            Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

            /** Link that receives the value once this one completes; {@code null} until attached. */
            Completion next;

            public Completion() {
            }

            public Completion(Consumer<ResponseEntity<String>> con) {
                this.con = con;
            }

            public Completion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
                this.fn = fn;
            }

            /**
             * Creates the head link and subscribes it to the given future.
             *
             * @param lf future whose outcome drives the chain
             * @return the head link, to which further steps can be attached
             */
            public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
                Completion head = new Completion();
                lf.addCallback(head::complete, head::error);
                return head;
            }

            /**
             * Attaches an asynchronous step that runs with this link's result.
             *
             * @param fn function starting the next asynchronous call
             * @return the new link, so that more steps can be chained
             */
            public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
                Completion step = new Completion(fn);
                this.next = step;
                return step;
            }

            /**
             * Attaches the terminal consumer that receives this link's result.
             *
             * @param con consumer invoked with the result
             */
            public void andAccept(Consumer<ResponseEntity<String>> con) {
                this.next = new Completion(con);
            }

            /** Forwards a successful result to the next link, if one is attached. */
            void complete(ResponseEntity<String> s) {
                if (next == null) {
                    return;
                }
                next.run(s);
            }

            /**
             * Executes this link: the consumer if present, otherwise the asynchronous step,
             * whose outcome is then fed back into {@code complete} / {@code error}.
             */
            private void run(ResponseEntity<String> value) {
                if (con != null) {
                    con.accept(value);
                    return;
                }
                if (fn == null) {
                    return;
                }
                ListenableFuture<ResponseEntity<String>> pending = fn.apply(value);
                pending.addCallback(this::complete, this::error);
            }

            /** Failure hook; does nothing at this step. */
            private void error(Throwable e) {
            }
        }

        /** Service whose {@code work} method is annotated {@code @Async}. */
        @Service
        public static class MyService {
            /**
             * Appends {@code "/asyncwork"} to the request.
             *
             * @param req incoming value
             * @return a future holding {@code req + "/asyncwork"}
             */
            @Async
            public ListenableFuture<String> work(String req) {
                String result = req + "/asyncwork";
                return new AsyncResult<>(result);
            }
        }

        /** Task executor bean with core and max pool size both set to 1. */
        @Bean
        public ThreadPoolTaskExecutor myThreadPool() {
            ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
            pool.setCorePoolSize(1);
            pool.setMaxPoolSize(1);
            pool.initialize();
            return pool;
        }

        /** Application entry point. */
        public static void main(String[] args) {
            SpringApplication.run(StudyApplication.class, args);
        }
    }

     

     

     

    AcceptCompletion, AsyncCompletion 클래스 추가


    AcceptCompletion

    • 결과를 받아서 사용만 하고 끝난다. (Accept 처리)

    AsyncCompletion

    • 결과를 받아서 또 다른 비동기 작업을 수행하고 그 결과를 반환
    /**
     * Step 3 of the refactoring: the two behaviours of {@code Completion} are split into the
     * subclasses {@code AcceptCompletion} and {@code AsyncCompletion}, each overriding
     * {@code run}.
     *
     * <p>Error handling is still not implemented: {@code Completion.error} is empty.
     */
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class StudyApplication {

        /** Exposes {@code /rest}, which calls two remote services in sequence. */
        @RestController
        public static class MyController {
            AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

            @Autowired
            MyService myService;

            static final String URL1 = "http://localhost:8081/service?req={req}";
            static final String URL2 = "http://localhost:8081/service2?req={req}";

            /**
             * Requests {@code URL1}, feeds its body to {@code URL2}, and publishes the second
             * response body through the returned {@code DeferredResult}.
             *
             * @param idx request index appended to the {@code req} value
             * @return the deferred result that receives the final body
             */
            @GetMapping("/rest")
            public DeferredResult<String> rest(int idx) {
                DeferredResult<String> dr = new DeferredResult<>();

                ListenableFuture<ResponseEntity<String>> firstCall =
                        rt.getForEntity(URL1, String.class, "hello" + idx);
                Completion
                        .from(firstCall)
                        .andApply(first -> rt.getForEntity(URL2, String.class, first.getBody()))
                        .andAccept(second -> dr.setResult(second.getBody()));

                // Previous nested-callback version, kept for comparison.
                /*
                ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
                f1.addCallback(s -> {
                    ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
                    f2.addCallback(s2 -> {
                        ListenableFuture<String> f3 = myService.work(s2.getBody());
                        f3.addCallback(s3 -> {
                            dr.setResult(s3);
                        }, e -> {
                            dr.setErrorResult(e.getMessage());
                        });
                    }, e -> {
                        dr.setErrorResult(e.getMessage());
                    });
                }, e -> {
                    dr.setErrorResult(e.getMessage());
                });
                */

                return dr;
            }
        }

        /** Terminal link: hands the received value to a consumer and stops. */
        public static class AcceptCompletion extends Completion {
            Consumer<ResponseEntity<String>> con;

            public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
                this.con = con;
            }

            /** Passes the value to the consumer. */
            @Override
            public void run(ResponseEntity<String> value) {
                con.accept(value);
            }
        }

        /** Intermediate link: starts another asynchronous call with the received value. */
        public static class AsyncCompletion extends Completion {
            Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

            public AsyncCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
                this.fn = fn;
            }

            /** Applies the function and feeds the new future's outcome back into this link. */
            @Override
            public void run(ResponseEntity<String> value) {
                fn.apply(value).addCallback(this::complete, this::error);
            }
        }

        /**
         * Base link of a callback chain. On its own it only forwards a completed value to the
         * next link; subclasses override {@link #run} to do actual work.
         */
        public static class Completion {
            /** Link that receives the value once this one completes; {@code null} until attached. */
            Completion next;

            /**
             * Creates the head link and subscribes it to the given future.
             *
             * @param lf future whose outcome drives the chain
             * @return the head link, to which further steps can be attached
             */
            public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
                Completion head = new Completion();
                lf.addCallback(head::complete, head::error);
                return head;
            }

            /**
             * Attaches an asynchronous step that runs with this link's result.
             *
             * @param fn function starting the next asynchronous call
             * @return the new link, so that more steps can be chained
             */
            public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
                Completion step = new AsyncCompletion(fn);
                this.next = step;
                return step;
            }

            /**
             * Attaches the terminal consumer that receives this link's result.
             *
             * @param con consumer invoked with the result
             */
            public void andAccept(Consumer<ResponseEntity<String>> con) {
                this.next = new AcceptCompletion(con);
            }

            /** Forwards a successful result to the next link, if one is attached. */
            public void complete(ResponseEntity<String> s) {
                if (next == null) {
                    return;
                }
                next.run(s);
            }

            /** Hook executed when the previous link completes; empty in the base class. */
            public void run(ResponseEntity<String> value) {
            }

            /** Failure hook; does nothing at this step. */
            public void error(Throwable e) {
            }
        }

        /** Service whose {@code work} method is annotated {@code @Async}. */
        @Service
        public static class MyService {
            /**
             * Appends {@code "/asyncwork"} to the request.
             *
             * @param req incoming value
             * @return a future holding {@code req + "/asyncwork"}
             */
            @Async
            public ListenableFuture<String> work(String req) {
                String result = req + "/asyncwork";
                return new AsyncResult<>(result);
            }
        }

        /** Task executor bean with core and max pool size both set to 1. */
        @Bean
        public ThreadPoolTaskExecutor myThreadPool() {
            ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
            pool.setCorePoolSize(1);
            pool.setMaxPoolSize(1);
            pool.initialize();
            return pool;
        }

        /** Application entry point. */
        public static void main(String[] args) {
            SpringApplication.run(StudyApplication.class, args);
        }
    }

     

     

     

    ErrorCompletion 클래스 추가


    /**
     * Step 4 of the refactoring: adds {@code ErrorCompletion} and {@code andError}, so a single
     * handler receives a failure raised by any earlier step of the chain.
     */
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class StudyApplication {

        /** Exposes {@code /rest}, which calls two remote services in sequence. */
        @RestController
        public static class MyController {
            AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

            @Autowired
            MyService myService;

            static final String URL1 = "http://localhost:8081/service?req={req}";
            static final String URL2 = "http://localhost:8081/service2?req={req}";

            /**
             * Requests {@code URL1}, feeds its body to {@code URL2}, and publishes the second
             * response body (or the failure) through the returned {@code DeferredResult}.
             *
             * @param idx request index appended to the {@code req} value
             * @return the deferred result that receives the final body or the error
             */
            @GetMapping("/rest")
            public DeferredResult<String> rest(int idx) {
                DeferredResult<String> dr = new DeferredResult<>();

                Completion
                        .from(rt.getForEntity(URL1, String.class, "hello" + idx))
                        .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
                        .andError(e -> dr.setErrorResult(e))
                        .andAccept(s -> dr.setResult(s.getBody()));

                // Previous nested-callback version, kept for comparison.
                /*
                ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
                f1.addCallback(s -> {
                    ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
                    f2.addCallback(s2 -> {
                        ListenableFuture<String> f3 = myService.work(s2.getBody());
                        f3.addCallback(s3 -> {
                            dr.setResult(s3);
                        }, e -> {
                            dr.setErrorResult(e.getMessage());
                        });
                    }, e -> {
                        dr.setErrorResult(e.getMessage());
                    });
                }, e -> {
                    dr.setErrorResult(e.getMessage());
                });
                */

                return dr;
            }
        }

        /** Terminal link: hands the received value to a consumer and stops. */
        public static class AcceptCompletion extends Completion {
            final Consumer<ResponseEntity<String>> con;

            public AcceptCompletion(Consumer<ResponseEntity<String>> con) {
                this.con = con;
            }

            /** Passes the value to the consumer. */
            @Override
            public void run(ResponseEntity<String> value) {
                con.accept(value);
            }
        }

        /**
         * Error-handling link: successful values pass straight through to the next link, while
         * a failure is given to the error consumer and is not propagated any further.
         */
        public static class ErrorCompletion extends Completion {
            final Consumer<Throwable> econ;

            public ErrorCompletion(Consumer<Throwable> econ) {
                this.econ = econ;
            }

            /** Passes the value on unchanged; {@code complete} forwards it if a next link exists. */
            @Override
            public void run(ResponseEntity<String> value) {
                complete(value);
            }

            /** Consumes the failure instead of forwarding it down the chain. */
            @Override
            public void error(Throwable e) {
                econ.accept(e);
            }
        }

        /** Intermediate link: starts another asynchronous call with the received value. */
        public static class AsyncCompletion extends Completion {
            final Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn;

            public AsyncCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
                this.fn = fn;
            }

            /**
             * Applies the function and feeds the new future's outcome back into this link.
             *
             * <p>An exception thrown synchronously by the function is routed through
             * {@link #error(Throwable)} so that it reaches the chain's error handler. Without
             * this, it would escape into the upstream future's callback and the chain would
             * never be notified.
             */
            @Override
            public void run(ResponseEntity<String> value) {
                ListenableFuture<ResponseEntity<String>> lf;
                try {
                    lf = fn.apply(value);
                } catch (RuntimeException ex) {
                    error(ex);
                    return;
                }
                // Registered outside the try block so failures raised by downstream links are
                // not caught here and reported a second time.
                lf.addCallback(s -> complete(s), e -> error(e));
            }
        }

        /**
         * Base link of a callback chain. On its own it only forwards values and failures to the
         * next link; subclasses override {@link #run} and {@link #error} to do actual work.
         */
        public static class Completion {
            /** Link that receives this link's outcome; {@code null} until one is attached. */
            Completion next;

            /**
             * Creates the head link and subscribes it to the given future.
             *
             * @param lf future whose outcome drives the chain
             * @return the head link, to which further steps can be attached
             */
            public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
                Completion c = new Completion();
                lf.addCallback(s -> {
                    c.complete(s);
                }, e -> {
                    c.error(e);
                });
                return c;
            }

            /**
             * Attaches an asynchronous step that runs with this link's result.
             *
             * @param fn function starting the next asynchronous call
             * @return the new link, so that more steps can be chained
             */
            public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> fn) {
                Completion c = new AsyncCompletion(fn);
                this.next = c;
                return c;
            }

            /**
             * Attaches an error handler that receives failures forwarded from earlier links.
             *
             * @param econ consumer invoked with the failure
             * @return the new link, so that more steps can be chained
             */
            public Completion andError(Consumer<Throwable> econ) {
                Completion c = new ErrorCompletion(econ);
                this.next = c;
                return c;
            }

            /**
             * Attaches the terminal consumer that receives this link's result.
             *
             * @param con consumer invoked with the result
             */
            public void andAccept(Consumer<ResponseEntity<String>> con) {
                Completion c = new AcceptCompletion(con);
                this.next = c;
            }

            /** Forwards a successful result to the next link, if one is attached. */
            public void complete(ResponseEntity<String> s) {
                if (next != null) next.run(s);
            }

            /** Hook executed when the previous link completes; empty in the base class. */
            public void run(ResponseEntity<String> value) {
            }

            /** Forwards a failure to the next link, if one is attached. */
            public void error(Throwable e) {
                if (next != null) next.error(e);
            }
        }

        /** Service whose {@code work} method is annotated {@code @Async}. */
        @Service
        public static class MyService {
            /**
             * Appends {@code "/asyncwork"} to the request.
             *
             * @param req incoming value
             * @return a future holding {@code req + "/asyncwork"}
             */
            @Async
            public ListenableFuture<String> work(String req) {
                return new AsyncResult<>(req + "/asyncwork");
            }
        }

        /** Task executor bean with core and max pool size both set to 1. */
        @Bean
        public ThreadPoolTaskExecutor myThreadPool() {
            ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
            te.setCorePoolSize(1);
            te.setMaxPoolSize(1);
            te.initialize();
            return te;
        }

        /** Application entry point. */
        public static void main(String[] args) {
            SpringApplication.run(StudyApplication.class, args);
        }
    }

     

     

     

    Generic 적용


    /**
     * Step 5 of the refactoring: {@code Completion} and its subclasses become generic, so steps
     * producing different result types (here {@code ResponseEntity<String>} and {@code String})
     * can be chained.
     */
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class StudyApplication {

        /** Exposes {@code /rest}, which calls two remote services and then {@code MyService}. */
        @RestController
        public static class MyController {
            AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

            @Autowired
            MyService myService;

            static final String URL1 = "http://localhost:8081/service?req={req}";
            static final String URL2 = "http://localhost:8081/service2?req={req}";

            /**
             * Requests {@code URL1}, feeds its body to {@code URL2}, passes that body to
             * {@code myService.work}, and publishes the final string (or the failure's
             * {@code toString()}) through the returned {@code DeferredResult}.
             *
             * @param idx request index appended to the {@code req} value
             * @return the deferred result that receives the final value or the error text
             */
            @GetMapping("/rest")
            public DeferredResult<String> rest(int idx) {
                DeferredResult<String> dr = new DeferredResult<>();

                Completion
                        .from(rt.getForEntity(URL1, String.class, "hello" + idx))
                        .andApply(s -> rt.getForEntity(URL2, String.class, s.getBody()))
                        .andApply(s -> myService.work(s.getBody()))
                        .andError(e -> dr.setErrorResult(e.toString()))
                        .andAccept(s -> dr.setResult(s));

                // Previous nested-callback version, kept for comparison.
                /*
                ListenableFuture<ResponseEntity<String>> f1 = rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx);
                f1.addCallback(s -> {
                    ListenableFuture<ResponseEntity<String>> f2 = rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody());
                    f2.addCallback(s2 -> {
                        ListenableFuture<String> f3 = myService.work(s2.getBody());
                        f3.addCallback(s3 -> {
                            dr.setResult(s3);
                        }, e -> {
                            dr.setErrorResult(e.getMessage());
                        });
                    }, e -> {
                        dr.setErrorResult(e.getMessage());
                    });
                }, e -> {
                    dr.setErrorResult(e.getMessage());
                });
                */

                return dr;
            }
        }

        /**
         * Terminal link: hands the received value to a consumer and stops.
         *
         * @param <S> type of the value received from the previous link
         */
        public static class AcceptCompletion<S> extends Completion<S, Void> {
            final Consumer<S> con;

            public AcceptCompletion(Consumer<S> con) {
                this.con = con;
            }

            /** Passes the value to the consumer. */
            @Override
            public void run(S value) {
                con.accept(value);
            }
        }

        /**
         * Error-handling link: successful values pass straight through to the next link, while
         * a failure is given to the error consumer and is not propagated any further.
         *
         * @param <T> type of the value passed through unchanged
         */
        public static class ErrorCompletion<T> extends Completion<T, T> {
            final Consumer<Throwable> econ;

            public ErrorCompletion(Consumer<Throwable> econ) {
                this.econ = econ;
            }

            /** Passes the value on unchanged; {@code complete} forwards it if a next link exists. */
            @Override
            public void run(T value) {
                complete(value);
            }

            /** Consumes the failure instead of forwarding it down the chain. */
            @Override
            public void error(Throwable e) {
                econ.accept(e);
            }
        }

        /**
         * Intermediate link: starts another asynchronous call with the received value.
         *
         * @param <S> type of the value received from the previous link
         * @param <T> type of the result produced by the asynchronous call
         */
        public static class AsyncCompletion<S, T> extends Completion<S, T> {
            final Function<S, ListenableFuture<T>> fn;

            public AsyncCompletion(Function<S, ListenableFuture<T>> fn) {
                this.fn = fn;
            }

            /**
             * Applies the function and feeds the new future's outcome back into this link.
             *
             * <p>An exception thrown synchronously by the function is routed through
             * {@link #error(Throwable)} so that it reaches the chain's error handler. Without
             * this, it would escape into the upstream future's callback and the chain would
             * never be notified.
             */
            @Override
            public void run(S value) {
                ListenableFuture<T> lf;
                try {
                    lf = fn.apply(value);
                } catch (RuntimeException ex) {
                    error(ex);
                    return;
                }
                // Registered outside the try block so failures raised by downstream links are
                // not caught here and reported a second time.
                lf.addCallback(s -> complete(s), e -> error(e));
            }
        }

        /**
         * Base link of a callback chain. On its own it only forwards values and failures to the
         * next link; subclasses override {@link #run} and {@link #error} to do actual work.
         *
         * @param <S> type of the value passed in from the previous link
         * @param <T> type of the result this link produces
         */
        public static class Completion<S, T> {
            /**
             * Link that receives this link's outcome; {@code null} until one is attached. It
             * consumes this link's result type {@code T}; its own result type is not relevant
             * here, hence the wildcard.
             */
            Completion<T, ?> next;

            /**
             * Creates the head link and subscribes it to the given future.
             *
             * @param lf future whose outcome drives the chain
             * @param <S> unused input type of the head link
             * @param <T> result type of the future
             * @return the head link, to which further steps can be attached
             */
            public static <S, T> Completion<S, T> from(ListenableFuture<T> lf) {
                Completion<S, T> c = new Completion<>();
                lf.addCallback(s -> {
                    c.complete(s);
                }, e -> {
                    c.error(e);
                });
                return c;
            }

            /**
             * Attaches an asynchronous step that runs with this link's result.
             *
             * @param fn function starting the next asynchronous call
             * @param <V> result type of the next asynchronous call
             * @return the new link, so that more steps can be chained
             */
            public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> fn) {
                Completion<T, V> c = new AsyncCompletion<>(fn);
                this.next = c;
                return c;
            }

            /**
             * Attaches an error handler that receives failures forwarded from earlier links.
             *
             * @param econ consumer invoked with the failure
             * @return the new link, so that more steps can be chained
             */
            public Completion<T, T> andError(Consumer<Throwable> econ) {
                Completion<T, T> c = new ErrorCompletion<>(econ);
                this.next = c;
                return c;
            }

            /**
             * Attaches the terminal consumer that receives this link's result.
             *
             * @param con consumer invoked with the result
             */
            public void andAccept(Consumer<T> con) {
                Completion<T, Void> c = new AcceptCompletion<>(con);
                this.next = c;
            }

            /** Forwards a successful result to the next link, if one is attached. */
            public void complete(T s) {
                if (next != null) next.run(s);
            }

            /** Hook executed when the previous link completes; empty in the base class. */
            public void run(S value) {
            }

            /** Forwards a failure to the next link, if one is attached. */
            public void error(Throwable e) {
                if (next != null) next.error(e);
            }
        }

        /** Service whose {@code work} method is annotated {@code @Async}. */
        @Service
        public static class MyService {
            /**
             * Appends {@code "/asyncwork"} to the request.
             *
             * @param req incoming value
             * @return a future holding {@code req + "/asyncwork"}
             */
            @Async
            public ListenableFuture<String> work(String req) {
                return new AsyncResult<>(req + "/asyncwork");
            }
        }

        /** Task executor bean with core and max pool size both set to 1. */
        @Bean
        public ThreadPoolTaskExecutor myThreadPool() {
            ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
            te.setCorePoolSize(1);
            te.setMaxPoolSize(1);
            te.initialize();
            return te;
        }

        /** Application entry point. */
        public static void main(String[] args) {
            SpringApplication.run(StudyApplication.class, args);
        }
    }

     

     

     

    참고


    Jongmin92 님 블로그 참고 

    Toby 님 강의 참고 

     
Designed by Tistory.