LLM 모델이 응답을 생성하며 실시간으로 stream으로 보내주면서 스트리밍 요청을 다룰 일이 예전보다 많아진 것 같습니다.
오늘은 Spring WebFlux에서 transfer-encoding: chunked 스트리밍 데이터를 request로 받을 수 있게 하기 위해 시도했던 방법들과 난관에 봉착했던 순간들, 결국 어떻게 성공했는지에 대한 이야기를 적어보려 합니다.
transfer-encoding: chunked
transfer-encoding: chunked란?
우선 transfer-encoding: chunked가 뭔지 간단히 설명하고 넘어가겠습니다.
http header의 많은 종류 중에 transfer-encoding이라는 헤더가 있습니다.
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
메시지가 전송될 때 해당 메시지가 어떤 방식으로 encoding 되어 이동되고 있는지를 알려주는 헤더입니다.
따라서, transfer-encoding이 chunked라는 것은, 데이터가 청크 단위로 잘려서 순차적으로 클라이언트에게 전송된다는 의미입니다.
Chunked encoding is useful when larger amounts of data are sent to the client and the total size of the response may not be known until the request has been fully processed.
위 문장은 Transfer-Encoding 문서에 있는 chunked에 대한 설명인데요,
이처럼 chunked는 전송이 완전히 끝날 때까지 클라이언트는 해당 응답의 전체 사이즈가 얼마일지 모른다는 특징이 있습니다.
chatGPT를 사용할 때 어떤 질문을 하면, 전체 응답이 생성된 후에 한 번에 UI에 업데이트 되는 게 아니라, 생성이 되는 대로 UI에 한 단어, 한 단어 업데이트 되는 모습을 보신 적이 있을 겁니다.
예를 들어, 아래와 같은 청크들이 전송된다고 할 때,
안녕
하세요
만나서
반갑습니다
클라이언트는 마지막 청크인 '반갑습니다'를 받을 때까지 스트리밍 데이터 전체가 얼마나 큰지에 대해 전혀 신경을 쓰지 않아도 됩니다.
그래서 주로 용량이 큰 파일을 chunk 단위로 잘라서 전송해야 할 때 사용하기도 합니다.
그렇다면 클라이언트는 어떻게 전송이 끝났음을 알고, 어떻게 현재 받은 청크가 깨지지 않은 유효한 청크라는 것을 판단할 수 있을까요?
chunk의 형식
chunked는 length, content, CRLF 이렇게 크게 세 가지 요소로 이루어집니다.
한 chunk를 전송할 때, length CRLF content CRLF의 형식으로 전송하게 되는데요,
위에서 들었던 예시를 그대로 다시 한 번 가져와보겠습니다.
예를 들어 '안녕'이라는 chunk 메시지를 전송해야 한다고 하면,
2\r\n
안녕\r\n
이런 식으로 전송을 하게 되는 거지요.
그렇게 '반갑습니다'까지 모든 chunk를 전송하고 나면, CRLF를 한 번 더 보내줘 스트리밍이 끝났음을 알립니다.
2\r\n
안녕\r\n
3\r\n
하세요\r\n
3\r\n
만나서\r\n
5\r\n
반갑습니다\r\n
\r\n
최종적으로 이렇게 전송이 될 것입니다.
결론적으로, 클라이언트의 입장에서는 CRLF를 연속으로 두 번 받으면 전송이 모두 끝났구나 하고 판단할 수 있습니다.
WebFlux 구현
그럼 이제 본격적으로 WebFlux에서 이 chunked 데이터를 API의 request로 받을 수 있는 방법을 알아보겠습니다.
chunked request 받기
가장 먼저 chunked request를 받아봐야겠죠.
평소처럼 ChunkController를 하나 만들고, Post API 메서드를 하나 생성합니다.
Request Body로 chunked를 받아보려 하는데, chunked는 스프링의 @RequestBody 어노테이션으로는 받을 수가 없습니다.
ServerHttpRequest로 받아서 body로 담겨 있는 DataBuffer을 하나씩 꺼내서 살펴보겠습니다.
@RestController
@RequestMapping("/chunk")
class ChunkController {
@PostMapping
fun postChunked(request: ServerHttpRequest) {
val dataBuffers: Flux<DataBuffer> = request.body
}
}
그런데 이제, chunk로 들어오는 body를 계속해서 받아봐야겠죠.
DataBuffer을 순차적으로 계속해서 받아보겠습니다.
.doOnNext를 활용해서 계속해서 다음 chunk의 dataBuffer을 열어볼 수 있습니다.
@RestController
@RequestMapping("/chunk")
class ChunkController {
@PostMapping
fun postChunked(request: ServerHttpRequest): Mono<Void> {
val dataBuffers: Flux<DataBuffer> = request.body
return dataBuffers
.doOnNext { dataBuffer ->
val byteArray = ByteArray(dataBuffer.readableByteCount())
dataBuffer.read(byteArray)
print(String(byteArray))
DataBufferUtils.release(dataBuffer)
}
.then()
}
}
client가 만약 chunk를 abc, de, fghi, jk 순으로 보낸다면 순차적으로 chunk를 받아볼 수 있습니다.
+) 코틀린 코루틴에 더 익숙한 분이 계시다면, 이렇게도 코드를 써볼 수 있습니다. 위와 동일하게 동작하는 코드입니다.
@PostMapping
suspend fun postChunked(request: ServerHttpRequest) {
request.body.asFlow().collect {dataBuffer ->
val byteArray = ByteArray(dataBuffer.readableByteCount())
dataBuffer.read(byteArray)
print(String(byteArray))
DataBufferUtils.release(dataBuffer)
}
}
만약 chunk가 분리된다면?
위에 구현된 걸 보면 아무 문제가 없어 보입니다.
문제 상황
하지만, 중요한 문제가 하나 있습니다.
TCP는 한 chunk를 한 번에 보낸다는 보장을 하지 않는다는 것이고, chunk가 잘려서 전송될 수 있다는 것입니다.
예를 들면 이런 것입니다.
안녕하세요\r\n
안녕하세요라고 전송되어야 할 하나의 chunk가
안녕하
세요\r\n
'안녕하'만 먼저 전달되고, '세요'와 CRLF가 전달되는 경우입니다.
이렇게, 하나의 온전한 단어로 보면 이게 왜 문제인지 이해하기 어려울 수 있지만,
만약 이게 유니코드로 변환되어 전송되었다면? 혹은 json 객체로 전송되었다면?
\uc548\ub155\ud5 -> 안녕ud55
58\uc138\uc694 -> 58세요
-> 안녕하세요를 전송했는데, 유니코드가 잘려서 받는 쪽에서는 안녕ud5558세요를 받아버렸다.
한 뭉치로 받아 decode 해야 했던 걸 각각 받아 decode를 거치게 되면 온전한 chunk를 복구하지 못하고 손상된 두 개의 chunk를 만들어내게 될 가능성이 있습니다.
그래도 혹시라도, netty나 WebFlux가 잘린 chunk를 잘 핸들링하고 있을 수도 있으니 테스트를 해보았습니다.
자바스크립트로 간단한 테스트를 만들었습니다.
소켓을 열어 온전한 형태의 chunk abc를 한 번 전송해보고,
defgh는 '{length}\r\n', 'def'가 전송된 후 'gh\r\n'가 전송되도록 임의로 chunk가 잘린 상황을 가정했습니다.
이 테스트에서 우리 서버는 잘린 chunk를 잘 조합하여 abc defgh ij를 순서대로 출력해야 합니다.
function writeChunk(client, payload) {
client.write(`${payload.length.toString(16)}\r\n`);
client.write(`${payload}\r\n`);
}
function test() {
var client = new net.Socket();
client.connect(8080, '127.0.0.1', function () {
client.write('POST /chunk HTTP/1.1\r\n');
client.write('Content-Type: text/plain\r\n');
client.write('Transfer-Encoding: chunked\r\n');
client.write('\r\n');
setTimeout(() => writeChunk(client, 'abc'), 0);
// ========== 전체 chunk 가 다 안가고 분할되어서 가는 경우를 테스트 ==========
setTimeout(() => client.write(`${'defgh'.length.toString(16)}\r\n`), 1000);
setTimeout(() => client.write(`${'def'}`), 2000); // TODO 이 부분 이슈
setTimeout(() => client.write(`${'gh'}\r\n`), 3000);
// ========== 전체 chunk 가 다 안가고 분할되어서 가는 경우를 테스트 ==========
setTimeout(() => writeChunk(client, 'ij'), 4000);
setTimeout(() => writeChunk(client, ''), 5000);
});
client.on('data', function (data) {
console.log(`Received: ${data}!!`);
client.destroy(); // kill client after server's response
});
}
테스트 결과는 처참했습니다.
abc를 출력하고, def, gh, ij를 출력했습니다.
def와 gh는 transfer-encoding 스펙상 하나의 chunk로 인식되어야 하는데, 두 개의 별도의 데이터로 처리되어 버린 것입니다.
유니코드나 json이었다면 파싱에 실패해 데이터가 손상되었을 상황입니다.
해결 시도 - Length, CRLF
그렇다면 어떻게 해야 할까요?
몇 가지 방법이 떠오릅니다.
1. Length만큼 받을 때까지 처리하지 않고 기다리면 된다.
2. CRLF가 올 때까지 기다리게 하면 된다.
chunked 스펙에 content의 길이를 먼저 전송하게 되어 있으니 그 Length를 참고해 현재 받은 데이터의 길이가 length와 일치하는지 살펴보거나,
아니면 모든 Content의 끝에는 \r\n를 전송해주는 스펙도 있으니, \r\n가 들어올 때까지 기다렸다가 처리해주면 될 것이라 생각했습니다.
자, 그럼 다시 dataBuffer로 돌아가 dataBuffer에 데이터가 어떻게 들어오고 있는지 봅시다.
chunk 데이터로 'abc'를 전송한 예시입니다.
reader/writerIndex, maxCapacity가 모두 3,
혹시나 해서 dataBuffer을 readable로 바꿔 읽어봐도 97, 98, 99 (a, b, c)가 나옵니다.
chunk의 무결성을 보장하기 위해 length 값이나 CRLF 값을 참고하려 했지만,
dataBuffer에는 우리가 원하는 length나 \r\n가 아예 들어오지 않고 있습니다.
조금 알아보니, netty와 WebFlux를 통과하는 과정에서 이 값들은 모두 파싱되어 제거되고 content만 전달된다고 합니다.
그러면, 이 파싱이 일어나는 조금 더 Low level로 내려가서 살펴봐야 합니다.
netty config를 만들어 강제로 custom parse를 하는 법이 있습니다. 하지만 이는 잘 동작하고 있는 같은 서버의 다른 API request 파싱에도 영향을 주기 때문에 신중해야 합니다.
어떻게 해결하는 게 좋을지 한참을 고민하고, 여러 방법을 시도해보았습니다.
해결 방법 - netty allowPartialChunks 설정
그러다, 가장 괜찮아 보이는 방법 하나가 있었습니다.
netty server 설정을 보면 "allowPartialChunks"라는 필드가 있습니다.
If the length of a chunk exceeds the ByteBufs readable bytes and allowPartialChunks is set to true, the chunk will be split into multiple HttpContents. Otherwise, if the chunk size does not exceed maxChunkSize and allowPartialChunks is set to false, the ByteBuf is not decoded into an HttpContent until the readable bytes are greater or equal to the chunk size.
우리가 원하는 옵션입니다.
allowPartialChunks 필드가 만약 true로 설정이 되어 있으면, chunk는 들어오는 대로 파싱되어 여러 개의 데이터로 분할될 수 있고,
false로 설정되어 있으면 length(chunk size)와 비교해 데이터의 길이가 length와 같아져야만, 즉 chunk의 모든 데이터가 들어와야 비로소 decode를 시작합니다.
이제 이 필드 값을 false로만 바꿔주면 됩니다.
유효한 Netty를 잡아서 allowPartialChunks를 찾은 후 false로 바꿔줍니다.
@Configuration
class NettyServerConfig {
@Bean
fun nettyServerCustomizer() = NettyServerCustomizer {
it.doOnChannelInit { connectionObserver, channel, remoteAddress ->
val codec = channel.pipeline().get(NettyPipeline.HttpCodec) as HttpServerCodec
val inboundHandler = inboundHandlerField.get(codec) as HttpRequestDecoder
// allowPartialChunks 필드를 false 로 변경
decoderCls.getDeclaredField("allowPartialChunks").set(inboundHandler, false)
}
}
}
하지만, 이 코드는 동작하지 않습니다.
allowPartialChunks가 접근 가능하지 않도록 막혀있기 때문입니다.
그렇다면 접근 가능한 필드로 바꿔줘야 합니다.
@Configuration
class NettyServerConfig {
@Bean
fun nettyServerCustomizer(): NettyServerCustomizer {
// allowPartialChunks 필드를 접근 하기 위해 reflection 사용
val handlerCls = CombinedChannelDuplexHandler::class.java
val inboundHandlerField = handlerCls.getDeclaredField("inboundHandler")
val decoderCls = HttpObjectDecoder::class.java
val allowPartialChunksField = decoderCls.getDeclaredField("allowPartialChunks")
// allowPartialChunks 필드를 접근 가능하도록 설정
inboundHandlerField.isAccessible = true
allowPartialChunksField.isAccessible = true
return NettyServerCustomizer {
it.doOnChannelInit { connectionObserver, channel, remoteAddress ->
val codec = channel.pipeline().get(NettyPipeline.HttpCodec) as HttpServerCodec
val inboundHandler = inboundHandlerField.get(codec) as HttpRequestDecoder
// allowPartialChunks 필드를 false 로 변경
allowPartialChunksField.set(inboundHandler, false)
}
}
}
}
이렇게 하면 하나의 온전한 chunk가 들어올 때까지 기다렸다가 파싱을 시작하게 됩니다.
위에서 썼던 Javascript 테스트 코드를 그대로 실행시켜봤습니다.
다행히 defgh가 한 뭉치로 잘 들어옵니다.
이렇게 Spring WebFlux에서 chunked 스트리밍을 받기 위한 모든 과정이 끝났습니다.
최종 코드는 아래에 적어두었습니다.
https://github.com/dannaward/webflux-chunked/tree/main/src/main/kotlin/com/example/demo
백엔드를 그냥 돌아가게 만들기는 쉽지만, 견고하게 만들기 위해 디테일을 잡는 난이도는 디테일이 100%에 가까워질수록 지수함수처럼 증가한다는 것을 깨닫게 된 기술적 사투였습니다.
'Backend' 카테고리의 다른 글
DB Connection Pool에 대해 (0) | 2025.01.31 |
---|---|
사람들은 왜 자바가 아닌 코틀린에 열광할까? (0) | 2025.01.05 |
Nest.js에서 prisma exception 데코레이터로 깔끔하게 핸들링하기 (0) | 2024.11.24 |
Next.js 14와 Firebase로 간단하게 백엔드 API 만들기 (1) | 2023.12.22 |
Node.js + TypeScript를 heroku로 배포하기 (0) | 2023.04.08 |