apache kafka 란?

핵심개념들

카프카 , Producer, Consumer, Consumer Group, Topic, Partition, Segment, Broker, zookeeper, Quorum 알고리즘기반, __consumer_offsets, Broker replica, ISR, OSR, Controller, fetch의 흐름

카프카?

움직이는 데이터를 처리하는 플랫폼
카프카는 이벤트 스트리밍 플랫폼 - 흐르는 데이터를 처리하기 위한 플랫폼
LinkedIn 내에서 개발
기존의 Messaging Platform(예, MQ)로 이벤트 스트림이처리 불가능하여 이벤트 스트림 처리를 위해 개발

카프카는 이벤트 스트림을 안전하게 전송(Publish & Subscribe)
이벤트 스트림을 디스크에 저장(write to disk)
이벤트 스트림을 분석 및 처리(processing & ananlysis)

이벤트?

Event는 비즈니스에서 일어나는 모든 일(데이터)을 의미
Event Stream은 연속적인 많은 이벤트들의 흐름을 의미

  1. 이벤트 스트림을 안전하게 전송 / Publish & Subscribe
  2. 이벤트 스트림을 디스크에 안전하게 저장 / Write to Disk(중요)
  3. 실시간 이벤트 스트림을 분석 및 처리 / Processing & Ananlysis

Event(메시지/데이터)가사용되는모든곳에서사용
o Messaging System
o IOT디바이스로부터데이터수집
o Realtime Event Stream Processing (Fraud Detection, 이상감지등)
o 애플리케이션에서 발생하는 로그수집
o DB동기화(MSA 기반의분리된DB간동기화)
o 실시간ETL
o Spark, Flink, Storm, Hadoop 과같은빅데이터기술과같이사용

Producer, Consumer, Consumer Group

• Producer : 메시지를생산(Produce)해서 Kafka의Topic으로메시지를 보내는애플리케이션
• Consumer : Topic의메시지를가져와서소비(Consume)하는 애플리케이션
• Consumer Group : Topic의메시지를사용하기위해협력하는 Consumer들의집합
• 하나의Consumer는하나의Consumer Group에포함되며, Consumer Group내의Consumer들은협력하여Topic의메시지를 분산병렬처리함

Producer가Write하는LOG-END-OFFSET과Consumer Group의Consumer가Read하고
처리한후에Commit한CURRENT-OFFSET과의차이(Consumer Lag)가발생할수있음

Topic, Partition, Segment

Topic : Kafka안에서메시지가저장되는장소, 논리적인표현
Partition : Commit Log, 하나의Topic은 하나 이상의Partition으로구성 병렬처리(Throughput향상)를 위해서 다수의Partition 사용. 파티션은 브로커들에 분산되어 저장된다.
Segment : 메시지(데이터)가저장되는실제물리File Segment File이 지정된 크기보다크거나 지정된 기간보다 오래되면 새파일이 열리고 메시지는새파일에추가됨

Topic생성시Partition개수를지정 개수변경가능하나운영시에는변경권장하지않음
Topic내의Partition들은서로독립적임
Event(Message)의순서는하나의Partition내에서만보장
Partition은Segment File들로구성됨
Rolling 정책: log.segment.bytes(default 1 GB), log.roll.hours(default 168 hours)

Broker

Kafka Broker는Partition에대한Read및Write를관리하는소프트웨어

• Kafka Server라고부르기도함
• Topic내의Partition들을분산,유지및관리
• 각각의Broker들은ID로식별됨(단, ID는숫자)
• Topic의일부Partition들을포함
-> Topic데이터의일부분(Partition)을갖을뿐데이터전체를갖고있지않음
• Kafka Cluster : 여러개의Broker들로구성됨
• Client는특정Broker에연결하면전체클러스터에연결됨
• 최소3대이상의Broker를하나의Cluster로구성해야함
-> 4대이상을권장함

• Topic을구성하는Partition들은여러Broker 상에분산됨
• Topic생성시Kafka가자동으로Topic을구성하는전체 Partition들을모든Broker에게할당해주고분배해줌

모든 Kafka Broker는 Bootstrap(부트스트랩)서버라고부름
• 하나의Broker에만연결하면Cluster전체에연결됨
-> 하지만,특정Broker장애를대비하여,전체Broker List(IP, port)를파라미터로입력권장
• 각각의Broker는모든Broker, Topic, Partition에대해알고있음(Metadata)

