Spring/Webflux

토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2

개발정리 2022. 6. 17. 10:48

@Async


  • Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌다.
  • @Async 어노테이션을 추가해 해당 메서드를 비동기적으로 호출할 수 있다. 
  • 해당 메서드를 호출한 호출자(caller)는 즉시 리턴하고 메소드의 실제 실행은 Spring TaskExecutor에 의해서 실행된다.
  • 비동기로 실행되는 메서드는 Future 형식의 값을 리턴하고, 호출자는 해당 Future의 get() 메서드를 호출하기 전에 다른 작업을 수행할 수 있다.
@SpringBootApplication
@EnableAsync
@Slf4j
public class AsyncApp {
  @Service
  public static class MyService {
     /*
       내부적으로 AOP를 이용해 복잡한 로직이 실행된다.
       비동기 작업은 return값으로 바로 결과를 줄 수 없다.
       (Future 혹은 Callback을 이용해야 한다.)
     */
    @Async
    public Future<String> hello() throws InterruptedException {
      log.info("hello()");
      Thread.sleep(1000);
      return new AsyncResult<>("Hello");
    }
  }
  public static void main(String[] args) {
    // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
    try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {}
  }
  @Autowired
  MyService myService;

  // 모든 빈이 다 준비된 후 실행됨 (현재는 일종의 컨트롤러라고 생각)
  @Bean
  ApplicationRunner run() {
    return args -> {
      log.info("run()");
      Future<String> res = myService.hello();
      log.info("exit: {}", res.isDone());
      log.info("result: {}", res.get());
    };
  }
}
//결과
2022-06-17 08:40:41.488  INFO 35957 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : Started AsyncApp in 0.682 seconds (JVM running for 1.15)
2022-06-17 08:40:41.489  INFO 35957 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : run()
2022-06-17 08:40:41.493  INFO 35957 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : exit: false
2022-06-17 08:40:41.496  INFO 35957 --- [         task-1] c.e.r._04_SyncAsync.AsyncApp             : hello()
2022-06-17 08:40:42.502  INFO 35957 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : result: Hello

 

 

 

ListenableFuture


  • 스프링 4.0 부터 제공하는 Future 인터페이스를 확장한 ListenableFuture를 이용하면 비동기 처리의 결과 값을 사용할 수 있는callback을 추가할 수 있다.
  • @Async 어노테이션을 사용하는 메서드에서 스프링 4.1 부터 제공하는 ListenableFuture 인터페이스를 구현한 AsyncResult를 반환하면 된다.
@SpringBootApplication
@EnableAsync
@Slf4j
public class AsyncApp {

  @Service
  public static class MyService {
    @Async
    public ListenableFuture<String> hello() throws InterruptedException {
      log.info("hello()");
      Thread.sleep(1000);
      return new AsyncResult<>("Hello");
    }
  }

  public static void main(String[] args) {
    // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
    try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {}
  }

  @Autowired
  MyService myService;

  @Bean
  ApplicationRunner run() {
    return args -> {
      log.info("run()");
      ListenableFuture<String> f = myService.hello();
      f.addCallback(s -> log.info(s), e-> log.info(e.getMessage()));
      log.info("exit");

      Thread.sleep(2000);
    };
  }
}
//결과
2022-06-17 08:47:32.224  INFO 36036 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : Started AsyncApp in 0.676 seconds (JVM running for 0.969)
2022-06-17 08:47:32.225  INFO 36036 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : run()
2022-06-17 08:47:32.229  INFO 36036 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : exit
2022-06-17 08:47:32.231  INFO 36036 --- [         task-1] c.e.r._04_SyncAsync.AsyncApp             : hello()
2022-06-17 08:47:33.238  INFO 36036 --- [         task-1] c.e.r._04_SyncAsync.AsyncApp             : Hello

 

 

 

ThreadPoolTaskExecutor


  • @Async 어노테이션을 사용해 해당 메서드를 비동기적으로 호출할 경우 ThreadPool을 명시적으로 선언하지 않으면, 기본적으로 SimpleAsyncTaskExecutor를 사용한다. 
  • SimpleAsyncTaskExecutor는 각 비동기 호출마다 계속 새로운 스레드를 만들어 사용하기 때문에 비효율적이므로 이 경우 ThreadPoolTaskExecutor를 직접 만들어 사용하는게 효율적이다.
  • ThreadPoolTaskExecutor는 CorePool, QueueCapacity, MaxPoolSize를 직접 설정할 수 있다. 
    아래 코드를 살펴보자.
