-
토비의 봄 TV - 스프링의 비동기 기술 (4) 2/2Spring/Webflux 2022. 6. 17. 10:48
@Async
- Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌다.
- @Async 어노테이션을 추가해 해당 메서드를 비동기적으로 호출할 수 있다.
- 해당 메서드를 호출한 호출자(caller)는 즉시 리턴하고 메소드의 실제 실행은 Spring TaskExecutor에 의해서 실행된다.
- 비동기로 실행되는 메서드는 Future 형식의 값을 리턴하고, 호출자는 해당 Future의 get() 메서드를 호출하기 전에 다른 작업을 수행할 수 있다.
@SpringBootApplication @EnableAsync @Slf4j public class AsyncApp { @Service public static class MyService { /* 내부적으로 AOP를 이용해 복잡한 로직이 실행된다. 비동기 작업은 return값으로 바로 결과를 줄 수 없다. (Future 혹은 Callback을 이용해야 한다.) */ @Async public Future<String> hello() throws InterruptedException { log.info("hello()"); Thread.sleep(1000); return new AsyncResult<>("Hello"); } } public static void main(String[] args) { // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정 try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {} } @Autowired MyService myService; // 모든 빈이 다 준비된 후 실행됨 (현재는 일종의 컨트롤러라고 생각) @Bean ApplicationRunner run() { return args -> { log.info("run()"); Future<String> res = myService.hello(); log.info("exit: {}", res.isDone()); log.info("result: {}", res.get()); }; } } //결과 2022-06-17 08:40:41.488 INFO 35957 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : Started AsyncApp in 0.682 seconds (JVM running for 1.15) 2022-06-17 08:40:41.489 INFO 35957 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : run() 2022-06-17 08:40:41.493 INFO 35957 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : exit: false 2022-06-17 08:40:41.496 INFO 35957 --- [ task-1] c.e.r._04_SyncAsync.AsyncApp : hello() 2022-06-17 08:40:42.502 INFO 35957 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : result: Hello
ListenableFuture
- 스프링 4.0 부터 제공하는 Future 인터페이스를 확장한 ListenableFuture를 이용하면 비동기 처리의 결과 값을 사용할 수 있는callback을 추가할 수 있다.
- @Async 어노테이션을 사용하는 메서드에서 스프링 4.1 부터 제공하는 ListenableFuture 인터페이스를 구현한 AsyncResult를 반환하면 된다.
@SpringBootApplication @EnableAsync @Slf4j public class AsyncApp { @Service public static class MyService { @Async public ListenableFuture<String> hello() throws InterruptedException { log.info("hello()"); Thread.sleep(1000); return new AsyncResult<>("Hello"); } } public static void main(String[] args) { // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정 try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {} } @Autowired MyService myService; @Bean ApplicationRunner run() { return args -> { log.info("run()"); ListenableFuture<String> f = myService.hello(); f.addCallback(s -> log.info(s), e-> log.info(e.getMessage())); log.info("exit"); Thread.sleep(2000); }; } } //결과 2022-06-17 08:47:32.224 INFO 36036 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : Started AsyncApp in 0.676 seconds (JVM running for 0.969) 2022-06-17 08:47:32.225 INFO 36036 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : run() 2022-06-17 08:47:32.229 INFO 36036 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : exit 2022-06-17 08:47:32.231 INFO 36036 --- [ task-1] c.e.r._04_SyncAsync.AsyncApp : hello() 2022-06-17 08:47:33.238 INFO 36036 --- [ task-1] c.e.r._04_SyncAsync.AsyncApp : Hello
ThreadPoolTaskExecutor
- @Async 어노테이션을 사용해 해당 메서드를 비동기적으로 호출할 경우 ThreadPool을 명시적으로 선언하지 않으면, 기본적으로 SimpleAsyncTaskExecutor를 사용한다.
- SimpleAsyncTaskExecutor는 각 비동기 호출마다 계속 새로운 스레드를 만들어 사용하기 때문에 비효율적이므로 이 경우 ThreadPoolTaskExecutor를 직접 만들어 사용하는게 효율적이다.
- ThreadPoolTaskExecutor는 CorePool, QueueCapacity, MaxPoolSize를 직접 설정할 수 있다.
아래 코드를 살펴보자.
@SpringBootApplication @EnableAsync @Slf4j public class AsyncApp { @Service public static class MyService { // 기본적으로 SimpleAsyncTaskExecutor 를 사용한다. // 이는 Thread 를 계속 새로 만들어 사용하기 때문에 비효율적이다. @Async // @Async("tp") ThreadPool 이 여러개일 경우 직접 지정 가능하다. public ListenableFuture<String> hello() throws InterruptedException { log.info("hello()"); Thread.sleep(1000); return new AsyncResult<>("Hello"); } } @Bean ThreadPoolTaskExecutor tp() { ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor(); // 1. Thread pool 을 해당 개수까지 기본적으로 생성함. 처음 요청이 들어올 때 poll size 만큼 생성 te.setCorePoolSize(10); // 2. 지금 당장은 Core Thread 를 모두 사용중 일 때 큐에 만들어 대기시킨다. te.setQueueCapacity(10); // 3. 대기하는 작업이 큐에 꽉 찰 경우 pool 을 해당 개수까지 더 생성한다. te.setMaxPoolSize(100); te.setThreadNamePrefix("myThread"); return te; } public static void main(String[] args) { // try with resource 블록을 이용해 빈이 다 준비된 후 종료되도록 설정 try (ConfigurableApplicationContext c = SpringApplication.run(AsyncApp.class, args)) {} } @Autowired MyService myService; @Bean ApplicationRunner run() { return args -> { log.info("run()"); ListenableFuture<String> f = myService.hello(); f.addCallback(s -> log.info(s), e-> log.info(e.getMessage())); log.info("exit"); Thread.sleep(2000); }; } } //결과 2022-06-17 09:45:22.276 INFO 36668 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : Started AsyncApp in 0.653 seconds (JVM running for 1.108) 2022-06-17 09:45:22.277 INFO 36668 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : run() 2022-06-17 09:45:22.279 INFO 36668 --- [ restartedMain] c.e.r._04_SyncAsync.AsyncApp : exit 2022-06-17 09:45:22.281 INFO 36668 --- [ myThread1] c.e.r._04_SyncAsync.AsyncApp : hello() 2022-06-17 09:45:23.289 INFO 36668 --- [ myThread1] c.e.r._04_SyncAsync.AsyncApp : Hello
Servlet Async
- @Async 어노테이션을 설명할 때 말했던 것 처럼 Spring MVC 3.2 부터 Servlet 3.0 기반의 비동기 요청 처리가 가능해졌다.
- 기존 Controller 메서드를 Callable 로 변경함으로써 비동기로 만들 수 있다.
- Controller 메서드를 비동기로 변경해도 해당 처리가 Servlet Thread 가 아닌 다른 Thread 에서 발생한다는 점을 제외하면 기존 Controller 메서드의 동작 방식과는 큰 차이가 없다.
- Servlet 3.0: 비동기 서블릿
- HTTP connection은 이미 논블록킹 IO
- 서블릿 요청 읽기, 응답 쓰기는 블록킹
- 비동기 작업 시작 즉시 서블릿 스레드 반납
- 비동기 작업이 완료되면 서블릿 스레드 재할당
- 비동기 서블릿 컨텍스트 이용 (AsyncContext)
- Servlet 3.1: 논블록킹 IO
- 논블록킹 서블릿 요청, 응답 처리
- Callback
- Thread 가 Block 되는 상황은 CPU 와 메모리 자원을 많이 소모한다.
(컨텍스트 스위칭이 일어나기 때문)
기본적으로 Thread 가 Blocking 되면 Wating 상태로 변경되면서 컨텍스트 스위칭이 일어나고 추후 I/O 작업이 끝나 running 상태로 변경되면서 다시 컨텍스트 스위칭이 일어나 총 2번의 컨텍스트 스위칭이 일어난다. - Java InputStream 과 OutputStream 은 Blocking 방식이다.
RequestHttpServletRequest, RequestHttpServletResponse 는 InputStream 과 OutputStream 을 사용하기 때문에 Servlet 은 기본적으로 Blocking I/O 방식이다.
- Thread 가 Block 되는 상황은 CPU 와 메모리 자원을 많이 소모한다.
@SpringBootApplication @EnableAsync @Slf4j public class AsyncApp { @RestController public static class MyController { @GetMapping("/callable") public Callable<String> callable() { log.info("callable"); return () -> { log.info("async"); Thread.sleep(2000); return "hello"; }; } } public static void main(String[] args) { SpringApplication.run(AsyncApp.class, args); } }
비동기 서블릿은 아래 그림과 같이 동작한다.
Load Test 진행
지금부터는 Spring에서 Sync Servlet 을 이용할 때와 Async Servlet 을 이용했을 때의 차이점을 알아보기 위해 테스트를 진행할 것이다. 먼저 여러 Request 를 동시에 생성하는 Client 를 작성해보자.
Spring 에서 제공하는 RestTemplate 을 이용해 100개의 Request 를 동시에 호출한다.
@Slf4j public class LoadTest { static AtomicInteger counter = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(100); RestTemplate restTemplate = new RestTemplate(); String url = "http://localhost:8080/callable"; StopWatch main = new StopWatch(); main.start(); for(int i=0; i<100; i++) { es.execute(() -> { int idx = counter.addAndGet(1); log.info("Thread " + idx); StopWatch sw = new StopWatch(); sw.start(); restTemplate.getForObject(url, String.class); sw.stop(); log.info("Elapsed: {} -> {}", idx, sw.getTotalTimeSeconds()); }); } es.shutdown(); // 지정된 시간이 타임아웃 걸리기 전이라면 대기작업이 진행될 때까지 기다린다. // (100초안에 작업이 끝날때까지 기다리거나, 100초가 초과되면 종료) es.awaitTermination(100, TimeUnit.SECONDS); main.stop(); log.info("Total: {}", main.getTotalTimeSeconds()); } }
위 비동기 서블릿 구조 그림 처럼 Async Servlet 은 Client 로 부터 요청을 받은 후 실제 작업은 작업 서블릿 풀에 위임하고, 현재의 서블릿 스레드는 서블릿 스레드 풀에 반환 후 다음 요청이 들어올 경우 사용할 수 있도록 한다.
Sync Servlet 은 요청을 받은 서블릿 스레드에서 실제 작업까지 전부 진행하기 때문에 요청에 대한 응답을 반환하기 전까지는 새로운 요청을 처리할 수 없는 상태이다.
실제 이처럼 동작하는지 확인하기 위해 application.properties 파일에서 아래와 같이 Tomcat 의 스레드 개수를 1개로 설정한다.
server.tomcat.max-threads=1
Sync vs Async
먼저 아래와 같이 Sync Servlet을 이용해 서버를 띄운 후 위의 Client 코드를 이용해 테스트를 진행합니다.
@SpringBootApplication @EnableAsync @Slf4j public class StudyApplication { @RestController public static class MyController { @GetMapping("/callable") public String callable() throws InterruptedException { log.info("sync"); Thread.sleep(2000); return "hello"; } } public static void main(String[] args) { SpringApplication.run(StudyApplication.class, args); } }
해당 서버 (위 코드)를 띄우고, Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 아래와 같다.
Tomcat 의 스레드가 하나이며, Sync 방식으로 동작하기 때문에 한 번에 하나의 클라이언트 요청만 처리할 수 있습니다.
서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한 개의 스레드가 요청을 처리하고 있다.
이번에는 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보자.
JMC를 이용하기 위해서는 서버를 실행할 때 아래와 같은 JVM 옵션을 추가한다.
-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost
JMC를 이용해 클라이언트 요청이 들어올 때, Thread 상태를 보면 아래와 같다.
동시에 100개의 클라이언트 요청이 들어왔지만, 스레드 수는 그대로 유지되고 있으며 여러 스레드 목록 중에 nio-8080-exec-1 스레드가 존재하고 있는것을 확인할 수 있다.
이번에는 서버 코드를 아래와 같이 Async Servlet 을 이용하도록 수정한 후 서버를 띄워 Client 코드를 이용해 테스트를 진행한다.
(작업 스레드 풀은 WebMvcConfigurer 를 통해 설정해주자.)
@SpringBootApplication @EnableAsync @Slf4j public class AsyncApp { @RestController public static class MyController { @GetMapping("/callable") public Callable<String> callable() { return () -> { log.info("async"); Thread.sleep(2000); return "hello"; }; } } @Bean WebMvcConfigurer configurer() { return new WebMvcConfigurer() { // 워커 스레드 풀 설정 @Override public void configureAsyncSupport(AsyncSupportConfigurer configurer) { ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor(); te.setCorePoolSize(100); te.setQueueCapacity(50); te.setMaxPoolSize(200); te.setThreadNamePrefix("workThread"); te.initialize(); configurer.setTaskExecutor(te); } }; } public static void main(String[] args) { SpringApplication.run(AsyncApp.class, args); } }
Client(LoadTest) 코드를 사용해 테스트를 진행하면 결과는 아래와 같다.
Tomcat 의 스레드가 하나이지만 Async 방식으로 동작하기 때문에 해당 요청에 대한 실제 처리는 워커 스레드 풀에서 사용되고 있지 않은 스레드를 이용해 처리한다. 서버 로그를 확인하면 nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX 라는 이름을 가진 100개의 워커 스레드를 확인할 수 있습니다.
이번에도 JMC(Java Mission Control)를 이용해 실제 서버의 스레드 상황을 살펴보자.
nio-8080-exec-1 라는 이름을 가진 한개의 Tomcat 스레드와 workThreadX 라는 이름을 가진 100개의 워커 스레드를 확인할 수 있다.
DeferredResult
- DeferredResult 는 Spring 3.2 부터 사용 가능하다.
- 비동기 요청 처리를 위해 사용하는 Callable 의 대안을 제공한다.
- “지연된 결과” 를 의미하며 외부의 이벤트 혹은 클라이언트 요청에 의해서 지연되어 있는 HTTP 요청에 대한 응답을 나중에 써줄 수 있는 기술이다.
- 별도로 워커 스레드를 만들어 대기하지 않고도 처리가 가능합니다.
@SpringBootApplication @EnableAsync @Slf4j public class StudyApplication { @RestController public static class MyController { Queue<DeferredResult<String>> results = new ConcurrentLinkedQueue<>(); @GetMapping("/dr") public DeferredResult<String> dr() { log.info("dr"); DeferredResult<String> dr = new DeferredResult<>(); results.add(dr); return dr; } @GetMapping("/dr/count") public String drCount() { return String.valueOf(results.size()); } @GetMapping("/dr/event") public String drEvent(String msg) { for (DeferredResult<String> dr : results) { dr.setResult("Hello " + msg); results.remove(dr); } return "OK"; } } public static void main(String[] args) { SpringApplication.run(StudyApplication.class, args); } }
LoadTest 코드를 이용해 /dr 로 100개의 요청을 보내고, 크롬에서 /dr/count 로 DeferredResult가 담겨있는 큐의 사이즈를 확인해보자. 그리고 마지막으로 /dr/event 로 큐에 담긴 DeferredResult 객체에 setResult 로 결과를 반환한다.
100개의 요청이 동시에 완료되는 것을 확인할 수 있다.ResponseBodyEmitter
- ResponseBodyEmitter 는 Spring 4.2 부터 사용 가능하다.
- 비동기 요청 처리의 결과로 하나 이상의 응답을 위해 사용되는 리턴 값 Type 이며, DeferredResult 가 하나의 결과를 생성해 요청을 처리했다면, ResponseBodyEmitter는 여러개의 결과를 만들어 요청을 처리할 수 있습니다.
@SpringBootApplication @EnableAsync @Slf4j public class StudyApplication { @RestController public static class MyController { @GetMapping("/emitter") public ResponseBodyEmitter emitter() { ResponseBodyEmitter emitter = new ResponseBodyEmitter(); Executors.newSingleThreadExecutor().submit(() -> { try { for (int i = 0; i < 50; i++) { emitter.send("<p>Stream " + i + "</p>"); Thread.sleep(100); } } catch (Exception e) { } }); return emitter; } } public static void main(String[] args) { SpringApplication.run(StudyApplication.class, args); } }
참고
해당 포스팅은 토비님의 유튜브 강의와 Jongmin 님 블로그를 보고 직접 실습하며 작성한 포스팅입니다.
- 토비의 봄 TV 9회 스프링 리액티브 프로그래밍 (5) 비동기 RestTemplate과 비동기 MVC/Serlvet
- JongMin 님 블로그
'Spring > Webflux' 카테고리의 다른 글
토비의 봄 TV - CompletableFuture (7) (0) 2022.06.28 토비의 봄 TV - AsyncRestTemplate의 콜백 헬과 중복 작업 문제 (6) (0) 2022.06.24 토비의 봄 TV - 자바의 비동기 기술 (4) 1/2 (0) 2022.06.17 토비의 봄 TV - Reactive Streams Scheduler (3) (0) 2022.06.14 토비의 봄 TV - Reactive Streams Operators (2) (0) 2022.06.07