zookeeper

Zookeeper는Broker를관리(Broker 들의목록/설정을관리)하는소프트웨어
• Zookeeper는변경사항에대해Kafka에알림
->Topic 생성/제거, Broker 추가/제거등
• Zookeeper 없이는Kafka가작동할수없음
->KIP1)-500 을통해서Zookeeper제거가진행중
->2022년에Zookeeper를제거한정식버전출시예정중
• Zookeeper는홀수의서버로작동하게설계되어있음 (최소3, 권장5)
• Zookeeper에는Leader(writes)가있고나머지 서버는Follower(reads)

KIP : Kafka Improvement Proposal

Zookeeper는 분산형 Configuration 정보유지, 분산동기화서비스를제공하고대용량분산
시스템을위한네이밍레지스트리를제공하는소프트웨어
Leader, Follower로 구성되며 Floower는 Leader와 동기화하여 브로커에 전달한다.

분산작업을제어하기위한Tree형태의데이터저장소
-> Zookeeper를사용하여멀티Kafka Broker들간의정보(변경사항포함)공유,동기화등을수행

zookeeper는 Quorum 알고리즘기반

Quorum(쿼럼)은“정족수”이며, 합의체가의사를진행시키거나의결을하는데필요한최소한도의 인원수를뜻함
분산코디네이션환경에서예상치못한장애가발생해도분산시스템의일관성을유지시키기 위해서사용
Ensemble이3대로구성되었다면Quorum은2,즉Zookeeper 1대가장애가발생하더라도정상동작
Ensemble이5대로구성되었다면Quorum은3,즉Zookeeper 2대가장애가발생하더라도정상동작

3대를쓰나 4대를 쓰나 1개의 장애밖에 못버틴다(홀수를 권장하는 이유)

• Broker는Partition에대한Read및Write를관리하는소프트웨어
• Broker는Topic내의Partition들을분산,유지및관리
• 최소3대이상의Broker를하나의Cluster로구성해야함
-> 4대이상을권장함
• Zookeeper는Broker를관리(Broker 들의목록/설정을관리)하는소프트웨어
• Zookeeper는홀수의서버로작동하게설계되어있음(최소3, 권장5)

Producer

Message == Record == Event == Data

레코드는 Header(Topic, Partition, Timestamp, etc), Key, Value로 구성된다.
Key와Value는Avro, JSON 등다양한형태가가능

Kafka는Record(데이터)를Byte Array로저장

흐름

  1. 프로듀서에서 토픽을 send()
  2. Serializer(String, avro, json ..등 다양함) 를 통해 byte array로 변환
  3. Partitioner을 통해 어느파티션으로 보낼지 정해지고
  4. Compress(사용유무 선택가능)
  5. RecordAccumulator으로 보내져서 배치 혹은 건별로 카프카로 전송이 된다.
  6. 성공하면 metadata를 리턴하고 실패하면 retry

개발자는 Serializer, Compress만 설정하고 send()메소드 정도만 호출하면 된다

Partitioner의역할
메시지를Topic의어떤Partition으로보낼지결정(Hash(Key) % Number of Partitions)

Key 가null일때처리
Kafka 2.4 이전 : DefaultPartitioner는 Round Robin정책으로동작. 여러 파티션에 하나씩 배정

Kafka 2.4이후: DefaultPartitioner는 Sticky정책으로동작. 하나의Batch가닫힐때까지 하나의partition에게record를 보내고랜덤으로Partition 선택.
배치처리가 더욱 효율적으로 이뤄짐

Counsumer

Consumer Offset : Consumer Group이읽은위치를표시

• Consumer가자동이나수동으로데이터를읽은위치를commit하여다시읽음을방지
• __consumer_offsets 라는Internal Topic에서Consumer Offset을저장하여관리

4개의Partition으로구성된Topic의데이터를사용하는Single Consumer가있는경우,
이Consumer는Topic의모든Partition에서모든Record를Consume함

하나의Consumer는각Partition에서의
Consumer Offset을별도로유지(기록) 하면서모든Partition에서Consume함