@SpringBootApplication
@EnableAsync
@Slf4j
public class AsyncApp {

  @Service
  public static class MyService {
    // 기본적으로 SimpleAsyncTaskExecutor 를 사용한다.
    // 이는 Thread 를 계속 새로 만들어 사용하기 때문에 비효율적이다.
    @Async
    // @Async("tp") ThreadPool 이 여러개일 경우 직접 지정 가능하다.
    public ListenableFuture<String> hello() throws InterruptedException {
      log.info("hello()");
      Thread.sleep(1000);
      return new AsyncResult<>("Hello");
    }
  }

  @Bean
  ThreadPoolTaskExecutor tp() {
    ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
    // 1. Thread pool 을 해당 개수까지 기본적으로 생성함. 처음 요청이 들어올 때 poll size 만큼 생성
    te.setCorePoolSize(10);
    // 2. 지금 당장은 Core Thread 를 모두 사용중 일 때 큐에 만들어 대기시킨다.
    te.setQueueCapacity(10);
    // 3. 대기하는 작업이 큐에 꽉 찰 경우 pool 을 해당 개수까지 더 생성한다.
    te.setMaxPoolSize(100);
    te.setThreadNamePrefix("myThread");
    return te;
  }
  
  public static void main(String[] args) {
    // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
    try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {}
  }

  @Autowired
  MyService myService;

  @Bean
  ApplicationRunner run() {
    return args -> {
      log.info("run()");
      ListenableFuture<String> f = myService.hello();
      f.addCallback(s -> log.info(s), e-> log.info(e.getMessage()));
      log.info("exit");

      Thread.sleep(2000);
    };
  }
}

//결과
2022-06-17 09:45:22.276  INFO 36668 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : Started AsyncApp in 0.653 seconds (JVM running for 1.108)
2022-06-17 09:45:22.277  INFO 36668 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : run()
2022-06-17 09:45:22.279  INFO 36668 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : exit
2022-06-17 09:45:22.281  INFO 36668 --- [      myThread1] c.e.r._04_SyncAsync.AsyncApp             : hello()
2022-06-17 09:45:23.289  INFO 36668 --- [      myThread1] c.e.r._04_SyncAsync.AsyncApp             : Hello

 

 

 

Servlet Async


  • @Async 어노테이션을 설명할 때 말했던 것 처럼 Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌다.
  • 기존 Controller 메서드를 Callable 로 변경함으로써 비동기로 만들 수 있다. 
  • Controller 메서드를 비동기로 변경해도 해당 처리가 Servlet Thread 가 아닌 다른 Thread 에서 발생한다는 점을 제외하면 기존 Controller 메서드의 동작 방식과는 큰 차이가 없다. 
  • Servlet 3.0: 비동기 서블릿
    • HTTP connection은 이미 논블록킹 IO
    • 서블릿 요청 읽기, 응답 쓰기는 블록킹
    • 비동기 작업 시작 즉시 서블릿 스레드 반납
    • 비동기 작업이 완료되면 서블릿 스레드 재할당
    • 비동기 서블릿 컨텍스트 이용 (AsyncContext)
  • Servlet 3.1: 논블록킹 IO
    • 논블록킹 서블릿 요청, 응답 처리
    • Callback
      • Thread 가 Block 되는 상황은 CPU 와 메모리 자원을 많이 소모한다. 
        (컨텍스트 스위칭이 일어나기 때문)
        기본적으로 Thread 가 Blocking 되면 Wating 상태로 변경되면서 컨텍스트 스위칭이 일어나고 추후 I/O 작업이 끝나 running 상태로 변경되면서 다시 컨텍스트 스위칭이 일어나 총 2번의 컨텍스트 스위칭이 일어난다.
      • Java InputStream 과 OutputStream 은 Blocking 방식이다.
        RequestHttpServletRequest, RequestHttpServletResponse 는 InputStream 과 OutputStream 을 사용하기 때문에 Servlet 은 기본적으로 Blocking I/O 방식이다.
@SpringBootApplication
@EnableAsync
@Slf4j
public class AsyncApp {

