Armeria로 WebSocket 서비스 만들기

Armeria 1.24.0 버전이 얼마 전에 릴리즈되었습니다. 해당 버전에서는 웹소켓 지원이 추가되었는데요. 새롭게 추가된 기능을 소개하고 웹소켓 프로토콜을 사용한 간단한 에코 서버를 만들어보면서 배운 것들을 정리해보았습니다.

PR을 보고 사용법 알아내기

아직 추가된지 얼마 되지 않은 기능이라 따로 문서가 없어 먼저 WebSocket PR을 통해 어떻게 사용하는지 알아보았습니다. Armeria의 PR은 Motivation - Modifications - Result 절로 나누어 내용을 정리하기 때문에 쉽게 이 PR이 만들어진 배경, 어떤 수정을 했는지 그리고 결과까지 쉽게 알 수 있습니다. 그 중 Modifications를 보면 다음과 같이 정리되어있습니다.

  1. Add WebSocketService and WebSocketHandler to implement the WebSocket service using back pressure.
  2. Add WebSocketFrame that represents the web socket frames.
  3. Add RequestLogBuilder.responseCause(cause) to set the cause while sending a normal response.
  4. Forked HttpServerCodec from netty to support WebSocket upgrade.
  5. Forked WebSocker encoder and decoder from Netty to use it without channels.

총 5개의 항목이 있는데요. 먼저 1번의 서비스와 핸들러라는 단어를 봤을 때 WebSocketService를 이용해 Armeria 서버에 웹소켓 서비스를 추가하고 해당 서비스로 들어오는 요청은 WebSocketHandler로 구현할 수 있음을 알 수 있습니다. 2~5번 내용도 흥미롭지만 일단 일차적인 목적은 웹소켓을 지원하는 서버를 만드는 것이므로 더 자세한 코드 분석은 미뤄두었습니다. 다음에 다른 글로 쓰면 좋을 것 같네요.

다음으로 PR의 Result 부분에 WebSocketServiceWebSocketHandler를 이용해서 만든 예제 코드가 친절하게 적혀있습니다.

ServerBuilder sb = Server.builder();
WebSocketHandler backpressureHandler = (ctx, messages) -> {
WebSocketWriter webSocketWriter = WebSocket.streaming();
// Write frames using back pressure.
return webSocketWriter;
};
sb.service("/chat", WebSocketService.of(backpressureHandler));
view raw PRExample.java hosted with ❤ by GitHub

코드는 간결하지만 저는 처음 보고 이게 어떻게 동작하는지 잘 이해하지 못했는데요. 하나씩 알아보면서 서버를 만든 과정을 소개하겠습니다.

어디서 본 듯한 streaming() 메소드

아마 Armeria에 익숙하신 분이라면 위 코드를 보자마자 어떻게 사용하는지 이해하셨을지도 모르겠습니다. 저는 아직 사용 경험이 적어 바로 이해하지 못했는데요. 다만 해당 코드와 비슷한 꼴을 어디서 본 듯한 인상을 받았습니다. 바로 stremaing() 메소드를 보고 공식 문서를 떠올렸는데요. Armeria는 백프레셔를 이용한 스트리밍 응답을 지원합니다. 이 기능은 주로 크기가 매우 큰 파일을 서빙할 때 사용합니다. 아래는 링크한 문서의 예시 코드를 첨부하였습니다.

sb.service("/big_file.dat", (ctx, req) -> {
HttpResponseWriter response = HttpResponse.streaming();
response.write(ResponseHeaders.of(200));
response.whenConsumed().thenRun(() -> {
// Produce the first chunk when the ResponseHeaders is
// written to the socket.
response.write(produceChunk(0));
response.whenConsumed().thenRun(() -> {
// Produce the second chunk when the first chunk is
// written to the socket.
response.write(produceChunk(1));
...
});
});
return response;
});

