ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2
    Spring/Webflux 2022. 6. 17. 10:48

    @Async


    • Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌다.
    • @Async 어노테이션을 추가해 해당 메서드를 비동기적으로 호출할 수 있다. 
    • 해당 메서드를 호출한 호출자(caller)는 즉시 리턴하고 메소드의 실제 실행은 Spring TaskExecutor에 의해서 실행된다.
    • 비동기로 실행되는 메서드는 Future 형식의 값을 리턴하고, 호출자는 해당 Future의 get() 메서드를 호출하기 전에 다른 작업을 수행할 수 있다.
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class AsyncApp {
      @Service
      public static class MyService {
         /*
           내부적으로 AOP를 이용해 복잡한 로직이 실행된다.
           비동기 작업은 return값으로 바로 결과를 줄 수 없다.
           (Future 혹은 Callback을 이용해야 한다.)
         */
        @Async
        public Future<String> hello() throws InterruptedException {
          log.info("hello()");
          Thread.sleep(1000);
          return new AsyncResult<>("Hello");
        }
      }
      public static void main(String[] args) {
        // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
        try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {}
      }
      @Autowired
      MyService myService;
    
      // 모든 빈이 다 준비된 후 실행됨 (현재는 일종의 컨트롤러라고 생각)
      @Bean
      ApplicationRunner run() {
        return args -> {
          log.info("run()");
          Future<String> res = myService.hello();
          log.info("exit: {}", res.isDone());
          log.info("result: {}", res.get());
        };
      }
    }
    //결과
    2022-06-17 08:40:41.488  INFO 35957 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : Started AsyncApp in 0.682 seconds (JVM running for 1.15)
    2022-06-17 08:40:41.489  INFO 35957 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : run()
    2022-06-17 08:40:41.493  INFO 35957 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : exit: false
    2022-06-17 08:40:41.496  INFO 35957 --- [         task-1] c.e.r._04_SyncAsync.AsyncApp             : hello()
    2022-06-17 08:40:42.502  INFO 35957 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : result: Hello

     

     

     

    ListenableFuture


    • 스프링 4.0 부터 제공하는 Future 인터페이스를 확장한 ListenableFuture를 이용하면 비동기 처리의 결과 값을 사용할 수 있는callback을 추가할 수 있다.
    • @Async 어노테이션을 사용하는 메서드에서 스프링 4.1 부터 제공하는 ListenableFuture 인터페이스를 구현한 AsyncResult를 반환하면 된다.
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class AsyncApp {
    
      @Service
      public static class MyService {
        @Async
        public ListenableFuture<String> hello() throws InterruptedException {
          log.info("hello()");
          Thread.sleep(1000);
          return new AsyncResult<>("Hello");
        }
      }
    
      public static void main(String[] args) {
        // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
        try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {}
      }
    
      @Autowired
      MyService myService;
    
      @Bean
      ApplicationRunner run() {
        return args -> {
          log.info("run()");
          ListenableFuture<String> f = myService.hello();
          f.addCallback(s -> log.info(s), e-> log.info(e.getMessage()));
          log.info("exit");
    
          Thread.sleep(2000);
        };
      }
    }
    //결과
    2022-06-17 08:47:32.224  INFO 36036 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : Started AsyncApp in 0.676 seconds (JVM running for 0.969)
    2022-06-17 08:47:32.225  INFO 36036 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : run()
    2022-06-17 08:47:32.229  INFO 36036 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : exit
    2022-06-17 08:47:32.231  INFO 36036 --- [         task-1] c.e.r._04_SyncAsync.AsyncApp             : hello()
    2022-06-17 08:47:33.238  INFO 36036 --- [         task-1] c.e.r._04_SyncAsync.AsyncApp             : Hello

     

     

     

    ThreadPoolTaskExecutor


    • @Async 어노테이션을 사용해 해당 메서드를 비동기적으로 호출할 경우 ThreadPool을 명시적으로 선언하지 않으면, 기본적으로 SimpleAsyncTaskExecutor를 사용한다. 
    • SimpleAsyncTaskExecutor는 각 비동기 호출마다 계속 새로운 스레드를 만들어 사용하기 때문에 비효율적이므로 이 경우 ThreadPoolTaskExecutor를 직접 만들어 사용하는게 효율적이다.
    • ThreadPoolTaskExecutor는 CorePool, QueueCapacity, MaxPoolSize를 직접 설정할 수 있다. 
      아래 코드를 살펴보자.
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class AsyncApp {
    
      @Service
      public static class MyService {
        // 기본적으로 SimpleAsyncTaskExecutor 를 사용한다.
        // 이는 Thread 를 계속 새로 만들어 사용하기 때문에 비효율적이다.
        @Async
        // @Async("tp") ThreadPool 이 여러개일 경우 직접 지정 가능하다.
        public ListenableFuture<String> hello() throws InterruptedException {
          log.info("hello()");
          Thread.sleep(1000);
          return new AsyncResult<>("Hello");
        }
      }
    
      @Bean
      ThreadPoolTaskExecutor tp() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        // 1. Thread pool 을 해당 개수까지 기본적으로 생성함. 처음 요청이 들어올 때 poll size 만큼 생성
        te.setCorePoolSize(10);
        // 2. 지금 당장은 Core Thread 를 모두 사용중 일 때 큐에 만들어 대기시킨다.
        te.setQueueCapacity(10);
        // 3. 대기하는 작업이 큐에 꽉 찰 경우 pool 을 해당 개수까지 더 생성한다.
        te.setMaxPoolSize(100);
        te.setThreadNamePrefix("myThread");
        return te;
      }
      
      public static void main(String[] args) {
        // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정
        try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {}
      }
    
      @Autowired
      MyService myService;
    
      @Bean
      ApplicationRunner run() {
        return args -> {
          log.info("run()");
          ListenableFuture<String> f = myService.hello();
          f.addCallback(s -> log.info(s), e-> log.info(e.getMessage()));
          log.info("exit");
    
          Thread.sleep(2000);
        };
      }
    }
    
    //결과
    2022-06-17 09:45:22.276  INFO 36668 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : Started AsyncApp in 0.653 seconds (JVM running for 1.108)
    2022-06-17 09:45:22.277  INFO 36668 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : run()
    2022-06-17 09:45:22.279  INFO 36668 --- [  restartedMain] c.e.r._04_SyncAsync.AsyncApp             : exit
    2022-06-17 09:45:22.281  INFO 36668 --- [      myThread1] c.e.r._04_SyncAsync.AsyncApp             : hello()
    2022-06-17 09:45:23.289  INFO 36668 --- [      myThread1] c.e.r._04_SyncAsync.AsyncApp             : Hello

     

     

     

    Servlet Async


    • @Async 어노테이션을 설명할 때 말했던 것 처럼 Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌다.
    • 기존 Controller 메서드를 Callable 로 변경함으로써 비동기로 만들 수 있다. 
    • Controller 메서드를 비동기로 변경해도 해당 처리가 Servlet Thread 가 아닌 다른 Thread 에서 발생한다는 점을 제외하면 기존 Controller 메서드의 동작 방식과는 큰 차이가 없다. 
    • Servlet 3.0: 비동기 서블릿
      • HTTP connection은 이미 논블록킹 IO
      • 서블릿 요청 읽기, 응답 쓰기는 블록킹
      • 비동기 작업 시작 즉시 서블릿 스레드 반납
      • 비동기 작업이 완료되면 서블릿 스레드 재할당
      • 비동기 서블릿 컨텍스트 이용 (AsyncContext)
    • Servlet 3.1: 논블록킹 IO
      • 논블록킹 서블릿 요청, 응답 처리
      • Callback
        • Thread 가 Block 되는 상황은 CPU 와 메모리 자원을 많이 소모한다. 
          (컨텍스트 스위칭이 일어나기 때문)
          기본적으로 Thread 가 Blocking 되면 Wating 상태로 변경되면서 컨텍스트 스위칭이 일어나고 추후 I/O 작업이 끝나 running 상태로 변경되면서 다시 컨텍스트 스위칭이 일어나 총 2번의 컨텍스트 스위칭이 일어난다.
        • Java InputStream 과 OutputStream 은 Blocking 방식이다.
          RequestHttpServletRequest, RequestHttpServletResponse 는 InputStream 과 OutputStream 을 사용하기 때문에 Servlet 은 기본적으로 Blocking I/O 방식이다.
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class AsyncApp {
    
      @RestController
      public static class MyController {
        @GetMapping("/callable")
        public Callable<String> callable() {
          log.info("callable");
          return () -> {
            log.info("async");
            Thread.sleep(2000);
            return "hello";
          };
        }
      }
    
      public static void main(String[] args) {
        SpringApplication.run(AsyncApp.class, args);
      }
    }

     

    비동기 서블릿은 아래 그림과 같이 동작한다.

    비동기 서블릿 구조

     

     

     

    Load Test 진행 


    지금부터는 Spring에서 Sync Servlet 을 이용할 때와 Async Servlet 을 이용했을 때의 차이점을 알아보기 위해 테스트를 진행할 것이다. 먼저 여러 Request 를 동시에 생성하는 Client 를 작성해보자.

    Spring 에서 제공하는 RestTemplate 을 이용해 100개의 Request 를 동시에 호출한다.

    @Slf4j
    public class LoadTest {
      static AtomicInteger counter = new AtomicInteger(0);
    
      public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);
        RestTemplate restTemplate = new RestTemplate();
        String url = "http://localhost:8080/callable";
    
        StopWatch main = new StopWatch();
        main.start();
    
        for(int i=0; i<100; i++) {
          es.execute(() -> {
            int idx = counter.addAndGet(1);
            log.info("Thread " + idx);
    
            StopWatch sw = new StopWatch();
            sw.start();
    
            restTemplate.getForObject(url, String.class);
    
            sw.stop();
            log.info("Elapsed: {} -> {}", idx, sw.getTotalTimeSeconds());
          });
        }
        es.shutdown();
        // 지정된 시간이 타임아웃 걸리기 전이라면 대기작업이 진행될 때까지 기다린다.
        // (100초안에 작업이 끝날때까지 기다리거나, 100초가 초과되면 종료)
        es.awaitTermination(100, TimeUnit.SECONDS);
        main.stop();
        log.info("Total: {}", main.getTotalTimeSeconds());
      }
    }
    

     

    위 비동기 서블릿 구조 그림 처럼 Async Servlet 은 Client 로 부터 요청을 받은 후 실제 작업은 작업 서블릿 풀에 위임하고, 현재의 서블릿 스레드는 서블릿 스레드 풀에 반환 후 다음 요청이 들어올 경우 사용할 수 있도록 한다.

     

    Sync Servlet 은 요청을 받은 서블릿 스레드에서 실제 작업까지 전부 진행하기 때문에 요청에 대한 응답을 반환하기 전까지는 새로운 요청을 처리할 수 없는 상태이다. 

     

    실제 이처럼 동작하는지 확인하기 위해 application.properties 파일에서 아래와 같이 Tomcat 의 스레드 개수를 1개로 설정한다.

    server.tomcat.max-threads=1

     

     

     

    Sync vs Async


    먼저 아래와 같이 Sync Servlet을 이용해 서버를 띄운 후 위의 Client 코드를 이용해 테스트를 진행합니다.

    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class StudyApplication {
    
        @RestController
        public static class MyController {
            @GetMapping("/callable")
            public String callable() throws InterruptedException {
                log.info("sync");
                Thread.sleep(2000);
                return "hello";
            }
        }
    
        public static void main(String[] args) {
            SpringApplication.run(StudyApplication.class, args);
        }
    }

     

    해당 서버 (위 코드)를 띄우고, Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 아래와 같다.

    Tomcat 의 스레드가 하나이며, Sync 방식으로 동작하기 때문에 한 번에 하나의 클라이언트 요청만 처리할 수 있습니다.

    서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한 개의 스레드가 요청을 처리하고 있다.

    동기 서블릿 테스트 결과 - https://jongmin92.github.io/

     

    이번에는 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보자.

    JMC를 이용하기 위해서는 서버를 실행할 때 아래와 같은 JVM 옵션을 추가한다.

    -XX:+UnlockCommercialFeatures
    -XX:+FlightRecorder
    -Dcom.sun.management.jmxremote
    -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.ssl=false
    -Djava.rmi.server.hostname=localhost
     

    JMC를 이용해 클라이언트 요청이 들어올 때, Thread 상태를 보면 아래와 같다.

    동시에 100개의 클라이언트 요청이 들어왔지만, 스레드 수는 그대로 유지되고 있으며 여러 스레드 목록 중에 nio-8080-exec-1 스레드가 존재하고 있는것을 확인할 수 있다.

    동기 서블릿 테스트 결과 - 스레드

     

    이번에는 서버 코드를 아래와 같이 Async Servlet 을 이용하도록 수정한 후 서버를 띄워 Client 코드를 이용해 테스트를 진행한다.

    (작업 스레드 풀은 WebMvcConfigurer 를 통해 설정해주자.)

    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class AsyncApp {
    
      @RestController
      public static class MyController {
        @GetMapping("/callable")
        public Callable<String> callable() {
          return () -> {
            log.info("async");
            Thread.sleep(2000);
            return "hello";
          };
        }
      }
      
      @Bean
      WebMvcConfigurer configurer() {
        return new WebMvcConfigurer() {
          // 워커 스레드 풀 설정
          @Override
          public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
            ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
            te.setCorePoolSize(100);
            te.setQueueCapacity(50);
            te.setMaxPoolSize(200);
            te.setThreadNamePrefix("workThread");
            te.initialize();
            configurer.setTaskExecutor(te);
          }
        };
      }
    
      public static void main(String[] args) {
        SpringApplication.run(AsyncApp.class, args);
      }
    }

     

    Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 아래와 같다.

    Tomcat 의 스레드가 하나이지만 Async 방식으로 동작하기 때문에 해당 요청에 대한 실제 처리는 워커 스레드 풀에서 사용되고 있지 않은 스레드를 이용해 처리한다. 서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX 라는 이름을 가진 100개의 워커 스레드를 확인할 수 있습니다.

     

    비동기 서블릿 테스트 결과 - https://jongmin92.github.io/

     

    이번에도 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보자.

    nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX 라는 이름을 가진 100개의 워커 스레드를 확인할 수 있다.

    비동기 서블릿 테스트 결과 - 스레드

     

     

     

    DeferredResult


    • DeferredResult 는 Spring 3.2 부터 사용 가능하다. 
    • 비동기 요청 처리를 위해 사용하는 Callable 의 대안을 제공한다. 
    • “지연된 결과” 를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 기술이다. 
    • 별도로 워커 스레드를 만들어 대기하지 않고도 처리가 가능합니다.

    DeferredResult 구조

    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class StudyApplication {
    
        @RestController
        public static class MyController {
            Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>();
    
            @GetMapping("/dr")
            public DeferredResult<String> dr() {
                log.info("dr");
                DeferredResult<String> dr = new DeferredResult<>();
                results.add(dr);
                return dr;
            }
    
            @GetMapping("/dr/count")
            public String drCount() {
                return String.valueOf(results.size());
            }
    
            @GetMapping("/dr/event")
            public String drEvent(String msg) {
                for (DeferredResult<String> dr : results) {
                    dr.setResult("Hello " + msg);
                    results.remove(dr);
                }
                return "OK";
            }
        }
    
        public static void main(String[] args) {
            SpringApplication.run(StudyApplication.class, args);
        }
    }

    LoadTest 코드를 이용해 /dr 로 100개의 요청을 보내고, 크롬에서 /dr/count 로 DeferredResult가 담겨있는 큐의 사이즈를 확인해보자. 그리고 마지막으로 /dr/event 로 큐에 담긴 DeferredResult 객체에 setResult 로 결과를 반환한다.
    100개의 요청이 동시에 완료되는 것을 확인할 수 있다.

    DeferredResult 결과 -&nbsp;https://jongmin92.github.io/

     

     

     

    ResponseBodyEmitter


    • ResponseBodyEmitter 는 Spring 4.2 부터 사용 가능하다. 
    • 비동기 요청 처리의 결과로 하나 이상의 응답을 위해 사용되는 리턴 값 Type 이며, DeferredResult 가 하나의 결과를 생성해 요청을 처리했다면, ResponseBodyEmitter는 여러개의 결과를 만들어 요청을 처리할 수 있습니다.
    @SpringBootApplication
    @EnableAsync
    @Slf4j
    public class StudyApplication {
    
        @RestController
        public static class MyController {
    
            @GetMapping("/emitter")
            public ResponseBodyEmitter emitter() {
                ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    
                Executors.newSingleThreadExecutor().submit(() -> {
                    try {
                        for (int i = 0; i < 50; i++) {
                            emitter.send("<p>Stream " + i + "</p>");
                            Thread.sleep(100);
                        }
                    } catch (Exception e) {
                    }
                });
    
                return emitter;
            }
        }
    
        public static void main(String[] args) {
            SpringApplication.run(StudyApplication.class, args);
        }
    }

    https://jongmin92.github.io/

     

     

     

    참고


    해당 포스팅은 토비님의 유튜브 강의와 Jongmin 님 블로그를 보고 직접 실습하며 작성한 포스팅입니다.

Designed by Tistory.