ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 토비의 봄 TV - CompletableFuture (7)
    Spring/Webflux 2022. 6. 28. 22:20

    https://hyokeun0419.tistory.com/89

     

    CompletableFuture (4)

    자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스이다. Future 인터페이스는 java5부터 java.util.concurrency 패키지에서 비동기의 결과값을 받는 용도로 사용했지만 비동기의 결과값

    hyokeun0419.tistory.com

    https://hyokeun0419.tistory.com/90

     

    CompletableFuture (5)

    지난 시간에 이어 진행해보자. 이번엔 CompletableFuture 를 가지고 여러 작업을 조합하는 방법과 예외를 처리하는 방법에 대해 살펴보자. Future 만 가지고는 특정 작업들을 이어서 처리하는게 힘들었

    hyokeun0419.tistory.com

     

    해당 포스팅은 토비의 봄 TV 유튜브 영상을 참고하여 실습한 내용이다.

    CompletableFuture 의 자세한 설명은 위 포스트팅을 참고하자.

     

    지난번엔 Completion 클래스를 만들고 함수형으로 비동기처리가 될 수 있도록 변경하여 콜백헬 문제를 처리하였다. 

    이번엔 자바8 부터 제공되는 새로운 비동기 자바 프로그래밍 기술인 CompletableFuture 를 이용하여 해당 코드를 변경해보자. 

     

     

     

    CompletableFuture 실습


    runAsync & thenRun

    @Slf4j
    public class CFutureApp {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
          // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
          CompletableFuture
              .runAsync(() -> log.info("runAsync"))
              .thenRun(() -> log.info("thenRun"))
              .thenRun(() -> log.info("thenRun"));
          log.info("exit");
    
          // 별도의 pool을 설정하지 않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
          ForkJoinPool.commonPool().shutdown();
          ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
      }
    }
    
    //결과
    22:03:45.495 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - runAsync
    22:03:45.497 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenRun
    22:03:45.495 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
    22:03:45.497 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenRun

     

    supplyAsync, thenApply, thenAccept

    @Slf4j
    public class CFutureApp {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
        CompletableFuture
            .supplyAsync(() -> {
              log.info("supplyAsync");
              return 1;
            })
            // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
            .thenApply(s -> {
              log.info("thenApply {}", s);
              return s + 1;
            })
            // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
            .thenAccept(s -> log.info("thenAccept {}", s));
        log.info("exit");
    
        // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
      }
    }
    //결과
    22:05:10.103 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
    22:05:10.103 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
    22:05:10.105 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
    22:05:10.106 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 2

     

    thenCompose

    @Slf4j
    public class CFutureApp {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
        CompletableFuture
            .supplyAsync(() -> {
              log.info("supplyAsync");
              return 1;
            })
            // return이 CompletableFuture인 경우 thenCompose를 사용한다.
            .thenCompose(s -> {
              log.info("thenApply {}", s);
              return CompletableFuture.completedFuture(s + 1);
            })
            // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
            .thenApply(s -> {
              log.info("thenApply {}", s);
              return s + 1;
            })
            // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
            .thenAccept(s -> log.info("thenAccept {}", s));
        log.info("exit");
    
        // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
      }
    }
    //결과
    22:06:28.763 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
    22:06:28.763 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
    22:06:28.765 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
    22:06:28.766 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 2
    22:06:28.766 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 3

     

    exceptionally

    @Slf4j
    public class CFutureApp {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
        CompletableFuture
            .supplyAsync(() -> {
              log.info("supplyAsync");
              return 1;
            })
            // return이 CompletableFuture인 경우 thenCompose를 사용한다.
            .thenCompose(s -> {
              log.info("thenApply {}", s);
              if (1 == 1)
                throw new RuntimeException();
              return CompletableFuture.completedFuture(s + 1);
            })
            // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
            .thenApply(s -> {
              log.info("thenApply {}", s);
              return s + 1;
            })
            .exceptionally(e -> {
              log.info("exceptionally");
              return -10;
            })
            // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
            .thenAccept(s -> log.info("thenAccept {}", s));
        log.info("exit");
    
        // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
      }
    }
    //결과
    22:09:05.923 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
    22:09:05.923 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
    22:09:05.925 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
    22:09:05.926 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exceptionally
    22:09:05.926 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept -10

     

    thenApplyAsync

    @Slf4j
    public class CFutureApp {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
    
        // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
        CompletableFuture
            .supplyAsync(() -> {
              log.info("supplyAsync");
              return 1;
            }, es)
            // return이 CompletableFuture인 경우 thenCompose를 사용한다.
            .thenCompose(s -> {
              log.info("thenApply {}", s);
              return CompletableFuture.completedFuture(s + 1);
            })
            // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
            .thenApply(s -> {
              log.info("thenApply {}", s);
              return s + 2;
            })
            // 이 작업은 다른 스레드에서 처리를 하려고 할 때, thenApplyAsync를 사용한다.
            // 스레드의 사용을 더 효율적으로 하고 자원을 더 효율적으로 사용한다.
            // 현재 스레드 풀의 정책에 따라서 새로운 스레드를 할당하거나 대기중인 스레드를 사용한다. (스레드 풀 전략에 따라 다르다.)
            .thenApplyAsync(s -> {
              log.info("thenApply {}", s);
              return s + 3;
            }, es)
            .exceptionally(e -> {
              log.info("exceptionally");
              return -10;
            })
            // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
            .thenAcceptAsync(s -> log.info("thenAccept {}", s), es);
        log.info("exit");
    
        // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
        ForkJoinPool.commonPool().shutdown();
        ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
      }
    }
    //결과
    22:10:17.201 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
    22:10:17.201 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
    22:10:17.203 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
    22:10:17.204 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 2
    22:10:17.204 [pool-1-thread-2] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 4
    22:10:17.204 [pool-1-thread-3] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 7

     

     

     

    ListenableFuture에서 CompletableFuture로 변환


    이전 AsyncRestTemplate 을 사용하여 비동기를 구현하였을때 return 값을 확인해보면 ListenableFuture 로 리턴되는것을 확인 할 수 있다. 이번엔 ListenalbeFuture 를 사용할 것이 아니라 CompletableFuture 를 사용하여 체이닝 방식으로 비동기 작업을 구현할 것이다. 이를 위해선 유틸성 wrapper 메서드를 만들어야 한다. toCF 메서드를 이용하여 ListenableFuture 를 CompletableFuture 로 감싸주어 반환한다.

    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class Main {
    
      @RestController
      public static class MyController {
        AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
    
        @Autowired
        MyService myService;
    
        static final String URL1 = "http://localhost:8081/service?req={req}";
        static final String URL2 = "http://localhost:8081/service2?req={req}";
    
        @GetMapping("/rest")
        public DeferredResult<String> rest(int idx) {
          DeferredResult<String> dr = new DeferredResult<>();
    
          toCF(rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx))
              .thenCompose(s -> toCF(rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody())))
              .thenCompose(s -> toCF(myService.work(s.getBody())))
              .thenAccept(s -> dr.setResult(s))
              .exceptionally(e -> {
                dr.setErrorResult(e.getMessage());
                return null;
              });
    
          return dr;
        }
    
        <T> CompletableFuture<T> toCF(ListenableFuture<T> lf) {
          CompletableFuture<T> cf = new CompletableFuture<>();
          lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
          return cf;
        }
      }
    
      @Service
      public static class MyService {
        @Async
        public ListenableFuture<String> work(String req) {
          return new AsyncResult<>(req + "/asyncwork");
        }
      }
    
      @Bean
      public ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);
        te.setMaxPoolSize(1);
        te.initialize();
        return te;
      }
    
      public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
      }
    }
    

     

     

     

    참고


    Jongmin92 님 블로그

    Toby 님 강의

Designed by Tistory.