코드를 보면 PR에 쓰여있는 WebSocketWriter와 매우 유사해보이는 HttpResponseWriter를 보실 수 있습니다. 위 코드에서는 백프레셔 방식으로 Subscriber가 데이터를 요청할 때마다 파일의 청크를 HttpResponseWriter에 쓰는 것을 볼 수 있습니다. 따라서 비슷한 방식으로 WebSocketWriter#writer() 메소드를 적절히 사용하면 클라이언트에 메시지를 전달할 수 있겠다 짐작했습니다.

실제 Armeria 코드를 보면 HttpResponseWriterStreamWriter<HttpObject> 인터페이스를 구현하고 WebSocketWriterStreamWriter<WebSocketFrame> 인터페이스를 구현합니다. 즉 두 경우 모두 스트리밍 응답을 지원하는데 전자는 데이터를 HttpObject에 담아서, 후자는 WebSocketFrame에 담아서 전달하는 것을 알 수 있습니다. 결국 같은 인터페이스를 사용하기 때문에 프로토콜은 다르지만 그 기술의 세부사항을 자세히 몰라도 쉽게 사용할 수 있습니다.

💡PR Modifications의 2번을 보면 WebSocketFrame은 웹소켓 프레임을 표현한다고 적혀있습니다. WebSocketFrame 소스코드를 보면 친절하게 웹소켓 프로토콜의 명세가 있는 RFC 6455의 5. Data Framing 링크가 첨부되어있습니다. 명세의 첫 번째 줄을 보면 “In the WebSocket Protocol, data is transmitted using a sequence of frames.”라고 적혀있고 StreamWriter<WebSocketFrame>이 이 역할을 하고 있음을 짐작할 수 있습니다.

에코 서버 코드 짜보기

어느 정도 가닥을 잡았으니 간단하게 코드를 작성해보겠습니다. 개발자로서 모르는 도구를 공부할 때 정보를 수집하고 이해하는 것도 중요하지만 일단 만들어보는게 큰 도움을 줍니다. 저의 경우 계속 글을 읽기만 하면 오히려 더 어렵게 느껴져서 의식적으로 중간 단계에서 알아낸 정보로 엉성한 코드를 만들어보려 노력합니다.

1.24.0 버전에서 추가된 기능이기 때문에 build.gradle에서 Armeria의 버전을 변경했습니다.

dependencies {
implementation "com.linecorp.armeria:armeria:1.24.0"
}
view raw build.gradle hosted with ❤ by GitHub

그 다음 바로 PR과 비슷한 형태로 Server 인스턴스를 만들었습니다. 참고로 PR에 쓰여있던 예시의 WebSocketHandler는 아래에서 WebSocketServiceHandler로 변경되었는데, Armeria의 소스코드를 보면 WebSocketHandler<ServiceRequestContext>의 별칭(alias)으로 WebSocketServiceHandler를 사용합니다.

static Server newServer(int port) {
ServerBuilder sb = Server.builder();
WebSocketServiceHandler webSocketServiceHandler = (ctx, messages) -> {
WebSocketWriter webSocketWriter = WebSocket.streaming();
// client의 request body를 어떻게 끄집어낼까?
return webSocketWriter;
};
return sb.http(port)
.decorator(LoggingService.newDecorator())
.service("/chat", WebSocketService.of(webSocketServiceHandler))
.build();
}

에코 서버는 클라이언트가 받은 메시지를 그대로 돌려주면 되므로 최종적으로 webSocketWriter.write("<클라이언트가 보낸 메시지>") 형태가 될 것 같은데 위 코드의 messages에서 어떻게 클라이언트의 요청을 끄집어내는지 알아낼 필요가 있었습니다.

테스트 코드에서 추가 힌트 얻기

