Spring/Webflux

토비의 봄 TV - 자바의 비동기 기술 (4) 1/2

개발정리 2022. 6. 17. 00:11

ExcutorService


  • 쉽게 비동기로 작업을 실행할 수 있도록 도와주는 JDK(1.5부터)에서 제공하는 interface 이다. 
  • 일반적으로 ExecutorService는 작업 할당을 위한 스레드 풀과 API를 제공한다.
@Slf4j
public class ExcutorServiceApp {
  public static void main(String[] args) {
    ExecutorService es = Executors.newCachedThreadPool();
    es.execute(() -> {
      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      log.info("Async");
      System.out.println("Async");
    });
    log.info("Exit");
  }
}
// 결과
22:43:32.350 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.ExcutorServiceApp - Exit
22:43:34.350 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.ExcutorServiceApp - Async

 

 

 

Future


  • Future 는 자바 1.5 에 등장한 비동기 계산 결과를 나타내는 인터페이스
  • Future를 이용하면 멀티쓰레드 환경에서 처리된 어떤 데이터를 다른 쓰레드에 전달할 수 있다.
  • Future 내부적으로 Thread-Safe 하도록 구현되었기 때문에 synchronized block을 사용하지 않아도 된다.

 

  • 비동기적인 작업을 수행?
    • 현재 진행하고 있는 Thread 가 아닌 별도의 Thread 에서 작업을 수행하는 것을 말한다.
    • 같은 Thread 에서 메서드를 호출할 때는 리턴 값을 받지만, 비동기적으로 작업을 수행할 때는 리턴 값을 전달받을 수 있는 무언가의 interface 가 필요한데 Future 가 그 역할을 한다.
  • 비동기 작업에서 결과를 반환하고 싶을 때
    • runnable 대신 callable 인터페이스를 이용하면 리턴 값을 받을 수 있다. 
    • 예외가 발생했을 때 해당 예외를 비동기 코드를 처리하는 Thread 안에서 처리하지 않고 밖으로 던질 수 있다.
@Slf4j
public class FutureApp {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    Future<String> f = es.submit(() -> {
      Thread.sleep(2000);
      log.info("Async");
      return "Hello";
    });

    log.info(f.get());
    log.info("Exit");
  }
}

//결과
22:55:01.532 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Async
22:55:01.537 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Hello
22:55:01.537 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Exit
  • Future 를 통해 비동기 결과의 값을 가져올 때는 get 메서드를 사용한다.
  • 그러나 get 메서드를 호출하게 되면 비동기 작업이 완료될 때 까지 Thread 가 blocking 된다.
  • Future 는 비동기적인 작업을 수행하고 그 결과를 갖고 있으며, 완료를 기다리고 계산 결과를 가져오는 get 메서드와
    해당 연산이 완료 되었는지 확인하는 isDone 메서드를 제공한다.
@Slf4j
public class FutureApp {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();

    Future<String> f = es.submit(() -> {
      Thread.sleep(2000);
      log.info("Async");
      return "Hello";
    });

    log.info(String.valueOf(f.isDone()));
    Thread.sleep(2000);
    log.info("Exit");
    log.info(String.valueOf(f.isDone()));
    log.info(f.get());
  }
}
//결과
23:01:36.065 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - false
23:01:38.071 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Async
23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Exit
23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - false
23:01:38.072 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureApp - Hello

 

 

 

FutureTask


  • FutureTask 는 비동기 작업을 생성한다.
  • 위의 실습은 비동기 작업 생성과 실행을 동시에 했다면, FutureTask 는 비동기 작업 생성과 실행을 분리하여 진행할 수 있다.
@Slf4j
public class FutureTaskApp {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService es = Executors.newCachedThreadPool();
    FutureTask<String> f = new FutureTask<>(() -> {
      Thread.sleep(2000);
      log.info("Async");
      return "Hello";
    });

    es.execute(f);

    log.info(String.valueOf(f.isDone()));
    Thread.sleep(2000);
    log.info("Exit");
    log.info(String.valueOf(f.isDone()));
    log.info(f.get());
  }
}
//결과
23:41:40.003 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - false
23:41:42.007 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Exit
23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - true
23:41:42.009 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello
  • 비동기 작업 결과를 가져오는 방법으로 두 가지가 있다.
    • Future 와 같은 결과를 다루는 Handler 를 이용하는 방법
    • Callback 을 이용하는 방법

 

