개발자로 후회없는 삶 살기
[최적화] Spring 환경 AI 서비스 실시간 스트림 파이프라인 구축 (with Redis Stream) 본문
[최적화] Spring 환경 AI 서비스 실시간 스트림 파이프라인 구축 (with Redis Stream)
몽이장쥰 2024. 11. 11. 22:56🚨 서론 (문제 상황)
대부분의 AI 모델은 Python, C++로 작성되고 동작하도록 설계되어 있다. 특히 대용량 데이터 연산 라이브러리를 편하게 사용할 수 있는 Python은 모델을 학습하고 서빙하기에 특화 되어있다.
@app.route("/img2img", methods=["POST"])
def imageToimage():
image_path, prompt = extract_img2img_request_message(request)
image_paths = generate_image_to_image_process(prompt, image_path)
return render_template(INDEX, lora_weights = lora_weights.keys(), images = image_paths, default_value = prompt)
예를 들어서, 파이썬 언어를 활용한 프레임워크인 Flask 환경에서는 파이썬 언어로 백엔드에서 모델을 로드하고 추론하고 클라이언트에게 응답하는 것이 하나의 서버 운용으로 가능하다.
-> Spring의 장점
1) 탄탄한 아키텍쳐와 데이터 보안
2) 어플리케이션 서버와 추론 서버의 분리 구축으로 웹 어플리케이션 영향 최소화
하지만, 대부분의 IT 기업이 사용하고 공공기관의 표준 전자정부 프레임워크인 SPRING의 사용이 극대화 되어 있는 상황에서 JAVA, SPRING 계열에서 운영 가능한 AI 서비스가 수반된다고 판단되며, 이는 어플리케이션의 성능 측면에서도 필수적이다.
그럼에도, 파이썬이 AI 운영에 특화되어 있다고 서두에 언급한 것처럼 과거 프로젝트에서 구축한 서빙 아키텍쳐는 위 그림처럼 단순하고, Redis List를 사용하여 요청 단방향이라는 불안정한 구조를 가지고 있다.
1) 요청에 대한 응답을 받지 못하여, 서버 간 연결이 불안정하다.
2) 메시지 큐에 메시지의 안전을 너무 많이 의존하고 있기에, Redis에 장애가 발생하면 치명적이다.
따라서, 위와 같은 문제가 발생할 수 있고, JAVA 계열에서 AI 서비스를 만들기 어려운 이유가 된다. 지금부터는 이러한 상황을 최적화하는 필자의 경험을 공유하고자 한다. 결론부터 얘기하자면, 어플리케이션과 추론 서버의 양방향 통신을 구축했다. (물론 이것이 네트워크에서 당연한 설계라고 생각한다.)
본론
✅ 최적화 방법 (Redis Stream 활용)
필자는 메시지 큐의 책임을 줄이고 어플리케이션과 추론 서버의 연결을 강화하기 위해서 Redis Stream을 사용했다. 이를 설계 및 구축한 방법을 알아보자.
Redis 클라이언트 : 스프링 어플리케이션, 추론 서버 2개
Redis 서버 : In Mem Redis
앞서, Redis List를 사용했을 땐, Redis를 바라보는 클라이언트가 앱 서버(사진 1)와 추론 서버(사진 2) 2개라서 각 서버는 Redis와 통신하고, 서버 간 통신은 배제된 단방향의 흐름을 가지고 있었지만,
Redis Stream을 사용하면 메세지 Publish-Consume 구조로 요청과 응답의 양방향 구조를 설계할 수 있었다.
-> Redis Stream의 구조
Redis 클라이언트 : 스프링 어플리케이션
Redis 서버 : In Mem Redis
Stream 구조를 사용하면, Redis 클라이언트는 스프링 서버가 담당하여, 메시지 발행과 추출이 어플리케이션에서 일어나고, 메시지 추출과 동시에 추론 서버와 클라이언트 간의 통신이 가능하다.
1) 메시지 Call Back
Stream은 스프링에서 메세지를 발행하면 메시지 call back 이벤트가 발생한다. call back이란, 메시지가 발행되면 자동으로 진행되는 다음 스텝을 의미하여 원하는 동작을 다음 스텝으로 지정할 수 있다. 본인의 프로젝트에 맞는 기능을 구현하면 되는데, 필자의 상황에선 메시지를 읽고 추론 서버를 호출하는 것이 다음 스텝이다.
2) 스프링 어플리케이션에서 메시지 단독 처리
Redis List에서 스프링은 메시지를 Publish만 하고 추론 서버에서 직접 레디스 클라이언트를 사용하여 Consume을 하여 서버 간 통신이 불가능하다. Stream은 스프링만으로 Publish와 Consume 코드를 구현할 수 있도록 이벤트 핸들러가 Bean 등록 되어있어서 사용자 요청은 스프링에서 전부 처리하고 추론 서버에서는 추론에만 집중할 수 있다.
if(statusCode.isOk()) {
final InferencesResponse inferenceResponse = SearchConverter.convertResponse(response);
resultService.saveResult(messageResponseDto.getId(), inferenceResponse);
} else {
throw new InferenceFailException();
}
스프링으로 Redis 클라이언트를 단독 처리하면 좋은 점은 추론 서버와 요청-응답 통신 구조를 만들 수 있다는 것이다. 추론 서버에서 추론 결과를 웹에 직접 응답하고 웹에서 결과를 받아서 성공 시 저장하고, 실패 시 예외를 발생시킬 수 있다.
3) Consumer 그룹 구조
Consumer 그룹 구조란, 하나의 레디스 서버에 여러 개의 쓰레드가 그룹을 이루고 메시지를 처리하는 구조이다. 쓰레드 개수를 10개로 지정하면 동시에 10개의 메시지를 읽을 수 있어서 다수의 동시 요청을 처리할 때 효과적이다.
또한, acknowledge 승인 기능으로 실패한 Call Back 처리에 대해서 다른 Redis 쓰레드에게 재 배포할 수 있다. 추출한 메시지로 동작 후 승인 메서드를 호출하게 되는데, 추출한 메시지를 바로 삭제하는 것이 아닌 보관을 하고 있다가, 일정 시간 동안 승인이 되지 않으면 다른 쓰레드에게 재배포한다. 기존 List에서 추론 서버의 일시적 오류로 인해 요청이 실패한다면 사용자가 재요청해야하는 문제가 발생하지만, Stream은 서버에서 직접 재배포하여 사용자의 개입이 필요하지 않다.
- Redis Stream을 활용한 실시간 스트림 파이프라인 구현
앞에서 필자가 AI 서비스를 개발할 때 List 대신 Stream을 선택한 이유와 효과를 설명했다. 이제부터는 Redis Stream을 설정하는 방법과 파이프라인 구현 과정을 다뤄보자.
1) Redis Stream 설정
2) 메시지 Publish 구현
3) 메시지 Consumer 구현
4) Call Back 후처리 구현
다음과 같은 단계로 이루어지며, 스프링에서 단독으로 Redis에 접근하기 위한 설정과, 메시지 주입과 추출 코드가 필요하고, Call Back 이후 추론 서버와 통신하는 프로세스가 필요하다.
1. Redis Stream 설정|
@Bean
public RedisTemplate<String, Object> streamRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
final RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
Stream은 Key-Value 구조에 Value 내부에도 Hash 구조를 가진 Record 타입이다. Record는 모두 문자열 타입을 가지며 Record 데이터를 입력하기 위한 Redis Template를 설정해준다.
https://hsb422.tistory.com/entry/jackson-json
이번 포스팅은 파이프라인 설계만 다루며, Record의 개념은 위 블로그를 참고하자.
✅ Redis 직렬화 선택 기준
1) JdkSerializationRedisSerializer
자바 내장 기능으로 별도의 설정을 하지 않으면 자동으로 사용된다. 자바 내장 직렬화를 사용하여 직렬화된 데이터가 상대적으로 크고, 역/직렬화 과정이 비교적 느리다.
2) GenericJackson2JsonRedisSerializer
객체를 JSON 문자열로 직렬화하는 방식으로 데이터 포멧이 가볍고 사람이 읽을 수 있지만, class 메타 정보 등을 포함하여 데이터 크기가 크다.
3) StringRedisSerializer
데이터를 문자열 형태로 저장하고 간단하고 빠르며, 기본적인 키-값 방식으로 저장하기 적합하다. 직렬화 과정 없이 단순히 데이터를 문자열로 변환하여 Redis에 저장한다.
필자가 StringRedisSerializer을 선택한 결정적 이유는 아래와 같다.
기존 List에 데이터를 넣을 땐 Stream이 Hash 구조인 것을 모르고, 단순히 ObjectMapper를 사용하여 json 문자열로 변환하였는데
이 과정에서 데이터를 직렬화/역직렬화 하기 위한 read, write 작업이 필요했다. 이는 개발자 입장에서도 번거롭지만, Stream 입장에서는 이미 직렬화된 데이터를 키-값 형태로, 위 사진의 payload(키) - json(값) 데이터로 가지게 된다.
이는 HASH 구조에 바람직한 형태가 아니고 Redis 공식문서의 HASH 구조에 적합한 사용 방법이 아니다.
StringRedisSerializer을 RedisTemplate에 설정하면 공식문서에 정의된 방식처럼 객체의 필드를 key로 값을 value로 가진 Hash 구조를 보인다.
public void sendModelMessage(final MessageResponseDto message) {
final ObjectRecord<String, MessageResponseDto> record = StreamRecords.newRecord()
.in(CLOTHES.toString())
.ofObject(message)
.withId(RecordId.autoGenerate());
streamRedisTemplate.opsForStream()
.add(record);
}
StringRedisSerializer를 사용한 상태에서 객체를 Stream에 저장하면 개발자의 write 동작 없이 Stream에 의해 Record 형태로 자동 변환되고, ObjectHashMapper에 의해 자동 역/직렬화된다. ObjectMapper를 사용하면 별도의 리소스가 발생했을 텐데, Stream에 최적화된 직렬화 방식으로 데이터를 자동으로 저장하고 읽을 수 있다.
2. 메시지 Publish 구현
public void sendModelMessage(final MessageResponseDto message) {
final ObjectRecord<String, MessageResponseDto> record = StreamRecords.newRecord()
.in(CLOTHES.toString())
.ofObject(message)
.withId(RecordId.autoGenerate());
streamRedisTemplate.opsForStream()
.add(record);
}
Stream에 데이터를 입력하는 Publisher는 다음과 같다. Stream은 Record 형태로 데이터를 저장하기 때문에 ObjectRecord의 제네릭 타입에 원하는 객체 타입을 명시한다. 키 중복을 막기 위해서 RecordId는 자동 생성 방식을 권장한다.
저장된 데이터를 보면, 1) 번에 해당하는 것이 RecordId이고 2) 번에 해당하는 것이 그에 대한 value이다.
3. 메시지 Consumer 구현
final StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, MessageResponseDto>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.batchSize(1)
.keySerializer(new StringRedisSerializer())
.hashValueSerializer(new StringRedisSerializer())
.hashValueSerializer(new StringRedisSerializer())
.objectMapper(hashMapper)
.targetType(MessageResponseDto.class)
.build();
Redis ListenerContainer를 사용하기 위해 Options 설정하고 메시지를 읽을 Consumer를 구현한다.
batchSize : 한 번에 하나의 메시지 Consume
objectMapper : hashMapper로 역직렬화
batchSize로 요청 데이터를 관리할 수 있다. 요청이 많으면 한 번에 여러 개의 데이터를 읽고 추론 서버를 증설하여 대응할 수 있다.
🚨 어떻게 다수의 요청을 처리할 수 있을까?
1) 하드웨어 적인 방법
1대의 Redis 서버에 다수의 Redis Stream을 운영하고 각 Stream마다 호출할 추론 서버를 증설 및 연결하여, 추론 요청과 응답이 분산되도록 하기
2) Consumer 그룹 방법
1대의 Redis 서버에 1개의 Redis Stream에 다수의 스레드 그룹을 운영하고 동시에 추론 서버에게 요청할 수 있도록 추론 서버를 증설 및 로드밸런싱하기
4. Call Back 후처리 구현
@RequiredArgsConstructor
public class ClothesJobConsumerListener implements StreamListener<String, ObjectRecord<String, MessageResponseDto>> {
@Override
public void onMessage(final ObjectRecord<String, MessageResponseDto> message) {
// Call Back 기능 명시
}
메시지 발행 시 리스너에서 Redis Call Back 기능으로 메시지를 자동으로 읽고 ObjectRecord 형태로 추출해준다. 개발자는 onMessage 메서드에 원하는 기능을 구현만 하면 된다.
@Override
public void onMessage(final ObjectRecord<String, MessageResponseDto> message) {
final CloseableHttpResponse response = inferenceProcess(modelPath, messageResponseDto);
final InferencesResponse inferenceResponse = SearchConverter.convertResponse(response);
// 결과 저장
resultService.saveResult(messageResponseDto.getId(), inferenceResponse);
}
필자는 HttpClient를 사용하여 추론 서버에게 메세지를 요청하고 추론 응답을 받아 결과를 저장하는 프로세스를 구현했다. 저장된 결과는 클라이언트에게 전달되어 화면에 전시된다.
inferenceProcess 메서드가 추론 서버와 응답을 주고 받는 부분이다. 이를 통해 웹 어플리케이션과 추론 서버 간의 요청-응답 구조를 설계 할 수 있었다.
결론
1) 서버간 요청-응답 구조로 안정된 네트워크를 관리하고, 추론 과정에서 발생한 예외처리를 웹 어플리케이션에서 사용자에게 빠르게 응답
2) Consumer 그룹 방식으로 메시지 누락을 예방하고 사용자 요청 유실 문제 해결
Spring 환경에서 AI 서비스를 개발할 때, Stream을 도입하여 얻은 장점은 크게 위과 같다. 실제 고객이 사용하는 서비스이므로, AI의 기능을 고객에게 신뢰성 있게 제공하는 것이 정말 중요한데, Stream으로 사용자에게 추론 상태를 직관적으로 제공하고 예외 처리할 수 있었다. 또한 Consumer 설정으로 다수의 동시 요청도 효과적으로 처리할 수 있을 거라고 기대한다.
'[백엔드] > [spring+JPA | 이슈해결]' 카테고리의 다른 글
[최적화] Redis Stream 내부 ObjectHashMapper를 이용하여 HASH 역직렬화 자동화하기 (2) | 2024.11.24 |
---|---|
[최적화] Redis Stream에 적절한 RedisSerializer를 사용하자 (2) | 2024.11.23 |
자바 PART.무한의 값 처리 BigDecimal (0) | 2023.11.27 |
spring PART.postman으로 login 테스트 할 때 받아온 토큰을 요청 헤더에 자동으로 넣는 방법 (0) | 2023.08.18 |
spring PART.Value Object와 Custom Validator를 이용한 검증 개선 (0) | 2023.07.21 |