HttpResponseWriter와 비슷한 방식으로 WebSocketWriter#write()를 사용하면 된다는 중요한 정보를 얻었지만 여전히 클라이언트가 보낸 요청을 사용하는 방법을 알아내야 했습니다. Armeria에 대해 공부하다보면 아무래도 자료가 많은 Spring과 같은 프레임워크에 비해 검색을 통해 모르는 것을 알아내기가 쉽지는 않습니다. 그럴 때 제가 자주 쓰는 팁 중 하나는 테스트 코드를 보는 것입니다. (다른 팁으로 구글 검색에서 site:armeria.dev 검색어를 사용하는데요. 은근히 필요한 정보를 잘 찾아줘서 알아두면 좋습니다.)

PR에 포함된 WebSocketServiceTest 코드를 열어서 읽어보던 중 테스트를 위해 만들어진 AbstractWebSocketHandler 클래스를 발견했습니다. 클래스 이름을 보고 WebSocketServiceHandler를 구현하는 구체(concrete) 핸들러 클래스를 쉽게 만들 수 있겠다는 생각이 들었습니다. AbstractWebSocketHandler 클래스에서 가장 중요한 handle 메소드를 아래 첨부했습니다.

// onOpen, onText, onBinary, onClose 메소드 생략
@Override
public WebSocket handle(ServiceRequestContext ctx, WebSocket in) {
final WebSocketWriter writer = WebSocket.streaming();
in.subscribe(new Subscriber<WebSocketFrame>() {
@Override
public void onSubscribe(Subscription s) {
onOpen(writer);
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(WebSocketFrame webSocketFrame) {
try (WebSocketFrame frame = webSocketFrame) {
switch (frame.type()) {
case TEXT:
onText(writer, frame.text());
break;
case BINARY:
onBinary(writer, frame.byteBuf(ByteBufAccessMode.RETAINED_DUPLICATE));
break;
case CLOSE:
assertThat(frame).isInstanceOf(CloseWebSocketFrame.class);
final CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
onClose(writer, closeFrame.status(), closeFrame.reasonPhrase());
break;
default:
// no-op
}
} catch (Throwable t) {
writer.close(t);
}
}
@Override
public void onError(Throwable t) {
writer.close(t);
}
@Override
public void onComplete() {
writer.close();
}
});
return writer;
}

WebSocket insubscribe 메소드를 사용하고 파라미터로 org.reactivestreams.Subscriber 인스턴스를 넣습니다. 아마 Reactive Streams에 익숙하신 분이라면 더 잘 이해하실 수 있을 것 같습니다.

먼저 onSubscribe 메소드를 보면 onOpen 메소드를 먼저 실행하는데 이는 AbstractWebSocketHandler를 상속한 하위 클래스에서 오버라이드할 메소드이므로 사용자가 커스터마이즈할 수 있는 부분입니다. 그 다음 s.request(Long.MAX_VALUE); 부분은 Reactive Streams의 Pub/Sub 구조와 관련된 코드입니다. Publisher는 Subscriber에게 Subscription을 넘기고 Subscriber는 이를 통해 Publisher에게 데이터를 요청합니다. 이 때 요청 이벤트 수를 조절할 수 있는데, 그 수가 “Long.MAX_VALUE”보다 크거나 같은 경우 “effectively unbounded”로 취급한다고 되어있습니다. 제 해석으로는 Publisher가 적절하게 이벤트가 발생할 때마다 전부 전송하는 것으로 이해했습니다. 아직 Reactive Streams 코드에 익숙치 않아 완벽하게 이해하지는 못했습니다.

다음으로 가장 중요한 onNext 메소드에서 서버가 받은 WebSocketFrame을 처리하는 코드가 등장합니다. 크게 복잡한 것은 없고 프레임의 타입에 따라 사용자가 오버라이드할(onText, onBinary, onClose) 메소드를 실행합니다. 해당 메소드의 코드를 보고 어느 정도 감을 잡았습니다. WebSocket in을 통해 데이터가 흘러들어오고 그 데이터를 처리하는 핸들러에서 WebSocketWriter#write() 메소드를 사용할 계획을 세웠습니다.

💡테스트 코드를 보는 방법 외에도 WebSocket messages 역시 Armeria에서 자주 사용하는 StreamMessage 인터페이스의 구현체이므로 이와 비슷한 HttpRequest, HttpMessage를 사용하는 코드를 찾아보는 것도 도움이 될 것 같습니다.

서버 코드 완성하기

AbstractWebSocketHandler를 그대로 상속하여 echo 서버의 핸들러를 만들어도 되지만 한눈에 보기 편하게 하기 위해 적절히 카피하여 재구성했습니다.

static Server newServer(int port) {
ServerBuilder sb = Server.builder();
WebSocketServiceHandler webSocketServiceHandler = (ctx, messages) -> {
WebSocketWriter webSocketWriter = WebSocket.streaming();
messages.subscribe(new Subscriber<WebSocketFrame>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(WebSocketFrame webSocketFrame) {
try (WebSocketFrame frame = webSocketFrame) {
switch (frame.type()) {
case TEXT:
// 받은 요청 메시지 그대로 응답한다.
webSocketWriter.write(frame.text());
break;
case BINARY:
// do nothing
break;
case CLOSE:
final CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
webSocketWriter.close(closeFrame.status(), closeFrame.reasonPhrase());
break;
default:
// do nothing
}
} catch (Throwable t) {
webSocketWriter.close(t);
}
}
@Override
public void onError(Throwable t) {
webSocketWriter.close(t);
}
@Override
public void onComplete() {
webSocketWriter.close();
}
});
return webSocketWriter;
};
return sb.http(port)
.decorator(LoggingService.newDecorator())
.service("/chat", WebSocketService.of(webSocketServiceHandler))
.build();
}

