ExcutorService
- 쉽게 비동기로 작업을 실행할 수 있도록 도와주는 JDK(1.5부터)에서 제공하는 interface 이다.
- 일반적으로 ExecutorService는 작업 할당을 위한 스레드 풀과 API를 제공한다.
@Slf4j
public class ExcutorServiceApp {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("Async");
System.out.println("Async");
});
log.info("Exit");
}
}
// 결과
22:43:32.350 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.ExcutorServiceApp - Exit
22:43:34.350 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.ExcutorServiceApp - Async
Future
- Future 는 자바 1.5 에 등장한 비동기 계산 결과를 나타내는 인터페이스
- Future를 이용하면 멀티쓰레드 환경에서 처리된 어떤 데이터를 다른 쓰레드에 전달할 수 있다.
- Future 내부적으로 Thread-Safe 하도록 구현되었기 때문에 synchronized block을 사용하지 않아도 된다.
- 비동기적인 작업을 수행?
- 현재 진행하고 있는 Thread 가 아닌 별도의 Thread 에서 작업을 수행하는 것을 말한다.
- 같은 Thread 에서 메서드를 호출할 때는 리턴 값을 받지만, 비동기적으로 작업을 수행할 때는 리턴 값을 전달받을 수 있는 무언가의 interface 가 필요한데 Future 가 그 역할을 한다.
- 비동기 작업에서 결과를 반환하고 싶을 때
- runnable 대신 callable 인터페이스를 이용하면 리턴 값을 받을 수 있다.
- 예외가 발생했을 때 해당 예외를 비동기 코드를 처리하는 Thread 안에서 처리하지 않고 밖으로 던질 수 있다.
@Slf4j
public class FutureApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Future<String> f = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});
log.info(f.get());
log.info("Exit");
}
}
//결과
22:55:01.532 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Async
22:55:01.537 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Hello
22:55:01.537 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Exit
- Future 를 통해 비동기 결과의 값을 가져올 때는 get 메서드를 사용한다.
- 그러나 get 메서드를 호출하게 되면 비동기 작업이 완료될 때 까지 Thread 가 blocking 된다.
- Future 는 비동기적인 작업을 수행하고 그 결과를 갖고 있으며, 완료를 기다리고 계산 결과를 가져오는 get 메서드와
해당 연산이 완료 되었는지 확인하는 isDone 메서드를 제공한다.
@Slf4j
public class FutureApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Future<String> f = es.submit(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});
log.info(String.valueOf(f.isDone()));
Thread.sleep(2000);
log.info("Exit");
log.info(String.valueOf(f.isDone()));
log.info(f.get());
}
}
//결과
23:01:36.065 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - false
23:01:38.071 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Async
23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Exit
23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - false
23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Hello
FutureTask
- FutureTask 는 비동기 작업을 생성한다.
- 위의 실습은 비동기 작업 생성과 실행을 동시에 했다면, FutureTask 는 비동기 작업 생성과 실행을 분리하여 진행할 수 있다.
@Slf4j
public class FutureTaskApp {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
FutureTask<String> f = new FutureTask<>(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
});
es.execute(f);
log.info(String.valueOf(f.isDone()));
Thread.sleep(2000);
log.info("Exit");
log.info(String.valueOf(f.isDone()));
log.info(f.get());
}
}
//결과
23:41:40.003 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - false
23:41:42.007 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Exit
23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - true
23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello
- 비동기 작업 결과를 가져오는 방법으로 두 가지가 있다.
- Future 와 같은 결과를 다루는 Handler 를 이용하는 방법
- Callback 을 이용하는 방법
아래 예시코드는 FutureTask 의 비동기 작업이 완료될 경우 호출되는 done 메서드를 재정의하여 callback 을 이용하는 방법이다.
@Slf4j
public class FutureTaskApp {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newCachedThreadPool();
FutureTask<String> f = new FutureTask<>(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
}){
@Override
protected void done() {
super.done();
try {
log.info(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};
es.execute(f);
es.shutdown();
log.info("Exit");
}
}
//결과
23:46:52.088 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Exit
23:46:54.092 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
23:46:54.094 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello
위 비동기 코드와 그 결과를 갖고 작업을 수행하는 callback 부분을 가독성이 좋게 작성할 수 있다.
클래스를 정의한 후 FutrueTask 를 상속받아 done 메서드를 재정의하자. (아래코드 참고)
@Slf4j
public class FutureTaskApp {
interface SuccessCallback {
void onSuccess(String result);
}
public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;
public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
super(callable);
this.sc = Objects.requireNonNull(sc);
}
@Override
protected void done() {
super.done();
try {
this.sc.onSuccess(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
CallbackFutureTask f = new CallbackFutureTask(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
},log::info);
es.execute(f);
es.shutdown();
}
}
}
//결과
00:01:58.203 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
00:01:58.206 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello
위 예시코드에 SuccessCallback을 추가한 것처럼 ExceptionCallback을 추가하여 비동기 코드에서 예외가 발생할 경우, 해당 예외를 처리하는 callback도 추가할 수 있다. (아래코드 참고)
@Slf4j
public class FutureTaskApp {
interface SuccessCallback {
void onSuccess(String result);
}
interface ExceptionCallback {
void onError(Throwable t);
}
public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback sc;
ExceptionCallback ec;
public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
super(callable);
this.sc = Objects.requireNonNull(sc);
this.ec = Objects.requireNonNull(ec);
}
@Override
protected void done() {
super.done();
try {
this.sc.onSuccess(get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
ec.onError(e.getCause());
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
CallbackFutureTask f = new CallbackFutureTask(() -> {
Thread.sleep(2000);
if (1 == 1) throw new RuntimeException("Async ERROR!!!");
log.info("Async");
return "Hello";
},
s -> log.info("Result: {}", s),
e -> log.info("Error: {}", e.getMessage()));
es.execute(f);
es.shutdown();
log.info("EXIT");
}
}
}
//결과
00:06:53.557 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - EXIT
00:06:55.508 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Error: Async ERROR!!!
InterruptedException은 예외긴 예외이지만, "현재 작업을 수행하지 말고 중단해라." 라고 메시지를 보내는 용도이다.
따라서 현재 Thread 에 interrupt를 체크하고 종료한다.
참고
해당 포스팅은 토비님의 유튜브 강의와 Jongmin 님 블로그를 보고 직접 실습하며 작성한 포스팅입니다.
- 자바 Concurrent 프로그래밍 소개
- Excutor
- Callable 과 Future
- 토비의 봄 TV 9회 스프링 리액티브 프로그래밍 (5) 비동기 RestTemplate과 비동기 MVC/Serlvet
- JongMin 님 블로그
'Spring > Webflux' 카테고리의 다른 글
토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6) (0) | 2022.06.24 |
---|---|
토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2 (0) | 2022.06.17 |
토비의 봄 TV - Reactive Streams Scheduler (3) (0) | 2022.06.14 |
토비의 봄 TV - Reactive Streams Operators (2) (0) | 2022.06.07 |
토비의 봄 TV - Reactive Streams (1) (0) | 2022.06.04 |