ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 토비의 봄 TV - 자바의 비동기 기술 (4) 1/2
    Spring/Webflux 2022. 6. 17. 00:11

    ExcutorService


    • 쉽게 비동기로 작업을 실행할 수 있도록 도와주는 JDK(1.5부터)에서 제공하는 interface 이다. 
    • 일반적으로 ExecutorService는 작업 할당을 위한 스레드 풀과 API를 제공한다.
    @Slf4j
    public class ExcutorServiceApp {
      public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(() -> {
          try {
            Thread.sleep(2000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          log.info("Async");
          System.out.println("Async");
        });
        log.info("Exit");
      }
    }
    // 결과
    22:43:32.350 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.ExcutorServiceApp - Exit
    22:43:34.350 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.ExcutorServiceApp - Async

     

     

     

    Future


    • Future 는 자바 1.5 에 등장한 비동기 계산 결과를 나타내는 인터페이스
    • Future를 이용하면 멀티쓰레드 환경에서 처리된 어떤 데이터를 다른 쓰레드에 전달할 수 있다.
    • Future 내부적으로 Thread-Safe 하도록 구현되었기 때문에 synchronized block을 사용하지 않아도 된다.

     

    • 비동기적인 작업을 수행?
      • 현재 진행하고 있는 Thread 가 아닌 별도의 Thread 에서 작업을 수행하는 것을 말한다.
      • 같은 Thread 에서 메서드를 호출할 때는 리턴 값을 받지만, 비동기적으로 작업을 수행할 때는 리턴 값을 전달받을 수 있는 무언가의 interface 가 필요한데 Future 가 그 역할을 한다.
    • 비동기 작업에서 결과를 반환하고 싶을 때
      • runnable 대신 callable 인터페이스를 이용하면 리턴 값을 받을 수 있다. 
      • 예외가 발생했을 때 해당 예외를 비동기 코드를 처리하는 Thread 안에서 처리하지 않고 밖으로 던질 수 있다.
    @Slf4j
    public class FutureApp {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
    
        Future<String> f = es.submit(() -> {
          Thread.sleep(2000);
          log.info("Async");
          return "Hello";
        });
    
        log.info(f.get());
        log.info("Exit");
      }
    }
    
    //결과
    22:55:01.532 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Async
    22:55:01.537 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Hello
    22:55:01.537 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Exit
    • Future 를 통해 비동기 결과의 값을 가져올 때는 get 메서드를 사용한다.
    • 그러나 get 메서드를 호출하게 되면 비동기 작업이 완료될 때 까지 Thread 가 blocking 된다.
    • Future 는 비동기적인 작업을 수행하고 그 결과를 갖고 있으며, 완료를 기다리고 계산 결과를 가져오는 get 메서드와
      해당 연산이 완료 되었는지 확인하는 isDone 메서드를 제공한다.
    @Slf4j
    public class FutureApp {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
    
        Future<String> f = es.submit(() -> {
          Thread.sleep(2000);
          log.info("Async");
          return "Hello";
        });
    
        log.info(String.valueOf(f.isDone()));
        Thread.sleep(2000);
        log.info("Exit");
        log.info(String.valueOf(f.isDone()));
        log.info(f.get());
      }
    }
    //결과
    23:01:36.065 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - false
    23:01:38.071 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Async
    23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Exit
    23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - false
    23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Hello

     

     

     

    FutureTask


    • FutureTask 는 비동기 작업을 생성한다.
    • 위의 실습은 비동기 작업 생성과 실행을 동시에 했다면, FutureTask 는 비동기 작업 생성과 실행을 분리하여 진행할 수 있다.
    @Slf4j
    public class FutureTaskApp {
      public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();
        FutureTask<String> f = new FutureTask<>(() -> {
          Thread.sleep(2000);
          log.info("Async");
          return "Hello";
        });
    
        es.execute(f);
    
        log.info(String.valueOf(f.isDone()));
        Thread.sleep(2000);
        log.info("Exit");
        log.info(String.valueOf(f.isDone()));
        log.info(f.get());
      }
    }
    //결과
    23:41:40.003 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - false
    23:41:42.007 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
    23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Exit
    23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - true
    23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello
    • 비동기 작업 결과를 가져오는 방법으로 두 가지가 있다.
      • Future 와 같은 결과를 다루는 Handler 를 이용하는 방법
      • Callback 을 이용하는 방법

     

    아래 예시코드는 FutureTask 의 비동기 작업이 완료될 경우 호출되는 done 메서드를 재정의하여 callback 을 이용하는 방법이다.

    @Slf4j
    public class FutureTaskApp {
      public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newCachedThreadPool();
        FutureTask<String> f = new FutureTask<>(() -> {
          Thread.sleep(2000);
          log.info("Async");
          return "Hello";
        }){
          @Override
          protected void done() {
            super.done();
            try {
              log.info(get());
            } catch (InterruptedException e) {
              e.printStackTrace();
            } catch (ExecutionException e) {
              e.printStackTrace();
            }
          }
        };
    
        es.execute(f);
        es.shutdown();
    
        log.info("Exit");
      }
    }
    //결과
    23:46:52.088 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Exit
    23:46:54.092 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
    23:46:54.094 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello

    비동기 코드와 그 결과를 갖고 작업을 수행하는 callback 부분을 가독성이 좋게 작성할 수 있다.

    클래스를 정의한 후 FutrueTask 를 상속받아 done 메서드를 재정의하자. (아래코드 참고)

    @Slf4j
    public class FutureTaskApp {
    
      interface SuccessCallback {
        void onSuccess(String result);
      }
    
      public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
    
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
          super(callable);
          this.sc = Objects.requireNonNull(sc);
        }
    
        @Override
        protected void done() {
          super.done();
          try {
            this.sc.onSuccess(get());
          } catch (InterruptedException e) {
            e.printStackTrace();
          } catch (ExecutionException e) {
            e.printStackTrace();
          }
        }
    
        public static void main(String[] args) {
          ExecutorService es = Executors.newCachedThreadPool();
    
          CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            log.info("Async");
            return "Hello";
          },log::info);
    
          es.execute(f);
          es.shutdown();
        }
      }
    }
    //결과
    00:01:58.203 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
    00:01:58.206 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello

    위 예시코드에 SuccessCallback을 추가한 것처럼 ExceptionCallback을 추가하여 비동기 코드에서 예외가 발생할 경우, 해당 예외를 처리하는 callback도 추가할 수 있다. (아래코드 참고)

    @Slf4j
    public class FutureTaskApp {
    
      interface SuccessCallback {
        void onSuccess(String result);
      }
    
      interface ExceptionCallback {
        void onError(Throwable t);
      }
    
      public static class CallbackFutureTask extends FutureTask<String> {
        SuccessCallback sc;
        ExceptionCallback ec;
    
        public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
          super(callable);
          this.sc = Objects.requireNonNull(sc);
          this.ec = Objects.requireNonNull(ec);
        }
    
        @Override
        protected void done() {
          super.done();
          try {
            this.sc.onSuccess(get());
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } catch (ExecutionException e) {
            ec.onError(e.getCause());
          }
        }
    
        public static void main(String[] args) {
          ExecutorService es = Executors.newCachedThreadPool();
    
          CallbackFutureTask f = new CallbackFutureTask(() -> {
            Thread.sleep(2000);
            if (1 == 1) throw new RuntimeException("Async ERROR!!!");
            log.info("Async");
            return "Hello";
          },
          s -> log.info("Result: {}", s),
          e -> log.info("Error: {}", e.getMessage()));
    
          es.execute(f);
          es.shutdown();
    
          log.info("EXIT");
        }
      }
    }
    
    //결과
    00:06:53.557 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - EXIT
    00:06:55.508 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Error: Async ERROR!!!

    InterruptedException은 예외긴 예외이지만, "현재 작업을 수행하지 말고 중단해라." 라고 메시지를 보내는 용도이다.
    따라서 현재 Thread 에 interrupt를 체크하고 종료한다.

     

     

     

    참고


    해당 포스팅은 토비님의 유튜브 강의와 Jongmin 님 블로그를 보고 직접 실습하며 작성한 포스팅입니다.

     

     

Designed by Tistory.