코드의 내용은 크게 다르지 않고 중요한 부분만 보면, onNext 메소드에서 프레임의 타입이 TEXT 일 때 받은 요청 메시지 내용을 그대로 WebSocketWriter에 써서 echo 동작을 구현했습니다. (지원하는 프레임 타입에 대해서는 RFC 6455 5.2 Base Framing Protocol에 명시되어있습니다.) 물론 BINARY 타입의 메시지를 보낼 수도 있지만 뒤에서 제가 사용할 클라이언트는 TEXT 타입 메시지만 보낼 예정이라 따로 처리를 추가하지 않았습니다.

여기까지 웹소켓 에코 서버를 완성했습니다. 처음 PR의 예시 코드만 봤을 때는 막막했지만 몇 가지 테스트 코드를 읽고 나니 생각보다 쉽게 완성했습니다. 지금 핸들러 코드를 보시면 생각보다 보일러 플레이트 코드가 많아보일 수 있는데요. 아직 퍼블릭 클래스 중 AbstractWebSocketHandler와 같은 역할을 하여 백프레셔 같은 Reactive Streams의 세부 사항을 알 필요 없이 핸들러를 구현할 수 있는 추상 클래스가 없습니다. 그러나 PR의 TODO를 보면 이와 같은 역할을 하는 API가 곧 추가될 예정이라고 합니다. 이제 막 웹소켓 지원이 시작되었기 때문에 조금 기다리면 훨씬 간결한 코드로 핸들러를 작성할 수 있을 것이라 생각합니다.

클라이언트로 테스트하기

에코 서버를 테스트하기 위한 웹소켓 클라이언트는 Armeria로 작성하지 않고 크롬 익스텐션인 Simple WebSocket Client를 사용했습니다. WebSocketServiceTest 코드를 참고하면 Armeria를 이용해서 테스트할 수 있지만 상용 툴을 사용하여 빠르게 테스트하기 위해 익스텐션을 사용하였습니다. 다만 의도와 다르게 몇 가지 문제가 발생하였고 그걸 해결한 후 테스트하는 과정을 글에 담았습니다. 참고로 익스텐션을 그대로 사용하지는 않고 여러 테스트를 위해 개발자 도구에서 소스코드를 복사해 사용하였습니다.