아래 예시코드는 FutureTask 의 비동기 작업이 완료될 경우 호출되는 done 메서드를 재정의하여 callback 을 이용하는 방법이다.

@Slf4j
public class FutureTaskApp {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService es = Executors.newCachedThreadPool();
    FutureTask<String> f = new FutureTask<>(() -> {
      Thread.sleep(2000);
      log.info("Async");
      return "Hello";
    }){
      @Override
      protected void done() {
        super.done();
        try {
          log.info(get());
        } catch (InterruptedException e) {
          e.printStackTrace();
        } catch (ExecutionException e) {
          e.printStackTrace();
        }
      }
    };

    es.execute(f);
    es.shutdown();

    log.info("Exit");
  }
}
//결과
23:46:52.088 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Exit
23:46:54.092 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
23:46:54.094 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello

비동기 코드와 그 결과를 갖고 작업을 수행하는 callback 부분을 가독성이 좋게 작성할 수 있다.

클래스를 정의한 후 FutrueTask 를 상속받아 done 메서드를 재정의하자. (아래코드 참고)

@Slf4j
public class FutureTaskApp {

  interface SuccessCallback {
    void onSuccess(String result);
  }

  public static class CallbackFutureTask extends FutureTask<String> {
    SuccessCallback sc;

    public CallbackFutureTask(Callable<String> callable, SuccessCallback sc) {
      super(callable);
      this.sc = Objects.requireNonNull(sc);
    }

    @Override
    protected void done() {
      super.done();
      try {
        this.sc.onSuccess(get());
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
    }

    public static void main(String[] args) {
      ExecutorService es = Executors.newCachedThreadPool();

      CallbackFutureTask f = new CallbackFutureTask(() -> {
        Thread.sleep(2000);
        log.info("Async");
        return "Hello";
      },log::info);

      es.execute(f);
      es.shutdown();
    }
  }
}
//결과
00:01:58.203 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Async
00:01:58.206 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Hello

위 예시코드에 SuccessCallback을 추가한 것처럼 ExceptionCallback을 추가하여 비동기 코드에서 예외가 발생할 경우, 해당 예외를 처리하는 callback도 추가할 수 있다. (아래코드 참고)

@Slf4j
public class FutureTaskApp {

  interface SuccessCallback {
    void onSuccess(String result);
  }

  interface ExceptionCallback {
    void onError(Throwable t);
  }

  public static class CallbackFutureTask extends FutureTask<String> {
    SuccessCallback sc;
    ExceptionCallback ec;

    public CallbackFutureTask(Callable<String> callable, SuccessCallback sc, ExceptionCallback ec) {
      super(callable);
      this.sc = Objects.requireNonNull(sc);
      this.ec = Objects.requireNonNull(ec);
    }

    @Override
    protected void done() {
      super.done();
      try {
        this.sc.onSuccess(get());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        ec.onError(e.getCause());
      }
    }

    public static void main(String[] args) {
      ExecutorService es = Executors.newCachedThreadPool();

      CallbackFutureTask f = new CallbackFutureTask(() -> {
        Thread.sleep(2000);
        if (1 == 1) throw new RuntimeException("Async ERROR!!!");
        log.info("Async");
        return "Hello";
      },
      s -> log.info("Result: {}", s),
      e -> log.info("Error: {}", e.getMessage()));

      es.execute(f);
      es.shutdown();

      log.info("EXIT");
    }
  }
}

//결과
00:06:53.557 [main] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - EXIT
00:06:55.508 [pool-1-thread-1] INFO com.example.reactivestreamstoby._04_SyncAsync.FutureTaskApp - Error: Async ERROR!!!

InterruptedException은 예외긴 예외이지만, "현재 작업을 수행하지 말고 중단해라." 라고 메시지를 보내는 용도이다.
따라서 현재 Thread 에 interrupt를 체크하고 종료한다.

 

 

 

참고


해당 포스팅은 토비님의 유튜브 강의와 Jongmin 님 블로그를 보고 직접 실습하며 작성한 포스팅입니다.