프로그래밍/Java

CompletableFuture (4)

개발정리 2022. 5. 6. 10:13

자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스이다.  

Future 인터페이스는 java5부터 java.util.concurrency 패키지에서 비동기의 결과값을 받는 용도로 사용했지만 비동기의 결과값을 조합하거나, error를 핸들링할 수가 없었다.

 

자바8부터 CompletableFuture 인터페이스가 소개되었고, Future 인터페이스를 구현함과 동시에 CompletionStage 인터페이스를 구현한다. CompletionStage 는 비동기 연산 Step을 제공해서 계속 체이닝 형태로 조합이 가능하다.

 

CompletionStage 란, 

하나의 비동기 작업을 수행하고 완료가 되었을때 여기에 의존적으로 또 다른 작업을 수행할 수 있도록하는 명령들을 가지고있는 인터페이스

 

 

 

Future 로는 하기 어렵던 작업들


  • Future 를 외부에서 완료시킬 수 없다. 취소하거나, get() 에 타임아웃을 설정할 수는 있다.
  • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다. 
  • 여러 Future 를 조합할 수 없다.
    • ex) Event 정보 가져온 다음 Event 에 참석하는 회원 목록 가져오기
  • 예외처리용 API 를 제공하지 않는다.

[참고] 블로킹이란? 결과값을 가져올때까지 기다림

 

 

 

 

CompletableFuture


  • Implements Future
  • Implements CompletionStage
  • 외부에서 명시적으로 complete 을 시킬 수가 있다.
    • 예를들면, 몇초 이내에 응답이 안오면 기본값으로 세팅하기
  • 명시적으로 Executor 를 만들어 사용하지 않아도 된다.
    • CompletableFuture 만 가지고 비동기적으로 작업을 실행 할 수 있다.
public class App {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("hyokeun");

    System.out.println(future.get());
  }
}
// hyokeun

 

 

 

비동기로 작업 실행하기 


리턴값이 없는 경우:  runAsync()

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
      System.out.println("Hello " + Thread.currentThread().getName());
    });

    future.join(); // unchecked exception 으로 던져주기 때문에 에러처리를 따로 해주지 않아도된다.
    future.get(); // checked exception
  }

 

리턴값이 있는 경우: supplyAsync()

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Hello " + Thread.currentThread().getName());
      return "hello thread";
    });

    System.out.println(future.get());
  }

 

콜백 구현

  • .thenApply(Function) 를 이용하여 받은 결과값을 가지고 다른 이벤트를 수행할 수 있다. 
  • Future 로는 get() 호출 전에 콜백이 불가능하였다. 
    • CompletableFuture 을 이용하면 get() 호출 전에 thenApply() 를 이용하여 콜백처리가 가능하다.
 public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Hello " + Thread.currentThread().getName());
      return "hello thread";
    }).thenApply((s) -> {
      System.out.println(Thread.currentThread().getName());
      return s.toUpperCase();
    });

    System.out.println(future.get());
}

 

return 이 없는 콜백 구현 

  • .thenAccept(Consumer) 사용
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Hello " + Thread.currentThread().getName());
      return "hello thread";
    }).thenAccept((s) -> { // thenAccept 대신 thenRun 을 사용하면 매개변수 값을 사용하지 않는다.
      System.out.println(Thread.currentThread().getName());
      System.out.println(s.toUpperCase());
    });

    future.get();
  • .thenRun(Runable) 결과값을 참고도 못하고 실행만 콜백만 실행시키고 싶을때 사용
public static void main(String[] args) throws ExecutionException, InterruptedException {
  CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "hello thread";
  }).thenRun(() -> {
    System.out.println(Thread.currentThread().getName());
  });

  future.get();
}

//결과
Hello ForkJoinPool.commonPool-worker-19
ForkJoinPool.commonPool-worker-19

쓰레드 풀을 만들지 않고도 어떻게 동작을 하는거지? 

  •   때문에 가능한 것이다 (java 7)
  • Executor 를 구현한 구현체 중 하나이고, 작업을 Dequeue 방식(맨 마지막에 들어온것이 먼저 나가는 방식) 을 이용하여 자기 쓰레드가 할 일이 없으면 쓰레드가 직접 Dequeue 에서 자기가 할 일을 가져와서 처리를 하는 방식의 프레임워크 
  • 작업 단위를 서브 테스크 스레드에 분산 시켜서 작업을 처리하고 모아서 (join) 결과값을 도출하는 방식이다.
  • 별다른  Executor 를 사용하지 않아도 내부적으로 ForkJoinPool에 있는 commonPool 을 쓰게된다. 
  • 하지만 원한다면 쓰레드 풀을 직접 만들어 줄 수 도 있다. 아래를 살펴보자.

 

원하는 Executor(쓰레드 풀) 을 사용해서 실행할 수도 있다.   

  • 기본은 ForkJoinPool.commonPool() 을 사용하지만,
  • 원한다면  ExecutorService.newFixedThreadPool(n) 을 줄 수 있다.
    • 어디에 줄 수 있느냐면, supplyAync 를 호출할 때 두번째 인자로 줄 수 있고 두번째 인자로 넘겨준 스레드 풀을 이용하여 작업을 처리한다.
    • 또 줄 수 있는곳은 콜백을 실행하는 .thenRunAsync() 에 인자로 주면 된다. 
      • 만약 인자값으로 넘겨주지 않으면 forkjoinpool 사용.
  public static void main(String[] args) throws ExecutionException, InterruptedException {

    ExecutorService executorService = Executors.newFixedThreadPool(4);

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
      System.out.println(Thread.currentThread().getName());
      return "hello thread";
    }, executorService).thenRunAsync(() -> {
      System.out.println(Thread.currentThread().getName());
    }, executorService);

    future.get();
    executorService.shutdown();
  }
  
//결과  
pool-1-thread-1
pool-1-thread-2

 

 

 

참고


백기선 - 더 자바, Java8

'프로그래밍 > Java' 카테고리의 다른 글

배열 Parallel 정렬  (0) 2022.05.06
CompletableFuture (5)  (0) 2022.05.06
CompletableFuture (2) - Excutors  (0) 2022.05.06
CompletableFuture (1) - 자바 Concurrent 프로그래밍 소개  (0) 2022.05.06
Date와 Time API  (0) 2022.05.05