우선 크롬 Simple WebSocket Client 익스텐션(이하 크롬 클라이언트)로 에코 서버를 테스트하려 했는데 연결부터 실패하였습니다. 연결을 시도했을 때 “undefined” 알림 메시지 외에 에러 메시지도 보이지 않았기 때문에 쉽게 그 원인을 짐작하기 어려웠습니다. 저는 문제의 원인을 파악하기 위해 되는 상황과 안되는 상황을 간단히 재현할 수 있는 curl 명령툴을 이용해 테스트를 시작했습니다.

curl로 연결 테스트하기

웹소켓 프로토콜에 대한 배경지식이 부족했기 때문에 간단히 HTTP GET 요청을 엔드포인트로 날리면 연결에 성공할 줄 알았으나 곧바로 실패했습니다. RFC 6455를 읽으면서 더 알아보니 필수적인 헤더가 필요하다는 것을 깨닫고 WebSocketServiceTest 코드를 참고하여 정리했습니다.

curl \
--header "Connection: Upgrade" \
--header "Upgrade: websocket" \
--header "Host: localhost:8080" \
--header "Origin: http://localhost:8080" \
--header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
--header "Sec-WebSocket-Version: 13" \
http://localhost:8080/chat
view raw curl.sh hosted with ❤ by GitHub

우선 웹소켓을 사용하기 위해서는 Connection, Upgrade 헤더를 통해 서버에게 알려야 하고 그 외에도 Host, Origin 헤더도 추가해야 합니다. Sec 관련 헤더는 오프닝 핸드셰이크 과정에 사용하는데 그 내용을 정리했지만 글의 주제에서 벗어나는 것 같아 생략했습니다. 관심이 있으시다면 RFC 6455 1.3 Opening Handshake를 참고하시면 도움이 될 것 같습니다.

다행히 서버를 실행한 후 위 명령어를 실행하면 정상적으로 접속되었습니다. 그 다음 크롬 클라이언트가 보낸 요청과 무엇이 달라서 문제가 생긴지 비교해보았습니다. 다음은 개발자 도구로 응답 헤더를 추출한 내용입니다. (순서는 보기 편하게 중요한 내용을 위쪽에 배치했습니다.)

GET ws://localhost:8080/chat HTTP/1.1
Connection: Upgrade
Upgrade: websocket
Host: localhost:8080
Origin: null
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: OwI3ES8zIMmfhPsqMj00mQ==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36
Accept-Encoding: gzip, deflate, br
Accept-Language: ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7

이 중 문제가 된 헤더는 Origin이었습니다. 크롬의 요청을 따라해 Origin 헤더에 null 값으로 curl 명령을 실행한 결과 not allowed origin: null 예외 메시지를 받을 수 있었습니다. 또한 Origin 헤더를 아예 비우면 missing the origin header 메시지를 받습니다. 크롬 클라이언트는 MDN WebSocket API를 사용하는데 아쉽게도 Origin 헤더를 변경하는 방법을 찾지 못해 서버 측 옵션을 수정해야만 했습니다.

Origin 전부 허용하기

연결 시 Origin을 확인하는 로직은 WebSocketService#checkOrigin 메소드에서 수행되고 있었습니다. 해당 코드에서 allowAnyOrigin 변수가 쓰이는 것을 보고 모든 Origin을 허용하는 옵션을 만드는 것이 가능하겠다는 생각이 들었습니다. 코드를 조금씩 따라 올라가면서 찾아본 결과 WebSocketServiceBuilder#allowedOrigins 메소드를 사용해 허용할 Origin을 화이트 리스트 방식으로 추가할 수 있다는 것을 알게 되었고, *를 추가하면 모든 Origin을 허용할 수 있다는 것도 알게 되었습니다. 따라서 이를 토대로 ServerBuilder 코드를 수정했습니다.