• 4개의파티션이있는Topic를consume하는4개의Consumer가하나의Consumer Group에있다면,각Consumer는정확히하나의Partition에서Record를consume함
• Partition은항상Consumer Group내의하나의Consumer에의해서만사용됨
• Consumer는주어진Topic에서0개이상의많은Partition을사용할수있음

Partition이2 개이상인경우모든메시지에대한전체순서보장불가능
Partition을1개로구성하면모든메시지에서전체순서보장가능‒처리량저하
대부분의경우, Key로구분할수있는메시지들의순서보장이필요한경우가많음

운영중에Partition 개수를변경하면어떻게될까?
해쉬알고리즘은 파티션 갯수로 나누기 때문에 결국 순서보장이 불가능해짐

Key Cardinality는Consumer Group의개별Consumer가 수행하는작업의양에영향
적절히 잘 키를 선택해서 누군 놀고 누군 쉬는것이 일어나지 않도록 해야함
• Key선택이잘못되면작업부하가 고르지않을수있음
• Key는Integer, String등과같은 단순한유형일필요가없음
• Key는Value와마찬가지로Avro, JSON 등여러필드가있는복잡한객체일수있음
• 따라서, Partition전체에 Record를고르게배포하는Key를 만드는것이중요

Consumer Rebalancing
Consumer Group의다른Consumer가 실패한Consumer를대신함

복제

Broker에장애가발생하면어떻게될까?
장애가발생한Broker내의 Partition들은모두사용할수없게되는문제발생

다른Broker에서장애가발생한Partition을대신해서Partition을새로만들면장애를해결?
기존메시지는버릴것인가?기존Offset 정보들을버릴것인가?

Partition을복제(Replication)하여다른Broker상에서복제물(Replicas)을만들어서장애를 미리대비함

Replicas - Leader Partition, Follower Partition로 구성됨
Replication Factor = 3이라 하면 Leader Partition 한개, Follower Partition 두개로 구성됨

Producer는Leader에만Write하고Consumer는Leader로부터만Read함(Apache Kafka 2.4부터 Follower 파티션에서 컨슈머가 Fetching(Read) 가능)
Follower는Broker장애시안정성을제공하기위해서만존재
Follower는Leader의Commit Log에서데이터를가져오기요청(Fetch Request)으로복제 - 팔로워가 리더한테 요청을 해서 가져가는 구조

Leader에장애가발생하면?
Kafka 클러스터는Follower중에서새로운Leader를선출 Clients(Producer/Consumer)는자동으로새Leader로전환

하나의Broker에만Partition의Leader들이몰려있다면?
특정Broker에만Client(Producer/Consumer)로인해부하집중(Hot Spot)

Hot Spot 방지
auto.leader.rebalance.enable : 기본값enable # 리더가 각 브로커 적절히 분배되도록 설정하는것.
leader.imbalance.check.interval.seconds : 기본값300 sec # 30 초마다 리더가 적절히 분배되었는지 확인
leader.imbalance.per.broker.percentage : 기본값10 # 다른 브로커보다 10퍼센트 이상 많이가져가면 불균형이라 판단

Rack Awareness
Rack 간분산하여Rack 장애를대비
동일한Rack 혹은Available Zone상의Broker들에동일한“rack name” 지정
복제본(Replica-Leader/Follower)은최대한Rack간에균형을유지하여Rack 장애대비
Topic 생성시또는Auto Data Balancer/Self Balancing Cluster 동작때만실행

In-Sync Replicas(ISR) : Leader 장애시Leader를선출하는데사용

In-Sync Replicas(ISR)는High Water Mark라고하는지점까지동일한Replicas (Leader와 Follower모두)의목록

Leader에장애가발생하면, ISR 중에서새Leader를선출

ISR : replica.lag.max.messages

replica.lag.max.messages=4 - 리더가 갖고있는 LOG-END-OFFSET과 4 미만으로 차이나는 팔로워들을 ISR로 지정한다 4개 이상 차이나는 팔로워들을 Out-of-Sync Follower(OSR) 이라 한다,

High Water Mark: 4미만(replica.lag.max.messages=4일 때)으로 차이나는 지점 중 하나로써 컨슈머가 거기까지 잘 복사해간것을 의미

