Spring/Webflux

토비의 봄 TV - CompletableFuture (7)

개발정리 2022. 6. 28. 22:20

https://hyokeun0419.tistory.com/89

 

CompletableFuture (4)

자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스이다. Future 인터페이스는 java5부터 java.util.concurrency 패키지에서 비동기의 결과값을 받는 용도로 사용했지만 비동기의 결과값

hyokeun0419.tistory.com

https://hyokeun0419.tistory.com/90

 

CompletableFuture (5)

지난 시간에 이어 진행해보자. 이번엔 CompletableFuture 를 가지고 여러 작업을 조합하는 방법과 예외를 처리하는 방법에 대해 살펴보자. Future 만 가지고는 특정 작업들을 이어서 처리하는게 힘들었

hyokeun0419.tistory.com

 

해당 포스팅은 토비의 봄 TV 유튜브 영상을 참고하여 실습한 내용이다.

CompletableFuture 의 자세한 설명은 위 포스트팅을 참고하자.

 

지난번엔 Completion 클래스를 만들고 함수형으로 비동기처리가 될 수 있도록 변경하여 콜백헬 문제를 처리하였다. 

이번엔 자바8 부터 제공되는 새로운 비동기 자바 프로그래밍 기술인 CompletableFuture 를 이용하여 해당 코드를 변경해보자. 

 

 

 

CompletableFuture 실습


runAsync & thenRun

@Slf4j
public class CFutureApp {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
      // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
      CompletableFuture
          .runAsync(() -> log.info("runAsync"))
          .thenRun(() -> log.info("thenRun"))
          .thenRun(() -> log.info("thenRun"));
      log.info("exit");

      // 별도의 pool을 설정하지 않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
      ForkJoinPool.commonPool().shutdown();
      ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
  }
}

//결과
22:03:45.495 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - runAsync
22:03:45.497 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenRun
22:03:45.495 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:03:45.497 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenRun

 

supplyAsync, thenApply, thenAccept

@Slf4j
public class CFutureApp {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
    CompletableFuture
        .supplyAsync(() -> {
          log.info("supplyAsync");
          return 1;
        })
        // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
        .thenApply(s -> {
          log.info("thenApply {}", s);
          return s + 1;
        })
        // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
        .thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");

    // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
  }
}
//결과
22:05:10.103 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
22:05:10.103 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:05:10.105 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
22:05:10.106 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 2

 

thenCompose

@Slf4j
public class CFutureApp {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
    CompletableFuture
        .supplyAsync(() -> {
          log.info("supplyAsync");
          return 1;
        })
        // return이 CompletableFuture인 경우 thenCompose를 사용한다.
        .thenCompose(s -> {
          log.info("thenApply {}", s);
          return CompletableFuture.completedFuture(s + 1);
        })
        // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
        .thenApply(s -> {
          log.info("thenApply {}", s);
          return s + 1;
        })
        // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
        .thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");

    // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
  }
}
//결과
22:06:28.763 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
22:06:28.763 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:06:28.765 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
22:06:28.766 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 2
22:06:28.766 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 3

 

exceptionally

@Slf4j
public class CFutureApp {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
    CompletableFuture
        .supplyAsync(() -> {
          log.info("supplyAsync");
          return 1;
        })
        // return이 CompletableFuture인 경우 thenCompose를 사용한다.
        .thenCompose(s -> {
          log.info("thenApply {}", s);
          if (1 == 1)
            throw new RuntimeException();
          return CompletableFuture.completedFuture(s + 1);
        })
        // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
        .thenApply(s -> {
          log.info("thenApply {}", s);
          return s + 1;
        })
        .exceptionally(e -> {
          log.info("exceptionally");
          return -10;
        })
        // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
        .thenAccept(s -> log.info("thenAccept {}", s));
    log.info("exit");

    // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
  }
}
//결과
22:09:05.923 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
22:09:05.923 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:09:05.925 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
22:09:05.926 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exceptionally
22:09:05.926 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept -10

 

thenApplyAsync

@Slf4j
public class CFutureApp {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newFixedThreadPool(10);

    // Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
    CompletableFuture
        .supplyAsync(() -> {
          log.info("supplyAsync");
          return 1;
        }, es)
        // return이 CompletableFuture인 경우 thenCompose를 사용한다.
        .thenCompose(s -> {
          log.info("thenApply {}", s);
          return CompletableFuture.completedFuture(s + 1);
        })
        // 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
        .thenApply(s -> {
          log.info("thenApply {}", s);
          return s + 2;
        })
        // 이 작업은 다른 스레드에서 처리를 하려고 할 때, thenApplyAsync를 사용한다.
        // 스레드의 사용을 더 효율적으로 하고 자원을 더 효율적으로 사용한다.
        // 현재 스레드 풀의 정책에 따라서 새로운 스레드를 할당하거나 대기중인 스레드를 사용한다. (스레드 풀 전략에 따라 다르다.)
        .thenApplyAsync(s -> {
          log.info("thenApply {}", s);
          return s + 3;
        }, es)
        .exceptionally(e -> {
          log.info("exceptionally");
          return -10;
        })
        // 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
        .thenAcceptAsync(s -> log.info("thenAccept {}", s), es);
    log.info("exit");

    // 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
    ForkJoinPool.commonPool().shutdown();
    ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
  }
}
//결과
22:10:17.201 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:10:17.201 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
22:10:17.203 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
22:10:17.204 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 2
22:10:17.204 [pool-1-thread-2] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 4
22:10:17.204 [pool-1-thread-3] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 7

 

 

 

ListenableFuture에서 CompletableFuture로 변환


이전 AsyncRestTemplate 을 사용하여 비동기를 구현하였을때 return 값을 확인해보면 ListenableFuture 로 리턴되는것을 확인 할 수 있다. 이번엔 ListenalbeFuture 를 사용할 것이 아니라 CompletableFuture 를 사용하여 체이닝 방식으로 비동기 작업을 구현할 것이다. 이를 위해선 유틸성 wrapper 메서드를 만들어야 한다. toCF 메서드를 이용하여 ListenableFuture 를 CompletableFuture 로 감싸주어 반환한다.

@SpringBootApplication
@EnableAsync
@Slf4j
public class Main {

  @RestController
  public static class MyController {
    AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));

    @Autowired
    MyService myService;

    static final String URL1 = "http://localhost:8081/service?req={req}";
    static final String URL2 = "http://localhost:8081/service2?req={req}";

    @GetMapping("/rest")
    public DeferredResult<String> rest(int idx) {
      DeferredResult<String> dr = new DeferredResult<>();

      toCF(rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx))
          .thenCompose(s -> toCF(rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody())))
          .thenCompose(s -> toCF(myService.work(s.getBody())))
          .thenAccept(s -> dr.setResult(s))
          .exceptionally(e -> {
            dr.setErrorResult(e.getMessage());
            return null;
          });

      return dr;
    }

    <T> CompletableFuture<T> toCF(ListenableFuture<T> lf) {
      CompletableFuture<T> cf = new CompletableFuture<>();
      lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
      return cf;
    }
  }

  @Service
  public static class MyService {
    @Async
    public ListenableFuture<String> work(String req) {
      return new AsyncResult<>(req + "/asyncwork");
    }
  }

  @Bean
  public ThreadPoolTaskExecutor myThreadPool() {
    ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
    te.setCorePoolSize(1);
    te.setMaxPoolSize(1);
    te.initialize();
    return te;
  }

  public static void main(String[] args) {
    SpringApplication.run(Main.class, args);
  }
}

 

 

 

참고


Jongmin92 님 블로그

Toby 님 강의