sb.http(port)
.decorator(LoggingService.newDecorator())
.service("/chat", WebSocketService.builder(webSocketServiceHandler).allowedOrigins("*").build())
.build();

모든 Origin을 허용한 후에는 크롬 클라이언트로 연결에 성공했고 메시지를 보낸 후 echo도 잘 동작했습니다.

(번외) 연결 유지하기

모든 테스트가 완료되었지만 일정 시간 요청을 보내지 않으면 자동으로 연결이 끊어지는 사소한 걸림돌이 있었습니다. 이 시간이 충분히 길면 괜찮지만 테스트하는 도중 잠시 다른 일을 하면 연결이 끊어져 은근한 불편함을 초래했습니다. WebSocketServiceWebSocketServiceBuilder에서는 따로 타임아웃 옵션을 찾아볼 수 없는데요. 다행히 서버 타임아웃 설정은 얼마 전 제가 직접 문서 작성에 기여했던 부분이여서 헤매지 않고 설정을 찾을 수 있었습니다. (https://armeria.dev/docs/server-timeouts)

sb.http(port)
.decorator(LoggingService.newDecorator())
.service("/chat", WebSocketService.builder(webSocketServiceHandler).allowedOrigins("*").build())
.requestTimeout(Duration.ofSeconds(100)) // 타임아웃 늘리기
.build();

타임아웃 설정은 서버 전체에 적용할 수도 있고 /chat 엔드포인트의 웹소켓 서비스에만 적용할 수도 있습니다. 간결한 결과를 보여드리기 위해서 전자를 택했는데 후자를 적용하고 싶으시다면 공식 문서의 ServiceRequestContext 사용 부분을 참고하시면 되겠습니다.

서버의 타임아웃 설정을 변경하는 방법 외에 취할 수 있는 선택지로 Ping Pong 주고받는 방법도 있습니다. (RFC 6455 5.5.2 Ping, 5.5.3 Pong 참고) Armeria의 WebSocketFrame에도 PING, PONG 타입이 있으므로 클라이언트의 Ping 요청에 Pong으로 응답하는 코드를 만들고, 크롬 클라이언트는 setInterval 메소드를 사용해 비동기로 일정 시간마다 Ping 요청을 보내면 연결을 유지할 수 있습니다.

정리

Armeria에서 새롭게 웹소켓 프로토콜을 지원한다는 소식을 듣고 바로 간단한 에코 서버를 만들어 보았습니다. 처음에는 막막했지만 직접 소스코드를 읽고 실습하는 과정에서 Armeria 뿐만 아니라 웹소켓 프로코콜에 대한 경험도 쌓을 수 있었습니다. 게다가 PR을 보면 웹소켓 서비스를 쉽게 사용할 수 있는 API 외에도 WebSocketClient, 익스텐션 등을 추후 지원할 예정으로 보입니다. 나중에 기회가 된다면 추가된 기능도 사용해보고 글로 남길 예정입니다.

최근 Armeria에 기여하기 위해 정보를 찾다보면 릴리즈 노트에서 알게 되는 것이 많아 놀라곤 합니다. 사실 그 전까지 제가 사용하는 툴의 릴리즈 노트를 눈여겨 본 적이 한 번도 없었습니다. 사실 지금도 Armeria보다 오래 전부터 알았던 Spring의 릴리즈 노트를 봐도 아는 것이 별로 없습니다. 그 차이만큼 Armeria에 관심을 갖고 알게 된 것이 생긴 것 같아 기쁘기도 마음이 복잡합니다. 😅 글을 읽으시는 분께서도 Armeria에 새롭게 추가된 기능을 확인하고 관심있었던 분야라면 직접 사용하고 공유한다면 많은 도움이 될 것 같습니다. 😄

관련문서