replica.lag.max.messages로ISR판단시나타날수있는문제점

• 메시지유입량이갑자기늘어날경우(예,초당10 msg/sec),지연으로판단하고OSR(Outof-
Sync Replica)로상태를변경시킴
• 메시지가항상일정한비율(초당유입되는메시지, 3 msg/sec 이하)로Kafka로들어올
때, replica.lag.max.messages=5로하면5개이상으로지연되는경우가없으므로 ISR들이정상적으로동작
• 실제Follower는정상적으로동작하고단지잠깐지연만발생했을뿐인데, replica.lag.max.messages옵션을이용하면OSR로판단하게되는문제가발생(운영중에 불필요한error 발생및그로인한불필요한retry 유발)

ISR : replica.lag.time.max.ms으로판단해야함

• Follower가Leader로Fetch 요청을보내는Interval을체크
• 예) replica.lag.time.max.ms = 10000이라면Follower가Leader로Fetch 요청을
10000 ms내에만요청하면정상으로판단
• Confluent 에서는replica.lag.time.max.ms옵션만제공(복잡성제거)

ISR은Leader(브로커)가관리
①Follower가너무느리면Leader는ISR에서Follower를제거하고ZooKeeper에ISR을유지
②Controller는Partition Metadata에대한변경사항에대해서Zookeeper로부터수신
(replica.lag.time.max.ms이내에 Follower가fetch하지않으면 ISR에서제거함)

Controller ??

• Kafka Cluster내의Broker중하나가Controller가됨
• Controller는ZooKeeper를통해Broker Liveness를모니터링 , ISR 정보를 받아들임
• Controller는Leader와Replica 정보를Cluster내의다른Broker들에게전달
• Controller는ZooKeeper에Replicas정보의복사본을유지한다음더빠른액세스를
위해클러스터의모든Broker들에게동일한정보를캐시함
• Controller가Leader 장애시Leader Election을수행
• Controller가장애가나면다른Active Broker들중에서재선출됨(주키퍼가 함)

Last Committed Offset, Current Position, High Water Mark, Log End Offset??

• Last Committed Offset(Current Offset) : Consumer가최종Commit한Offset
• Current Position : Consumer가읽어간위치(처리중, Commit 전)
• High Water Mark(Committed / Fully-Replicated Committed) : ISR(Leader-Follower)간에복제된Offset
• Log End Offset : Producer가메시지를보내서저장된,로그의맨끝Offset

Committed의의미?

• ISR 목록의모든Replicas가메시지를성공적으로가져오면“Committed”. OSR에 있는건 신경쓰지 않음
• Consumer는Committed메시지만읽을수있음
• Leader는메시지를Commit할시기를결정
• Committed메시지는모든Follower에서 동일한Offset을갖도록보장(OSR이라 한들 언젠간 따라잡을테니)
• 즉, 어떤Replica가Leader인지에관계없이 (장애발생이라도) 모든Consumer는해당 Offset에서같은데이터를볼수있음
• Broker가다시시작할때Committed 메시지목록을유지하도록하기위해, Broker의모든Partition에대한마지막 Committed Offset은replicationoffset-checkpoint라는파일에기록됨

High Water Mark
• 가장최근에Committed메시지의Offset추적
• replication-offset-checkpoint 파일에체크포인트를기록

Leader Epoch
• 새Leader가선출된시점을Offset으로표시
• Broker복구중에메시지를체크포인트로자른다음현재Leader를따르기위해사용됨
• Controller가새Leader를선택하면Leader Epoch를업데이트하고해당정보를ISR 목록의모든구성원에게보냄
• leader-epoch-checkpoint 파일에체크포인트를기록
즉 새로운 리더에 대한 기록이라고 보면 될듯

fetch의 흐름

팔로워들은 패쳐쓰레드가 리더한테 페치(데이터를 가져와서)를 해서 자신의 파티션에 write를 함
팔로워의 패쳐 쓰레드가 fetch를 했는데 null 이 오는경우(더이상 데이터를 가져갈게 없는 최신의 상태)
Leader는 자신의 High Water Mark 이동한다.
그 이후 또 팔로워가 fetch를 리더한테 하면 Leader는High Water Mark를 던져주고 팔로워들 또한 High Water Mark를 갱신한다.

Share