이번 포스팅에서는 여러 번 반복되는 HTTP 요청을 비동기로 처리해 응답 속도를 단축시키는 과정을 담았습니다.
문제
이전 포스팅 마지막에서 다뤘던 문제는 반복문 안에서 HTTP 요청을 보내는 것이었습니다.
기존 코드
public void preCreateDailyReport(UUID userId, LocalDate startDate, LocalDate endDate) {
// 편지 서비스에게 `분석 가능한 편지들` 찾기 위임
List<DailyLetters> analyzableLetters = letterService.findAnalyzableLettersInRange(userId, startDate, endDate);
for (DailyLetters dailyLetters : analyzableLetters) {
// 외부 API 호출
CreateResponse createResponse = clovaService.sendWithPromptTemplate(promptTemplate, dailyLetters.getMessages());
// 응답 결과로부터 하루치 분석 추출
DailyAnalysisResult analysisResult = DailyAnalysisExtractor.extract(createResponse);
// 하루치 분석으로부터 편지 분석, 데일리 리포트 엔티티 저장
letterAnalysisService.saveAnalysisAndDailyReport(dailyLetters, analysisResult);
}
}
결과
해결: HTTP 요청 비동기 처리하기
이 문제를 해결하기 위해 HTTP 요청을 비동기로 처리해야 합니다.
현재 프로젝트에서 HTTP 요청을 보낼 때 FeignClient를 사용하고 있으며, 기본적으로 동기 방식으로 작동합니다.
따라서 비동기 방식으로 요청하기 위해 Java에서 지원하는 CompletableFuture와 스프링에서 지원하는 @Async 애너테이션을 이용했습니다.
@Async 애너테이션을 사용하기 앞서 비동기 작업 처리를 위해 알아야 할 것들을 정리했습니다.
TaskExecutor
Spring Boot에서 @Async 애너테이션을 사용하기 위해서는 먼저 @EnableAsync를 적용해야 합니다.
@EnableAsync javadoc 문서를 보면 Spring은 기본적으로 TaskExcutor 빈 또는 "taskExecutor" 이름의 빈을 찾는데, 만약 둘 다 찾지 못하면 `SimpleAsyncTaskExecutor` 구현체가 사용된다는 것을 알 수 있습니다.
하지만 유의해야 할 점이 있는데요,
바닐라 스프링을 사용하면 별도로 제공되는 TaskExcutor Bean이 없기 때문에 `SimpleAsyncTaskExecutor`가 사용되지만,
스프링 부트는 별도로 `ThreadPoolTaskExecutor`를 Bean으로 제공하기 때문에 해당 빈을 사용합니다.
등록 과정은 `@SpringBootApplication` → `@EnableAutoConfiguration`에 의해 `TaskExecutionAutoConfiguration`가 등록되고, 내부 설정 값으로 `TaskExecutionProperties`를 이용합니다.
위의 TaskExecutionProperties의 설정을 보면 스레드 개수, 큐의 크기 등 기본 값이 설정되어 있는 것을 확인할 수 있는데요,
이 설정 값들이 서비스에 최적화된 값은 아닙니다. 따라서 서비스 성격에 맞는 값으로 설정해야 할 필요가 있습니다.
TreadPoolTaskExecutor 주요 설정 옵션
옵션 | 설명 | 기본값 |
corePoolSize | 기본적으로 유지할 스레드 개수 | 1 |
maxPoolSize | 최대 생성 가능한 스레드 개수 | Integer.MAX_VALUE |
queueCapacity | 태스크 큐 크기 (스레드가 부족할 때 작업을 담아둘 버퍼) | Integer.MAX_VALUE |
keepAliveSeconds | 최대 풀 사이즈 초과 시, 유휴 스레드가 종료되기까지 유지되는 시간 | 60초 |
threadNamePrefix | 스레드 이름 접두사 (디버깅용) | class 이름 + "-" |
allowCoreThreadTimeOut | `true`면 `corePoolSize` 이하의 스레드도 유휴 상태가 되면 종료됨 | false |
waitForTasksToCompleteShutdown | `true`이면 컨테이너 종료 시 대기 중인 태스크가 완료될 때까지 대기 | false |
awaitTerminationSeconds | `waitForTasksToCompleteOnShutdown=true`일 때, 최대 대기 시간 | 0 |
rejectedExecutionHandler | 스레드 풀이 꽉 차서 새로운 작업을 받을 수 없을 때 실행할 정책을 결정 | AbortPolicy |
daemon | 스레드가 데몬 스레드인지 여부를 결정 | false |
적절한 설정을 위해 고려할 사항들
이상적인 스레드 풀의 적정 크기에 대하여, 스레드 풀 크기 공식, 리틀의 법칙 글에서 워크로드에 따른 적절한 설정 공식을 참고해 아래와 같이 고려할 사항들을 정리했습니다.
- 작업의 성격이 `CPU 바운드`인지, `I/O 바운드`인지
- 클로바 API의 이용량 제어 정책
- 사용자당 필요한 동시 요청 수
- 피크 시 예상되는 동시 요청 수
- 호출 한도 시 추가 조치
작업의 성격
비동기로 처리할 작업의 주 목적은 HTTP 요청입니다.
HTTP 요청같은 I/O 바운드 작업은 네트워크 응답을 기다리는 시간이 대부분이므로 CPU 코어 수보다 외부 API 요청 패턴에 맞게 설정해야 합니다.
평균적으로 응답까지 걸리는 시간은 7초입니다.
클로바 API 이용량 제어 정책
거의 모든 외부 API 호출에는 이용량 제어 정책이 있습니다.
특히 외부 API에 많이 의존할수록 이용량 제어 정책에 맞춰 워크로드를 설계해야 하는데요,
이용량 제어 정책은 테스트 앱과 서비스 앱별로 차이가 있습니다.
사용 중인 모델은 최상단의 HCX-003입니다.
- QPM(Querys Per Minute, 분당 작업 요청 수): 200 (테스트 앱) | 900 (서비스 앱)
- TPM(Tokens Per Minute, 분당 처리할 토큰 수): 30,000 (테스트 앱) | 180,000 (서비스 앱)
- 처리할 토큰 수 = 입력 토큰 수 + 결괏값 생성 시 사용할 최대 토큰 수
우리 서비스는 현재 `테스트 앱`으로 사용 중이며, `서비스 앱` 신청 시 더 많은 이용량을 사용할 수 있습니다.
현재 `서비스 앱` 신청을 고려하고 있으므로 `서비스 앱` 정책을 기준으로 삼았습니다.
사용자당 필요한 동시 요청 수
HTTP 요청을 비동기로 처리하려는 목적은 사용자당 `7일치` 편지 분석을 요청하기 위함입니다.
이때 `7일치`를 `하루치`로 나눠 "7번" 요청합니다.
피크 시 예상되는 동시 요청 수
활성 사용자 수가 많이 없는 관계로 피크 시 예상 활성 사용수는 10명 정도로 생각됩니다.
따라서 최대 동시 요청 수는 "70번(=사용자당 7번 요청 x 10명)"으로 예상했습니다.
호출 한도 시 추가 조치
외부 API 호출 한도에 걸리지 않도록 처리율 제한을 둬야하지만, 현재 서비스의 처리율 제한은 초기 "답장 서비스" 워크로드만 고려해 운영상 정책에 불과합니다.
따라서 호출 한도에 걸렸을 때 추가 조치를 설정해야 하는데요, 아래 옵션들을 고려했습니다.
- 옵션 1: 한도를 넘은 요청은 거절 처리한다.
- 옵션 2: 한도를 넘은 요청은 별도의 큐에 넣고 지연 처리하거나, 별도로 저장 후 배치 프로그램을 통해 순차처리한다.
옵션 1은 사용자에게 왜 거절이 되었는지, 언제 다시 요청할 수 있는지 등 별도의 응답을 해야합니다.
옵션 2는 사용자에게 지연 처리된다는 응답(202 Accepted 등)과 함께 이용량 제어 정책 내에서 처리할 수 있도록 구현해야 합니다.
스레드 풀 설정을 위한 의사 결정 과정
시나리오
- 응답까지 걸리는 평균 시간: 7초
- 최소 동시 요청 수: 7회
- 피크 시 예상 동시 요청 수: 70회 (10명 기준)
→ QPM 900회 아래이므로 TPM을 우선 고려 - 전체 토큰 한도에 따른 최대 호출 건수: 128건/분 (=180,000 토큰/분 ÷ 1,400 토큰(건)/사용자)
- TPM: 180,000 토큰
- 사용자당 `7일치 편지들` 분석 시 평균 1,400 토큰 사용
위의 시나리오를 고려해 스레드 풀 설정을 아래와 같이 정했습니다.
- `CorePoolSize`: 70 (피크 시 최대 동시 요청 수)
- `MaxPoolSize`: 100 (버스트 상황)
- `QueueCapacity`: 100 (대기 큐 용량)
- 실제 호출은 분당 128회가 넘지 않도록 별도로 처리해야 합니다.
- `WaitForTasksToCompleteShutdown`: true (스프링 IOC 컨테이너 종료 시 대기 중인 작업까지 기다림)
- 스프링 부트 설정 시 graceful shutdown을 원해 서버 설정에도 적용되어 있습니다. (server.shutdown=`graceful`)
- `AwaitTerminationSeconds`: 30 (`waitForTasksToCompleteShutdown` 설정 시 무한 대기 방지)
- `RejectedExecutionHandler`: AbortPolicy
- 호출 한도를 초과하는 요청을 처리하는 구현은 나중에 구현할 계획입니다.
- 따라서 기본 정책을 사용합니다. (`AbortPolicy`)
- `CallerRunsPolicy`는 호출하는 메인 스레드에서 실행합니다. 이 경우 응답까지 최대 1분 넘는 시간이 소요될 수 있기때문에 다른 서비스와 경합이 발생할 수 있습니다. 따라서 사용하지 않습니다.
- `DiscardPolicy`는 거부된 작업을 그냥 버리는 정책입니다. 사용자에게 응답할 수 없으므로 사용하지 않습니다.
- `DiscardOldestPolicy`는 대기 큐에서 가장 오래된 작업을 제거하고 새로운 작업을 추가합니다. 기존 사용자의 요청과 경합이 발생하므로 사용하지 않습니다.
최종 설정
@EnableAsync
@Configuration
public class AsyncConfig {
@Bean
public Executor httpRequestExecutor() {
// 이 예시는 최대 10명의 사용자가 동시에 각각 7건씩 요청할 수 있는 상황 (총 70건 동시 요청)
// 외부 API의 한도와 평균 응답 시간(7초)을 고려
// FIXME: 실제 호출은 레이트 리미터로 조절하여 분당 128건을 넘지 않도록 해야 함.
int corePoolSize = 70; // 최대 동시 요청 수 예상 (10명 x 7)
int maxPoolSize = 100; // 버스트 상황을 고려하여 확장
int queueCapacity = 100; // 대기 큐 용량
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.setThreadNamePrefix("HttpAsyncExecutor-");
executor.setRejectedExecutionHandler(new AbortPolicy());
executor.initialize();
return executor;
}
}
- 현재 설정 값이 최적은 아닙니다. 스트레스 테스트 등을 통해 적절한 값을 찾는 과정이 필요합니다.
- 실제 호출은 분당 128회가 넘지 않도록 처리율 제한기를 구현해야 합니다.
궁극적으로는 정확한 한도까지 이용해야 하기 때문에 TPM을 넘지 않는 것을 목표로 해야합니다.
@Async 사용 시 주의 사항
1. 트랜잭션 관리
`@Async`가 적용된 메서드는 스프링에서 Bean으로 등록한 `ThreadPoolTaskExecutor`에 의해 별도의 스레드에서 실행됩니다.
따라서 메인 스레드의 트랜잭션 컨텍스트가 자동으로 전파되지 않습니다. 따라서 비동기 메서드 내에서 데이터베이스 작업을 수행한다면 해당 메서드에 별도로 `@Transactional`을 적용해 트랜잭션 전파를 설정해야 합니다.
(현재는 HTTP 요청과 트랜잭션 커밋을 분리했기 때문에 이 문제에 대해 고려하지 않아도 됐습니다.)
2. Self-Invocation(자기 호출)의 제한
`@Transactional`과 마찬가지로 스프링 AOP를 이용하기 때문에 동일한 Bean 내에서 호출(self-invocation) 시 비동기로 동작하지 않고 동기적으로 실행됩니다. 따라서 별도의 Bean으로 분리하거나 스프링 IOC 컨테이너로부터 자기 자신을 주입받아 프록시 객체를 통해 호출해야 합니다.
3. 예외 처리
`@Async`는 기본적으로 예외가 메인 스레드로 전파되지 않습니다. 심지어 반환 값이 `void`라면 예외가 로깅되지도 않는 문제가 있습니다. 이 경우(`void` 타입 반환)에는 `AsyncUncaughtExceptionHandler` 인터페이스를 구현해 직접 처리해야 합니다.
(정리하면 `void` 반환 타입은 메인 스레드에서 `.exceptionally()`나 `.whenComplete()` 등의 콜백을 통해 핸들링할 수 없습니다. 그래서 CompletableFuture<Void>로 감싸는 것이 낫다고 생각합니다.)
하지만, `CompletableFuture<T>` (CompletableFuture<Void> 포함) 타입을 반환한다면 메인 스레드에서 `.exceptionally()` 같은 콜백으로 예외를 핸들링 할 수 있습니다.
만약 콜백으로 예외를 핸들링하지 않는다면, `.get()`을 통해 예외를 핸들링할 수 있습니다. (`ExecutionException`, `.join()`의 경우 `CompletionException`로 래핑됨)
@Async 적용
@Service
@RequiredArgsConstructor
public class ClovaService {
private final ClovaFeignClient client;
@Async("httpRequestExecutor")
public CompletableFuture<CreateResponse> sendAsyncWithPromptTemplate(PromptTemplate promptTemplate,
String userMessage) {
return CompletableFuture.completedFuture(client.sendToClova(CreateRequest.of(promptTemplate, userMessage)))
.exceptionally(t -> {
log.error("외부 API 호출 중 예외 발생", t);
throw new CompletionException(t);
})
.orTimeout(20, TimeUnit.SECONDS);
}
}
- HTTP 요청 전송을 책임지는 `클로바 서비스`에 새로운 인터페이스를 추가합니다.
- @Async에 사용할 때 TaskExecutor Bean으로 등록한 이름(`httpRequestExecutor`)을 지정합니다.
- `exceptionally()`를 적용해 API 호출 시 예외 발생에 대해 로그를 출력하고 해당 작업을 종료합니다.
- 만약 `resilience4j`와 함께 동작하고, FallbackFactory가 설정되어 있다면 서킷 브레이커가 트리거(open) 됐을 때 `exceptionally()`로 핸들링 할 수 없게되니 주의가 필요합니다.
- 또한, `RestApiControllerAdvice`를 사용한다면 `CompletionException`를 언래핑할 필요가 있습니다.
비동기 처리 중 발생한 원래 예외를 올바르게 처리하기 위함입니다.
- `orTimeout()`을 설정해 작업 당 20초가 넘으면 `TimeoutException`을 발생시킵니다.
CompletableFuture 적용
@Async 애너테이션이 적용된 메서드는 `CompleatableFuture`로 감싼 응답을 반환합니다.
따라서 비동기 HTTP 요청을 책임지는 `편지 분석 서비스`에 새로운 인터페이스를 추가합니다.
비동기 요청 내부 구현 (편지 분석 서비스)
public List<DailyAnalysisResult> createAsyncDailyAnalyses(List<DailyLetters> dailyLetters) {
List<CompletableFuture<DailyAnalysisResult>> futures = dailyLetters.stream()
.map(each -> clovaService.sendAsyncWithPromptTemplate(promptTemplate, each.getMessages())
.thenApply(DailyAnalysisExtractor::extract))
.toList();
CompletableFuture<List<DailyAnalysisResult>> combinedFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(CompletableFuture::join).toList());
return combinedFuture.join();
}
- `하루치 편지들(DailyLetters)`을 7일치로 모은 `List<DailyLetters>`를 파라미터로 받습니다.
- Stream API를 이용해 HTTP 비동기 요청을 보냅니다.
- 콜백으로 `thenApply()`를 적용해 응답으로부터 분석 정보를 추출합니다.
- CompletableFuture.allOf().thenApply(): 위에서 만든 비동기 작업들을 실행하고 기다립니다.
이때, `thenApply()`를 통해 응답을 받을 때마다 추가적인 블로킹 없이 모든 결과를 기다립니다.
엔티티 저장 (편지 분석 서비스)
// 메서드 추가
@Transactional
public void saveAllAnalysesAndDailyReports(List<DailyLetters> dailyLetters, List<DailyAnalysisResult> results) {
for (int i = 0; i < results.size(); i++) {
saveAnalysisAndDailyReport(dailyLetters.get(i), results.get(i)); // 기존 저장 코드
}
}
`편지 분석(LetterAnalysis)`와 `데일리 리포트(DailyReport)` 엔티티는 `ManyToOne`입니다.
따라서 전체 결과의 원자적인 영속화를 위해 기존 `saveAnalysisAndDailyReport()` 메서드를 순차 실행합니다.
(`List<LetterAnalysis>`와 `DailyReport`간의 연관 관계 매핑 후 `.saveAll(List<LetterAnalysis>)`를 수행)
이제 `데일리 리포트 서비스` 반복문 안에서 비동기 요청을 보내는 `편지 분석 서비스` 코드를 변경합니다.
AS-IS (데일리 리포트 서비스)
public void preCreateDailyReport(UUID userId, LocalDate startDate, LocalDate endDate) {
// 편지 서비스에게 `분석 가능한 편지들` 찾기 위임
List<DailyLetters> analyzableLetters = letterService.findAnalyzableLettersInRange(userId, startDate, endDate);
for (DailyLetters dailyLetters : analyzableLetters) {
// 외부 API 호출
CreateResponse createResponse = clovaService.sendWithPromptTemplate(promptTemplate, dailyLetters.getMessages());
// 응답 결과로부터 하루치 분석 추출
DailyAnalysisResult analysisResult = DailyAnalysisExtractor.extract(createResponse);
// 하루치 분석으로부터 편지 분석, 데일리 리포트 엔티티 저장
letterAnalysisService.saveAnalysisAndDailyReport(dailyLetters, analysisResult);
}
}
TO-BE (데일리 리포트 서비스)
public void preCreateDailyReport(UUID userId, LocalDate startDate, LocalDate endDate) {
// 편지 서비스에게 `분석 가능한 편지들` 찾기 위임
List<DailyLetters> analyzableLetters = letterService.findAnalyzableLettersInRange(userId, startDate, endDate);
// 편지 분석 서비스에게 `편지 분석`, `데일리 리포트` 생성 요청
List<DailyAnalysisResult> results = letterAnalysisService.createAsyncDailyAnalyses(analyzableLetters);
// 편지 분석 서비스에게 트랜잭션 내에서 저장 요청
letterAnalysisService.saveAllAnalysesAndDailyReports(analyzableLetters, results);
}
비동기 요청 적용 결과 평가
- 비동기 요청이 성공적으로 수행되었습니다. 비동기 요청마다 응답 받는 시간이 다름을 확인할 수 있습니다.
- `7일치 편지 분석`에 소요된 전체 시간은 10.5초 정도입니다.
- 개별 요청마다 응답 시간이 다르지만, 가장 오래 기다린 응답 시간이 전체 요청 시간에 가장 큰 영향을 끼칩니다.
- 그 밖에도 컨텍스트 스위칭 등 오버헤드에 의해 전체 실행 시간이 늦어질 수 있습니다.
위클리 리포트까지 생성하는데 걸리는 시간은 약 18.5초가 걸렸습니다. (데일리 리포트 생성: 10.5초 + 위클리 리포트 생성: 8초)
비동기 작업으로 처리하기 전에는 1분 5.5초정도 소요됐었으니, 약 47초 단축했습니다.
남은 개선 사항 - 처리율 제한기
- 실제 외부 API 호출은 TPM을 넘지 않도록 별도로 처리율 제한기를 구현해야 합니다.
- 처리율 제한기는 RabbitMQ 같은 메시지 큐를 이용해 구현할 계획입니다.
마치며
- 외부 API 호출에 의존적인 서비스의 경우, API 이용량 정책과 예산에 따라 성능이 제한될 수 있습니다. 사용자당 토큰 소모량과 사용 패턴 분석을 통해 워크로드를 면밀하게 파악하는 것이 중요합니다.
- @Async 애너테이션을 사용하는 것만으론 문제가 해결되지 않습니다. 워크로드에 적합한 스레드 풀 설정을 찾아야 하는데, 이는 CS 지식뿐만 아니라 운영 예산, 사용자 패턴 등 복잡한 요소들을 고려해야 하므로 최적값을 찾기가 쉽지 않았습니다.
- join(), 콜백(thenApply 등) 활용법을 충분히 익히고 사용해야 합니다. 스레드 블로킹을 최소화하기 위해 join()을 적절히 사용하는 것이 중요한데, 특히 비동기 작업 대기 과정(`CompletableFuture.allOf()`)에서 `.thenApply()`같은 콜백을 활용하여 불필요한 블로킹을 줄여야 합니다.
- 시간 제약으로 인해 스트레스 테스트를 수행하지 못한 점이 아쉽습니다. 실제 서비스 환경에서의 성능을 검증하기 위해서는 스트레스 테스트가 필수적인 것을 설정 값을 찾는 과정에서 느낄 수 있었습니다.
참고
- https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/annotation/EnableAsync.html
- https://xxeol.tistory.com/44
- https://dkswhdgur246.tistory.com/82#enableasync-%EC%84%A4%EB%AA%85
- https://code-lab1.tistory.com/269
- https://wonit.tistory.com/669
- https://devoong2.tistory.com/entry/Spring-Async-%EC%82%AC%EC%9A%A9-%EB%B0%A9%EB%B2%95-%EB%B0%8F-TaskExecutor-ThreadPool
- https://dkswnkk.tistory.com/706
'Project' 카테고리의 다른 글
프로젝트 리팩토링 (4) - 책임 재할당과 트랜잭션에서 외부 API 분리하기 (0) | 2025.02.07 |
---|---|
프로젝트 리팩토링 (3) - 책임 재할당과 실행 계획 분석으로 검증하기 (1) | 2025.02.04 |
프로젝트 리팩토링 (2) - 도메인 모델 리팩토링 (0) | 2025.02.02 |
프로젝트 리팩토링 (1) - 객체 지향 설계 (0) | 2025.01.31 |
모놀리식 아키텍처에서 이벤트 기반으로 비즈니스 로직의 원자성 확보하기 (2) (1) | 2025.01.17 |