  @RestController
  public static class MyController {
    @GetMapping("/callable")
    public Callable<String> callable() {
      log.info("callable");
      return () -> {
        log.info("async");
        Thread.sleep(2000);
        return "hello";
      };
    }
  }

  public static void main(String[] args) {
    SpringApplication.run(AsyncApp.class, args);
  }
}

 

비동기 서블릿은 아래 그림과 같이 동작한다.

비동기 서블릿 구조

 

 

 

Load Test 진행 


지금부터는 Spring에서 Sync Servlet 을 이용할 때와 Async Servlet 을 이용했을 때의 차이점을 알아보기 위해 테스트를 진행할 것이다. 먼저 여러 Request 를 동시에 생성하는 Client 를 작성해보자.

Spring 에서 제공하는 RestTemplate 을 이용해 100개의 Request 를 동시에 호출한다.

@Slf4j
public class LoadTest {
  static AtomicInteger counter = new AtomicInteger(0);

  public static void main(String[] args) throws InterruptedException {
    ExecutorService es = Executors.newFixedThreadPool(100);
    RestTemplate restTemplate = new RestTemplate();
    String url = "http://localhost:8080/callable";

    StopWatch main = new StopWatch();
    main.start();

    for(int i=0; i<100; i++) {
      es.execute(() -> {
        int idx = counter.addAndGet(1);
        log.info("Thread " + idx);

        StopWatch sw = new StopWatch();
        sw.start();

        restTemplate.getForObject(url, String.class);

        sw.stop();
        log.info("Elapsed: {} -> {}", idx, sw.getTotalTimeSeconds());
      });
    }
    es.shutdown();
    // 지정된 시간이 타임아웃 걸리기 전이라면 대기작업이 진행될 때까지 기다린다.
    // (100초안에 작업이 끝날때까지 기다리거나, 100초가 초과되면 종료)
    es.awaitTermination(100, TimeUnit.SECONDS);
    main.stop();
    log.info("Total: {}", main.getTotalTimeSeconds());
  }
}

 

위 비동기 서블릿 구조 그림 처럼 Async Servlet 은 Client 로 부터 요청을 받은 후 실제 작업은 작업 서블릿 풀에 위임하고, 현재의 서블릿 스레드는 서블릿 스레드 풀에 반환 후 다음 요청이 들어올 경우 사용할 수 있도록 한다.

 

Sync Servlet 은 요청을 받은 서블릿 스레드에서 실제 작업까지 전부 진행하기 때문에 요청에 대한 응답을 반환하기 전까지는 새로운 요청을 처리할 수 없는 상태이다. 

 

실제 이처럼 동작하는지 확인하기 위해 application.properties 파일에서 아래와 같이 Tomcat 의 스레드 개수를 1개로 설정한다.

server.tomcat.max-threads=1

 

 

 

Sync vs Async


먼저 아래와 같이 Sync Servlet을 이용해 서버를 띄운 후 위의 Client 코드를 이용해 테스트를 진행합니다.

@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

    @RestController
    public static class MyController {
        @GetMapping("/callable")
        public String callable() throws InterruptedException {
            log.info("sync");
            Thread.sleep(2000);
            return "hello";
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(StudyApplication.class, args);
    }
}

 

해당 서버 (위 코드)를 띄우고, Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 아래와 같다.

Tomcat 의 스레드가 하나이며, Sync 방식으로 동작하기 때문에 한 번에 하나의 클라이언트 요청만 처리할 수 있습니다.

서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한 개의 스레드가 요청을 처리하고 있다.

동기 서블릿 테스트 결과 - https://jongmin92.github.io/

 

이번에는 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보자.

JMC를 이용하기 위해서는 서버를 실행할 때 아래와 같은 JVM 옵션을 추가한다.

-XX:+UnlockCommercialFeatures
-XX:+FlightRecorder
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=localhost
 

JMC를 이용해 클라이언트 요청이 들어올 때, Thread 상태를 보면 아래와 같다.

동시에 100개의 클라이언트 요청이 들어왔지만, 스레드 수는 그대로 유지되고 있으며 여러 스레드 목록 중에 nio-8080-exec-1 스레드가 존재하고 있는것을 확인할 수 있다.

동기 서블릿 테스트 결과 - 스레드

 

이번에는 서버 코드를 아래와 같이 Async Servlet 을 이용하도록 수정한 후 서버를 띄워 Client 코드를 이용해 테스트를 진행한다.

