프로그래밍/Java

CompletableFuture (5)

개발정리 2022. 5. 6. 11:30

지난 시간에 이어 진행해보자. 

이번엔 CompletableFuture 를 가지고 여러 작업을 조합하는 방법과 예외를 처리하는 방법에 대해 살펴보자.

 

Future 만 가지고는 특정 작업들을 이어서 처리하는게 힘들었다. 

(예를들면, 이벤트 정보 가져온 다음 이벤트에 참석하는 회원 목록 가져오기 등.)

콜백을 줄 수 없었기 때문에 비동기적인 작업을 이어서 처리하기가 힘들었던 것이다.

 

 

 

조합하기


thenCompose( )

  • 두 작업이 서로 이어서 실행하도록 조합
public class CompletableFutureStudy {

  public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
      System.out.println("A " + Thread.currentThread().getName());
      return "Hello";
    });
    //hello 의 결과를 받아서 compose 를 통해 특정 작업을 호출하면 결과를 future 로 받을 수 있다.
    CompletableFuture<String> future = hello.thenCompose(CompletableFutureStudy::getWorld);
    System.out.println(future.get());
  }

  private static CompletableFuture<String>  getWorld(String msg) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("B " + Thread.currentThread().getName());
      return msg + " World";
    });
  }
}

//결과
A ForkJoinPool.commonPool-worker-19
B ForkJoinPool.commonPool-worker-5
Hello World

 

thenCombine( )

  • 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행 
  • 둘의 연관관계는 없지만 비동기적으로 실행
public class CompletableFutureStudy {

  public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get Apple Stock " + Thread.currentThread().getName());
      return "apple $19";
    });

    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get MS Stock " + Thread.currentThread().getName());
      return "ms $20";
    });

    CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " | " + w);
    System.out.println(future.get());
  }
}
//결과
Get Apple Stock ForkJoinPool.commonPool-worker-19
Get MS Stock ForkJoinPool.commonPool-worker-5
apple $19 | ms $20

 

 

allOf( )

  • 모든 서브테스크를 다 합쳐서 실행할 경우
  • 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
public class App {
  public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> apple = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get Apple Stock " + Thread.currentThread().getName());
      return "apple $19";
    });

    CompletableFuture<String> ms = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get MS Stock " + Thread.currentThread().getName());
      return "ms $20";
    });

    CompletableFuture<String> aws = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get AWS Stock " + Thread.currentThread().getName());
      return "aws $30";
    });

    // .allOf 를 사용하여 결과값을 받아보면 null 이 출력된다.
    // 각 task 마다 return  타입도 다르고 에러 발생도 있을 수 있어 결과값이 의미가 없어 null 을 반환한다.
    CompletableFuture<Void> future = CompletableFuture.allOf(apple, ms, aws)
        .thenAccept(System.out::println);

    System.out.println(future.get());
  }
}

//결과
Get MS Stock ForkJoinPool.commonPool-worker-5
Get Apple Stock ForkJoinPool.commonPool-worker-19
Get AWS Stock ForkJoinPool.commonPool-worker-23
null
null

위 와 같이 .allOf( ) 를 사용하면 결과값을 받을 수 없다는 문제가 있다. 

결과값을 받기 위해선 아래와 같이 구현해보자.

public class App {
  public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> apple = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get Apple Stock " + Thread.currentThread().getName());
      return "apple $19";
    });

    CompletableFuture<String> ms = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get MS Stock " + Thread.currentThread().getName());
      return "ms $20";
    });

    CompletableFuture<String> aws = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get AWS Stock " + Thread.currentThread().getName());
      return "aws $30";
    });

    // 모든 task 들을 list 로 모아놓는다.
    List<CompletableFuture<String>> futures = Arrays.asList(apple, ms, aws);

    CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray)
        .thenApply(v -> {
          // get 과 join 이 같은데, get 을 사용하면 checked exception 이 발생해서 예외처리를 해주어야 한다.
          // join 은  unchecked exception 이여서 예외처리를 해주지 않아도 된다.
          return futures.stream()
              .map(CompletableFuture::join)
              .collect(Collectors.toList());
        });
    
      results.get().forEach(System.out::println);
  }
}

//결과
Get AWS Stock ForkJoinPool.commonPool-worker-23
Get Apple Stock ForkJoinPool.commonPool-worker-19
Get MS Stock ForkJoinPool.commonPool-worker-5
apple $19
ms $20
aws $30

 

 

anyOf( )

  • 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
public class CompletableFutureStudy {
  public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> apple = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get Apple Stock " + Thread.currentThread().getName());
      
      return "apple $19";
    });

    CompletableFuture<String> ms = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get MS Stock " + Thread.currentThread().getName());

      return "ms $20";
    });

    CompletableFuture<String> aws = CompletableFuture.supplyAsync(() -> {
      System.out.println("Get AWS Stock " + Thread.currentThread().getName());

      return "aws $30";
    });

    CompletableFuture<Void> future = CompletableFuture.anyOf(apple, ms, aws)
        .thenAccept(System.out::println);

    future.get();
  }
}
//결과
Get Apple Stock ForkJoinPool.commonPool-worker-19
Get AWS Stock ForkJoinPool.commonPool-worker-23
Get MS Stock ForkJoinPool.commonPool-worker-5
apple $19

 

 

 

예외처리


exeptionally(Function)

public class CompletableFutureStudy {
  public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> apple = CompletableFuture.supplyAsync(() -> {
      if (true) {
        throw new IllegalArgumentException();
      }

      System.out.println("get apple stock " + Thread.currentThread().getName());
      return "apple $19";
    }).exceptionally(ex -> {
      System.out.println(ex);
      return "Error";
    });

    System.out.println(apple.get());
  }
}

 

handle(BiFunction)

  • 정상적으로 종료되었을때 혹은 에러가 발생하였을때 둘 다 리턴값을 받을 수 있다.
public class CompletableFutureStudy {
  public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<String> apple = CompletableFuture.supplyAsync(() -> {
      if (false) {
        throw new IllegalArgumentException();
      }
      System.out.println("get apple stock " + Thread.currentThread().getName());
      return "apple $19";
    }).handle((result, error) -> {
      if (error != null) {
        System.out.println(error);
        return "Error";
      }
      return result;
    });

    System.out.println(apple.get());
  }
}

 

 

 

  참고


백기선 - 더 자바, Java8

'프로그래밍 > Java' 카테고리의 다른 글

Metaspace  (0) 2022.05.06
배열 Parallel 정렬  (0) 2022.05.06
CompletableFuture (4)  (0) 2022.05.06
CompletableFuture (2) - Excutors  (0) 2022.05.06
CompletableFuture (1) - 자바 Concurrent 프로그래밍 소개  (0) 2022.05.06