-
토비의 봄 TV - CompletableFuture (7)Spring/Webflux 2022. 6. 28. 22:20
https://hyokeun0419.tistory.com/89
https://hyokeun0419.tistory.com/90
해당 포스팅은 토비의 봄 TV 유튜브 영상을 참고하여 실습한 내용이다.
CompletableFuture 의 자세한 설명은 위 포스트팅을 참고하자.
지난번엔 Completion 클래스를 만들고 함수형으로 비동기처리가 될 수 있도록 변경하여 콜백헬 문제를 처리하였다.
이번엔 자바8 부터 제공되는 새로운 비동기 자바 프로그래밍 기술인 CompletableFuture 를 이용하여 해당 코드를 변경해보자.
CompletableFuture 실습
runAsync & thenRun
@Slf4j public class CFutureApp { public static void main(String[] args) throws ExecutionException, InterruptedException { // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .runAsync(() -> log.info("runAsync")) .thenRun(() -> log.info("thenRun")) .thenRun(() -> log.info("thenRun")); log.info("exit"); // 별도의 pool을 설정하지 않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } } //결과 22:03:45.495 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - runAsync 22:03:45.497 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenRun 22:03:45.495 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit 22:03:45.497 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenRun
supplyAsync, thenApply, thenAccept
@Slf4j public class CFutureApp { public static void main(String[] args) throws ExecutionException, InterruptedException { // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .supplyAsync(() -> { log.info("supplyAsync"); return 1; }) // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다. .thenApply(s -> { log.info("thenApply {}", s); return s + 1; }) // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다. .thenAccept(s -> log.info("thenAccept {}", s)); log.info("exit"); // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } } //결과 22:05:10.103 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync 22:05:10.103 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit 22:05:10.105 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1 22:05:10.106 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 2
thenCompose
@Slf4j public class CFutureApp { public static void main(String[] args) throws ExecutionException, InterruptedException { // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .supplyAsync(() -> { log.info("supplyAsync"); return 1; }) // return이 CompletableFuture인 경우 thenCompose를 사용한다. .thenCompose(s -> { log.info("thenApply {}", s); return CompletableFuture.completedFuture(s + 1); }) // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다. .thenApply(s -> { log.info("thenApply {}", s); return s + 1; }) // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다. .thenAccept(s -> log.info("thenAccept {}", s)); log.info("exit"); // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } } //결과 22:06:28.763 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync 22:06:28.763 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit 22:06:28.765 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1 22:06:28.766 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 2 22:06:28.766 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 3
exceptionally
@Slf4j public class CFutureApp { public static void main(String[] args) throws ExecutionException, InterruptedException { // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .supplyAsync(() -> { log.info("supplyAsync"); return 1; }) // return이 CompletableFuture인 경우 thenCompose를 사용한다. .thenCompose(s -> { log.info("thenApply {}", s); if (1 == 1) throw new RuntimeException(); return CompletableFuture.completedFuture(s + 1); }) // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다. .thenApply(s -> { log.info("thenApply {}", s); return s + 1; }) .exceptionally(e -> { log.info("exceptionally"); return -10; }) // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다. .thenAccept(s -> log.info("thenAccept {}", s)); log.info("exit"); // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } } //결과 22:09:05.923 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync 22:09:05.923 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit 22:09:05.925 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1 22:09:05.926 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exceptionally 22:09:05.926 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept -10
thenApplyAsync
@Slf4j public class CFutureApp { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다. CompletableFuture .supplyAsync(() -> { log.info("supplyAsync"); return 1; }, es) // return이 CompletableFuture인 경우 thenCompose를 사용한다. .thenCompose(s -> { log.info("thenApply {}", s); return CompletableFuture.completedFuture(s + 1); }) // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다. .thenApply(s -> { log.info("thenApply {}", s); return s + 2; }) // 이 작업은 다른 스레드에서 처리를 하려고 할 때, thenApplyAsync를 사용한다. // 스레드의 사용을 더 효율적으로 하고 자원을 더 효율적으로 사용한다. // 현재 스레드 풀의 정책에 따라서 새로운 스레드를 할당하거나 대기중인 스레드를 사용한다. (스레드 풀 전략에 따라 다르다.) .thenApplyAsync(s -> { log.info("thenApply {}", s); return s + 3; }, es) .exceptionally(e -> { log.info("exceptionally"); return -10; }) // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다. .thenAcceptAsync(s -> log.info("thenAccept {}", s), es); log.info("exit"); // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다. ForkJoinPool.commonPool().shutdown(); ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS); } } //결과 22:10:17.201 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit 22:10:17.201 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync 22:10:17.203 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1 22:10:17.204 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 2 22:10:17.204 [pool-1-thread-2] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 4 22:10:17.204 [pool-1-thread-3] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 7
ListenableFuture에서 CompletableFuture로 변환
이전 AsyncRestTemplate 을 사용하여 비동기를 구현하였을때 return 값을 확인해보면 ListenableFuture 로 리턴되는것을 확인 할 수 있다. 이번엔 ListenalbeFuture 를 사용할 것이 아니라 CompletableFuture 를 사용하여 체이닝 방식으로 비동기 작업을 구현할 것이다. 이를 위해선 유틸성 wrapper 메서드를 만들어야 한다. toCF 메서드를 이용하여 ListenableFuture 를 CompletableFuture 로 감싸주어 반환한다.
@SpringBootApplication @EnableAsync @Slf4j public class Main { @RestController public static class MyController { AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))); @Autowired MyService myService; static final String URL1 = "http://localhost:8081/service?req={req}"; static final String URL2 = "http://localhost:8081/service2?req={req}"; @GetMapping("/rest") public DeferredResult<String> rest(int idx) { DeferredResult<String> dr = new DeferredResult<>(); toCF(rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx)) .thenCompose(s -> toCF(rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody()))) .thenCompose(s -> toCF(myService.work(s.getBody()))) .thenAccept(s -> dr.setResult(s)) .exceptionally(e -> { dr.setErrorResult(e.getMessage()); return null; }); return dr; } <T> CompletableFuture<T> toCF(ListenableFuture<T> lf) { CompletableFuture<T> cf = new CompletableFuture<>(); lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e)); return cf; } } @Service public static class MyService { @Async public ListenableFuture<String> work(String req) { return new AsyncResult<>(req + "/asyncwork"); } } @Bean public ThreadPoolTaskExecutor myThreadPool() { ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor(); te.setCorePoolSize(1); te.setMaxPoolSize(1); te.initialize(); return te; } public static void main(String[] args) { SpringApplication.run(Main.class, args); } }
참고
Jongmin92 님 블로그
Toby 님 강의
'Spring > Webflux' 카테고리의 다른 글
리액티브 프로그래밍이란 (0) 2024.01.27 Thread, Future, CompletableFuture 란 (1) 2024.01.27 토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6) (0) 2022.06.24 토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) 2022.06.17 토비의 봄 TV - 자바의 비동기 기술 (4) 1/2 (0) 2022.06.17