요구사항
이번 게시물의 배경이 되는 Context를 설명하도록 하겠다.
Kinesis로 모바일 유저들에 위치/속도 등 여러가지 데이터가 실시간으로 1명당 10초에 1번 꼴로 수집되는 상황이다. 약 10000명 이상의 유저가 있으며 스트림 데이터로서 관리하여야 한다.
Kinesis + Lambda를 통해서 비동기적으로 데이터가 들어올 때마다 API Gateway를 거쳐서 websocket으로 전송되던 구조에서, Spring 백엔드 서버로 Kinesis에 수집되는 데이터를 비동기적으로 수집하고 이를 또다시 비동기적으로 구독한 Client에게 실시간으로 데이터를 전송하여야 하는 상황이다.
도입
이번 게시물은 Kotlin언어 + Spring 프레임워크에서 Amazon Kinesis에서 코틀린의 코루틴과 EFO(Enhanced Fan-Out)를 활용하여, 비동기적으로 Record 수신을 인지하고 콘솔에 표시하는 방법에 대해서 다뤄볼 것이다.
https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
Enhanced Fan-Out
KCL을 참고한다면 비교적 얼마전에 생긴 Enhanced Fan-Out Consumer를 확인할 수 있다. 이는 기존 KCL보다 더 유용한 기능들을 제공하고 있고 하나의 스트림당 최대 20명개를 둘 수 있다. 또한 특정 샤드를 선택하여 구독할 수 있는 장점이 있다. 이번 Context에서 가장 필요로 하는 "비동기 처리" 또한 EFO를 통해 해결할 수 있다.
폴링(Polling) & 푸시 방식(Pushing)에 대해서 먼저 짚고 넘어가자.
- 폴링(Polling): 전통적인 Kinesis Data Streams 사용 방식에서는 소비자가 주기적으로 스트림을 확인(poll)하여 새로운 데이터가 있는지 검사하고, 데이터가 있으면 이를 가져오는 방식을 사용한다. 이러한 방식은 지속적인 네트워크 요청이 필요하며, 실시간성과 효율성 측면에서 제한이 있을 수 있다.
- 푸시 방식(Enhanced Fan-Out): 반면, Enhanced Fan-Out 기능을 사용하면, Kinesis Data Streams는 새로운 레코드가 스트림에 추가될 때 자동으로 이를 비동기 클라이언트에 푸시한다. 이는 HTTP/2를 기반으로 하는 더 효율적인 데이터 전송 메커니즘을 사용한다. 따라서 소비자는 더 이상 데이터가 있는지 주기적으로 확인하기 위해 스트림을 폴링할 필요가 없게 되며, 데이터는 거의 실시간으로 소비자에게 전송된다.
이점
- 낮은 지연시간: Enhanced Fan-Out을 사용함으로써 데이터 스트림으로부터 데이터를 훨씬 빠르게 받을 수 있으므로, 애플리케이션의 반응 속도와 처리량이 향상된다.
- 효율적인 리소스 사용: 소비자가 데이터의 유무를 주기적으로 검사하는 폴링 방식 대신, 데이터가 자동으로 푸시되기 때문에 리소스 사용이 더 효율적이며, 비용 절감에도 도움이 될 수 있다.
- 개선된 처리량: 각 소비자는 고유한 전용 대역폭을 통해 데이터를 받기 때문에, 스트림의 데이터 처리량이 크게 향상된다.
특히나 10초에 1번씩 10000명 이상의 위치가 조회되는 상황이기 때문에 웬만하면 폴링을 지양해야한다.
의존성 설정
의존성은 2.x버전인 KCL을 활용하였다. 참고로 2.x버전 이하는 AWS에서 권장하지 않고 있다. *가끔 Chat gpt/Copliot한테 물어보면 com.~~ 종속성을 활용한 코드를 짜주는데 이건 1.x버전이니 주의하도록 하자.
implementation("software.amazon.kinesis:amazon-kinesis-client:2.5.6")
Kinesis Client Configure
@Configuration
class AmazonKinesisConfig {
@Value("\${aws_access_key_id}")
private lateinit var accessKey: String
@Value("\${aws_secret_access_key}")
private lateinit var secretKey: String
@Bean
fun amazonKinesis(): KinesisClient = KinesisClient.builder()
.region(Region.AP_NORTHEAST_2)
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
accessKey, secretKey
)
)
)
.build()
}
먼저 KinesisClient를 스프링 빈으로 등록하여 관리하도록 한다. 이때 Credential에 대한 정보를 넣어준다. AccessKey와 SecretKey는 콘솔 홈에서 보안 자격 증명에서 발급받아 사용할 수 있다. (계정의 주인이 아닐경우 키 생성에 대한 권한을 관리자에게 요청하거나 관리자에게 키 값을 요청해야한다.)
해당 AccessToken을 발급받아 등록하면, Kinesis Async Client를 Bean으로 성공적으로 사용할 수 있게 된다.
Coroutine Flow 생명 주기
먼저 Coroutine Flow는 코루틴 상에서 리액티브 프로그래밍을 지원하기 위해 만들어진 구현체이다. 이번 Context에는 2개의 Flow가 필요하다. Kinesis 데이터 스트림을 다루기 위해서 필연적으로 사용하게 된다. 또한, gRPC 서버 스트리밍 통신에서는 서버에서 Stream 데이터를 Client에게 보내게 되므로 여기서도 Flow가 필요하다.
정리하자면,
1. Kiensis Data Stream을 처리하기 위해서.
2. gRPC 서버 스트리밍 통신을 위해서.
사용된다.
Kinesis Data Stream을 처리하는 경우는, 비동기적으로 데이터가 들어왔다는 것을 EFO가 감지하고 백엔드 서버 내부에 SharedFlow에 Emit(데이터를 발행)해야한다. 굳이 ShareFlow를 써야하는 이유는, 이 데이터를 내부에 발행하고 또다시 이를 다른 Flow를 통해 실시간으로 Client에게 전송하기 위함이다.
Kinesis Data Stream을 처리하는 Flow는 Spring Application이 시작되면 바로 생성되어야하는(Application Scope) 플로우이다. 서버가 켜져있는 동안 계속 비동기적으로 Kinesis에 들어오는 데이터들을 서버내부에 Emit해야하기 때문이다.
Shared Flow 생성
private val sharedFlow = MutableSharedFlow<ExampleData>(
replay = 0,
extraBufferCapacity = 1000,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
위 코드는 서버 내의 MutableSharedFlow다.
*SharedFlow: 다수의 코루틴 간에 데이터를 공유하며, 발행-구독 패턴을 따르는 데이터 스트림을 생성하는데 사용
먼저 아래와 같이 서버가 실행됐을 때, Shard에 구독하는 동작을 수행한다.
override fun onApplicationEvent(event: ContextRefreshedEvent) {
val applicationScope = CoroutineScope(Dispatchers.IO)
logInfo("onApplicationEvent")
applicationScope.launch {
try {
subscribeToShard()
} catch (e: Exception) {
logInfo("Error during subscription: ${e.message}")
}
}
}
그리고 샤드에 구독하는 코드는 아래와 같다.
EFO를 등록하게 되는데 저기 있는 consumerARN이 곧 Kinesis Stream ARN이 아닌 EFO ARN이다.
fun subscribeToShard() {
val request = SubscribeToShardRequest.builder()
.consumerARN(consumerARN)
.shardId(shardId)
.startingPosition { it.type(ShardIteratorType.LATEST) }
.build()
while (true) {
callSubscribeToShardWithVisitor(kinesisClient, request).join()
}
}
1. 키네시스 데이터를 MutableSharedFlow에 비동기적으로 Emit한다.
해당 코드를 통해서 이벤트를 수신하기 시작했다면, 이제는 비동기적으로 수집된 이벤트에서 Record
를 처리해야한다. 따라서 record
가 있을 때 비동기적으로 이를 String
으로 일차적으로 빼내고, SharedStream
에 Emit
해야한다.
suspend fun produceToMutableSharedFlow(record: Record) {
val dataString = String(record.data().asByteArray(), Charsets.UTF_8)
logInfo("Received data: $dataString")
try {
val exampleData = objectMapper.readValue<@Valid ExampleData>(dataString)
sharedFlow.emit(exampleData)
} catch (e: Exception) {
logInfo("Failed to parse data: $dataString")
}
}
suspend fun을 통해서 비동기적으로 레코드를 objectMapper
을 통해 객체로 변환하고, 이를 sharedFlow
에 emit
한다.
2. Client 요청을 받은 데이터를 sharedflow에서 filter처리해서 가져온다.
다음으로 sharedFlow를 구독하여 shardFlow 내부에 있는 Client가 필요로 하는 정보들을 비동기적으로 필터링해서 가져와야한다.
따라서 먼저 필터링 해주는 코드를 작성한다.
fun getExampleDataById(id: Long) =
sharedFlow.filter { it.id == id.toString() }
아래의 코드의 결과 ExampleData로 이루어진 Flow를 가져오게 되고, 해당 플로우를 collect한 뒤에 newBuilder()를 활용하여 gRPC 통신 proto 규격에 맞게 반환하고 emit()을 통해서 클라이언트에게 발행하면 된다.
정리
위 두 가지 과정을 거친다면 비동기적으로 데이터를 수집함과 동시에 gRPC 요청에 따라 클라이언트가 필요로 하는 데이터를 비동기적으로 수집된 MutableSharedFlow에서 다시 비동기적으로 수집하여 클라이언트에게 비동기적으로 실시간으로 데이터를 전송할 수 있게 되는 것이다.
이번 게시물을 작성할 때까지 Coroutine/Flow(hot&cold flow)/gRPC Server Streaming 등 정말 이해할 것이 많았고 집약적으로 게시물을 작성했는데, 언젠가 누군가에게 도움이 되었으면 좋겠다. 또한 주변 선배님들의 도움을 많이 받았다!! 내가 도움을 받은 것처럼 다른 분들도 이 게시물을 보고 도움이 되었으면 좋겠다. :)
참고 자료
https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html
https://velog.io/@inqbator/AndroidFlow
'Backend' 카테고리의 다른 글
Soft Delete 테스트: @SQLDelete와 @Where의 효과적인 검증 방법 (0) | 2024.11.30 |
---|---|
MySQL vs PostgreSQL데이터베이스의 성능 및 확장성 비교 (4) | 2024.10.31 |
비동기 처리를 지원하는 모델(스레드 기반/이벤트 루프 기반) (0) | 2024.06.23 |
Presigned Url 으로 S3에 이미지 업로드하기 (Kotlin) (0) | 2024.04.19 |
스트리밍 데이터(Data Stream) (0) | 2024.03.18 |