(작업 스레드 풀은 WebMvcConfigurer 를 통해 설정해주자.)

@SpringBootApplication
@EnableAsync
@Slf4j
public class AsyncApp {

  @RestController
  public static class MyController {
    @GetMapping("/callable")
    public Callable<String> callable() {
      return () -> {
        log.info("async");
        Thread.sleep(2000);
        return "hello";
      };
    }
  }
  
  @Bean
  WebMvcConfigurer configurer() {
    return new WebMvcConfigurer() {
      // 워커 스레드 풀 설정
      @Override
      public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(100);
        te.setQueueCapacity(50);
        te.setMaxPoolSize(200);
        te.setThreadNamePrefix("workThread");
        te.initialize();
        configurer.setTaskExecutor(te);
      }
    };
  }

  public static void main(String[] args) {
    SpringApplication.run(AsyncApp.class, args);
  }
}

 

Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 아래와 같다.

Tomcat 의 스레드가 하나이지만 Async 방식으로 동작하기 때문에 해당 요청에 대한 실제 처리는 워커 스레드 풀에서 사용되고 있지 않은 스레드를 이용해 처리한다. 서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX 라는 이름을 가진 100개의 워커 스레드를 확인할 수 있습니다.

 

비동기 서블릿 테스트 결과 - https://jongmin92.github.io/

 

이번에도 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보자.

nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX 라는 이름을 가진 100개의 워커 스레드를 확인할 수 있다.

비동기 서블릿 테스트 결과 - 스레드

 

 

 

DeferredResult


  • DeferredResult 는 Spring 3.2 부터 사용 가능하다. 
  • 비동기 요청 처리를 위해 사용하는 Callable 의 대안을 제공한다. 
  • “지연된 결과” 를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 기술이다. 
  • 별도로 워커 스레드를 만들어 대기하지 않고도 처리가 가능합니다.

DeferredResult 구조

@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

    @RestController
    public static class MyController {
        Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();

        @GetMapping("/dr")
        public DeferredResult<String> dr() {
            log.info("dr");
            DeferredResult<String> dr = new DeferredResult<>();
            results.add(dr);
            return dr;
        }

        @GetMapping("/dr/count")
        public String drCount() {
            return String.valueOf(results.size());
        }

        @GetMapping("/dr/event")
        public String drEvent(String msg) {
            for (DeferredResult<String> dr : results) {
                dr.setResult("Hello " + msg);
                results.remove(dr);
            }
            return "OK";
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(StudyApplication.class, args);
    }
}

LoadTest 코드를 이용해 /dr 로 100개의 요청을 보내고, 크롬에서 /dr/count 로 DeferredResult가 담겨있는 큐의 사이즈를 확인해보자. 그리고 마지막으로 /dr/event 로 큐에 담긴 DeferredResult 객체에 setResult 로 결과를 반환한다.
100개의 요청이 동시에 완료되는 것을 확인할 수 있다.

DeferredResult 결과 -&nbsp;https://jongmin92.github.io/

 

 

 

ResponseBodyEmitter


  • ResponseBodyEmitter 는 Spring 4.2 부터 사용 가능하다. 
  • 비동기 요청 처리의 결과로 하나 이상의 응답을 위해 사용되는 리턴 값 Type 이며, DeferredResult 가 하나의 결과를 생성해 요청을 처리했다면, ResponseBodyEmitter는 여러개의 결과를 만들어 요청을 처리할 수 있습니다.
@SpringBootApplication
@EnableAsync
@Slf4j
public class StudyApplication {

    @RestController
    public static class MyController {

        @GetMapping("/emitter")
        public ResponseBodyEmitter emitter() {
            ResponseBodyEmitter emitter = new ResponseBodyEmitter();

            Executors.newSingleThreadExecutor().submit(() -> {
                try {
                    for (int i = 0; i < 50; i++) {
                        emitter.send("<p>Stream " + i + "</p>");
                        Thread.sleep(100);
                    }
                } catch (Exception e) {
                }
            });

            return emitter;
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(StudyApplication.class, args);
    }
}

https://jongmin92.github.io/

 

 

 

참고


해당 포스팅은 토비님의 유튜브 강의와 Jongmin 님 블로그를 보고 직접 실습하며 작성한 포스팅입니다.