https://hyokeun0419.tistory.com/89
https://hyokeun0419.tistory.com/90
해당 포스팅은 토비의 봄 TV 유튜브 영상을 참고하여 실습한 내용이다.
CompletableFuture 의 자세한 설명은 위 포스트팅을 참고하자.
지난번엔 Completion 클래스를 만들고 함수형으로 비동기처리가 될 수 있도록 변경하여 콜백헬 문제를 처리하였다.
이번엔 자바8 부터 제공되는 새로운 비동기 자바 프로그래밍 기술인 CompletableFuture 를 이용하여 해당 코드를 변경해보자.
CompletableFuture 실습
runAsync & thenRun
@Slf4j
public class CFutureApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.runAsync(() -> log.info("runAsync"))
.thenRun(() -> log.info("thenRun"))
.thenRun(() -> log.info("thenRun"));
log.info("exit");
// 별도의 pool을 설정하지 않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}
//결과
22:03:45.495 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - runAsync
22:03:45.497 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenRun
22:03:45.495 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:03:45.497 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenRun
supplyAsync, thenApply, thenAccept
@Slf4j
public class CFutureApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
})
// 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
.thenApply(s -> {
log.info("thenApply {}", s);
return s + 1;
})
// 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
.thenAccept(s -> log.info("thenAccept {}", s));
log.info("exit");
// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}
//결과
22:05:10.103 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
22:05:10.103 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:05:10.105 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
22:05:10.106 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 2
thenCompose
@Slf4j
public class CFutureApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
})
// return이 CompletableFuture인 경우 thenCompose를 사용한다.
.thenCompose(s -> {
log.info("thenApply {}", s);
return CompletableFuture.completedFuture(s + 1);
})
// 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
.thenApply(s -> {
log.info("thenApply {}", s);
return s + 1;
})
// 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
.thenAccept(s -> log.info("thenAccept {}", s));
log.info("exit");
// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}
//결과
22:06:28.763 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
22:06:28.763 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:06:28.765 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
22:06:28.766 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 2
22:06:28.766 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 3
exceptionally
@Slf4j
public class CFutureApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
})
// return이 CompletableFuture인 경우 thenCompose를 사용한다.
.thenCompose(s -> {
log.info("thenApply {}", s);
if (1 == 1)
throw new RuntimeException();
return CompletableFuture.completedFuture(s + 1);
})
// 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
.thenApply(s -> {
log.info("thenApply {}", s);
return s + 1;
})
.exceptionally(e -> {
log.info("exceptionally");
return -10;
})
// 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
.thenAccept(s -> log.info("thenAccept {}", s));
log.info("exit");
// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}
//결과
22:09:05.923 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
22:09:05.923 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:09:05.925 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
22:09:05.926 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exceptionally
22:09:05.926 [ForkJoinPool.commonPool-worker-19] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept -10
thenApplyAsync
@Slf4j
public class CFutureApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(10);
// Async 작업이 끝나고 해당 스레드에서 계속해서 작업을 수행한다.
CompletableFuture
.supplyAsync(() -> {
log.info("supplyAsync");
return 1;
}, es)
// return이 CompletableFuture인 경우 thenCompose를 사용한다.
.thenCompose(s -> {
log.info("thenApply {}", s);
return CompletableFuture.completedFuture(s + 1);
})
// 앞의 비동기 작업의 결과를 받아 사용해 새로운 값을 return 한다.
.thenApply(s -> {
log.info("thenApply {}", s);
return s + 2;
})
// 이 작업은 다른 스레드에서 처리를 하려고 할 때, thenApplyAsync를 사용한다.
// 스레드의 사용을 더 효율적으로 하고 자원을 더 효율적으로 사용한다.
// 현재 스레드 풀의 정책에 따라서 새로운 스레드를 할당하거나 대기중인 스레드를 사용한다. (스레드 풀 전략에 따라 다르다.)
.thenApplyAsync(s -> {
log.info("thenApply {}", s);
return s + 3;
}, es)
.exceptionally(e -> {
log.info("exceptionally");
return -10;
})
// 앞의 비동기 작업의 결과를 받아 사용하며 return이 없다.
.thenAcceptAsync(s -> log.info("thenAccept {}", s), es);
log.info("exit");
// 별도의 pool을 설정하지않으면 자바7 부터는 ForkJoinPool이 자동으로 사용된다.
ForkJoinPool.commonPool().shutdown();
ForkJoinPool.commonPool().awaitTermination(10, TimeUnit.SECONDS);
}
}
//결과
22:10:17.201 [main] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - exit
22:10:17.201 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - supplyAsync
22:10:17.203 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 1
22:10:17.204 [pool-1-thread-1] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 2
22:10:17.204 [pool-1-thread-2] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenApply 4
22:10:17.204 [pool-1-thread-3] INFO com.example.reactivestreamstoby._07_CompletableFuture.CFutureApp - thenAccept 7
ListenableFuture에서 CompletableFuture로 변환
이전 AsyncRestTemplate 을 사용하여 비동기를 구현하였을때 return 값을 확인해보면 ListenableFuture 로 리턴되는것을 확인 할 수 있다. 이번엔 ListenalbeFuture 를 사용할 것이 아니라 CompletableFuture 를 사용하여 체이닝 방식으로 비동기 작업을 구현할 것이다. 이를 위해선 유틸성 wrapper 메서드를 만들어야 한다. toCF 메서드를 이용하여 ListenableFuture 를 CompletableFuture 로 감싸주어 반환한다.
@SpringBootApplication
@EnableAsync
@Slf4j
public class Main {
@RestController
public static class MyController {
AsyncRestTemplate rt = new AsyncRestTemplate(new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1)));
@Autowired
MyService myService;
static final String URL1 = "http://localhost:8081/service?req={req}";
static final String URL2 = "http://localhost:8081/service2?req={req}";
@GetMapping("/rest")
public DeferredResult<String> rest(int idx) {
DeferredResult<String> dr = new DeferredResult<>();
toCF(rt.getForEntity("http://localhost:8081/service?req={req}", String.class, "hello" + idx))
.thenCompose(s -> toCF(rt.getForEntity("http://localhost:8081/service2?req={req}", String.class, s.getBody())))
.thenCompose(s -> toCF(myService.work(s.getBody())))
.thenAccept(s -> dr.setResult(s))
.exceptionally(e -> {
dr.setErrorResult(e.getMessage());
return null;
});
return dr;
}
<T> CompletableFuture<T> toCF(ListenableFuture<T> lf) {
CompletableFuture<T> cf = new CompletableFuture<>();
lf.addCallback(s -> cf.complete(s), e -> cf.completeExceptionally(e));
return cf;
}
}
@Service
public static class MyService {
@Async
public ListenableFuture<String> work(String req) {
return new AsyncResult<>(req + "/asyncwork");
}
}
@Bean
public ThreadPoolTaskExecutor myThreadPool() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setCorePoolSize(1);
te.setMaxPoolSize(1);
te.initialize();
return te;
}
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
참고
Jongmin92 님 블로그
Toby 님 강의
'Spring > Webflux' 카테고리의 다른 글
리액티브 프로그래밍이란 (0) | 2024.01.27 |
---|---|
Thread, Future, CompletableFuture 란 (1) | 2024.01.27 |
토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6) (0) | 2022.06.24 |
토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) | 2022.06.17 |
토비의 봄 TV - 자바의 비동기 기술 (4) 1/2 (0) | 2022.06.17 |