-
토비의 봄 TV - 자바의 비동기 기술 (4) 1/2Spring/Webflux 2022. 6. 17. 00:11
ExcutorService
- 쉽게 비동기로 작업을 실행할 수 있도록 도와주는 JDK(1.5부터)에서 제공하는 interface 이다.
- 일반적으로 ExecutorService는 작업 할당을 위한 스레드 풀과 API를 제공한다.
@Slf4j public class ExcutorServiceApp { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); es.execute(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("Async"); System.out.println("Async"); }); log.info("Exit"); } } // 결과 22:43:32.350 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.ExcutorServiceApp - Exit 22:43:34.350 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.ExcutorServiceApp - Async
Future
- Future 는 자바 1.5 에 등장한 비동기 계산 결과를 나타내는 인터페이스
- Future를 이용하면 멀티쓰레드 환경에서 처리된 어떤 데이터를 다른 쓰레드에 전달할 수 있다.
- Future 내부적으로 Thread-Safe 하도록 구현되었기 때문에 synchronized block을 사용하지 않아도 된다.
- 비동기적인 작업을 수행?
- 현재 진행하고 있는 Thread 가 아닌 별도의 Thread 에서 작업을 수행하는 것을 말한다.
- 같은 Thread 에서 메서드를 호출할 때는 리턴 값을 받지만, 비동기적으로 작업을 수행할 때는 리턴 값을 전달받을 수 있는 무언가의 interface 가 필요한데 Future 가 그 역할을 한다.
- 비동기 작업에서 결과를 반환하고 싶을 때
- runnable 대신 callable 인터페이스를 이용하면 리턴 값을 받을 수 있다.
- 예외가 발생했을 때 해당 예외를 비동기 코드를 처리하는 Thread 안에서 처리하지 않고 밖으로 던질 수 있다.
@Slf4j public class FutureApp { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); Future<String> f = es.submit(() -> { Thread.sleep(2000); log.info("Async"); return "Hello"; }); log.info(f.get()); log.info("Exit"); } } //결과 22:55:01.532 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Async 22:55:01.537 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Hello 22:55:01.537 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Exit
- Future 를 통해 비동기 결과의 값을 가져올 때는 get 메서드를 사용한다.
- 그러나 get 메서드를 호출하게 되면 비동기 작업이 완료될 때 까지 Thread 가 blocking 된다.
- Future 는 비동기적인 작업을 수행하고 그 결과를 갖고 있으며, 완료를 기다리고 계산 결과를 가져오는 get 메서드와
해당 연산이 완료 되었는지 확인하는 isDone 메서드를 제공한다.
@Slf4j public class FutureApp { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); Future<String> f = es.submit(() -> { Thread.sleep(2000); log.info("Async"); return "Hello"; }); log.info(String.valueOf(f.isDone())); Thread.sleep(2000); log.info("Exit"); log.info(String.valueOf(f.isDone())); log.info(f.get()); } } //결과 23:01:36.065 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - false 23:01:38.071 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Async 23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Exit 23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - false 23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Hello
FutureTask
- FutureTask 는 비동기 작업을 생성한다.
- 위의 실습은 비동기 작업 생성과 실행을 동시에 했다면, FutureTask 는 비동기 작업 생성과 실행을 분리하여 진행할 수 있다.
@Slf4j public class FutureTaskApp { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService es = Executors.newCachedThreadPool(); FutureTask<String> f = new FutureTask<>(() -> { Thread.sleep(2000); log.info("Async"); return "Hello"; }); es.execute(f); log.info(String.valueOf(f.isDone())); Thread.sleep(2000); log.info("Exit"); log.info(String.valueOf(f.isDone())); log.info(f.get()); } } //결과 23:41:40.003 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - false 23:41:42.007 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async 23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Exit 23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - true 23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello
- 비동기 작업 결과를 가져오는 방법으로 두 가지가 있다.
- Future 와 같은 결과를 다루는 Handler 를 이용하는 방법
- Callback 을 이용하는 방법
아래 예시코드는 FutureTask 의 비동기 작업이 완료될 경우 호출되는 done 메서드를 재정의하여 callback 을 이용하는 방법이다.
@Slf4j public class FutureTaskApp { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService es = Executors.newCachedThreadPool(); FutureTask<String> f = new FutureTask<>(() -> { Thread.sleep(2000); log.info("Async"); return "Hello"; }){ @Override protected void done() { super.done(); try { log.info(get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }; es.execute(f); es.shutdown(); log.info("Exit"); } } //결과 23:46:52.088 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Exit 23:46:54.092 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async 23:46:54.094 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello
위 비동기 코드와 그 결과를 갖고 작업을 수행하는 callback 부분을 가독성이 좋게 작성할 수 있다.
클래스를 정의한 후 FutrueTask 를 상속받아 done 메서드를 재정의하자. (아래코드 참고)
@Slf4j public class FutureTaskApp { interface SuccessCallback { void onSuccess(String result); } public static class CallbackFutureTask extends FutureTask<String> { SuccessCallback sc; public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) { super(callable); this.sc = Objects.requireNonNull(sc); } @Override protected void done() { super.done(); try { this.sc.onSuccess(get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); CallbackFutureTask f = new CallbackFutureTask(() -> { Thread.sleep(2000); log.info("Async"); return "Hello"; },log::info); es.execute(f); es.shutdown(); } } } //결과 00:01:58.203 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async 00:01:58.206 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello
위 예시코드에 SuccessCallback을 추가한 것처럼 ExceptionCallback을 추가하여 비동기 코드에서 예외가 발생할 경우, 해당 예외를 처리하는 callback도 추가할 수 있다. (아래코드 참고)
@Slf4j public class FutureTaskApp { interface SuccessCallback { void onSuccess(String result); } interface ExceptionCallback { void onError(Throwable t); } public static class CallbackFutureTask extends FutureTask<String> { SuccessCallback sc; ExceptionCallback ec; public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) { super(callable); this.sc = Objects.requireNonNull(sc); this.ec = Objects.requireNonNull(ec); } @Override protected void done() { super.done(); try { this.sc.onSuccess(get()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { ec.onError(e.getCause()); } } public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); CallbackFutureTask f = new CallbackFutureTask(() -> { Thread.sleep(2000); if (1 == 1) throw new RuntimeException("Async ERROR!!!"); log.info("Async"); return "Hello"; }, s -> log.info("Result: {}", s), e -> log.info("Error: {}", e.getMessage())); es.execute(f); es.shutdown(); log.info("EXIT"); } } } //결과 00:06:53.557 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - EXIT 00:06:55.508 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Error: Async ERROR!!!
InterruptedException은 예외긴 예외이지만, "현재 작업을 수행하지 말고 중단해라." 라고 메시지를 보내는 용도이다.
따라서 현재 Thread 에 interrupt를 체크하고 종료한다.참고
해당 포스팅은 토비님의 유튜브 강의와 Jongmin 님 블로그를 보고 직접 실습하며 작성한 포스팅입니다.
- 자바 Concurrent 프로그래밍 소개
- Excutor
- Callable 과 Future
- 토비의 봄 TV 9회 스프링 리액티브 프로그래밍 (5) 비동기 RestTemplate과 비동기 MVC/Serlvet
- JongMin 님 블로그
'Spring > Webflux' 카테고리의 다른 글
토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6) (0) 2022.06.24 토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) 2022.06.17 토비의 봄 TV - Reactive Streams Scheduler (3) (0) 2022.06.14 토비의 봄 TV - Reactive Streams Operators (2) (0) 2022.06.07 토비의 봄 TV - Reactive Streams (1) (0) 2022.06.04