Java, Apache kafka에서 항목의 메시지 수를 가져오는 방법
저는 apache kafka를 메시징으로 사용하고 있습니다.자바에서 생산자와 소비자를 구현했습니다.토픽의 메시지 수를 어떻게 알 수 있을까요?
Java는 아니지만 유용할 수 있습니다.
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list <broker>:<port> \
--topic <topic-name> \
| awk -F ":" '{sum += $3} END {print sum}'
소비자의 관점에서 생각할 수 있는 유일한 방법은 메시지를 실제로 소비하고 세는 것입니다.
Kafka 브로커는 시작 후 받은 메시지 수에 대한 JMX 카운터를 표시하지만 이미 삭제된 메시지 수를 알 수 없습니다.
대부분의 일반적인 시나리오에서 Kafka 메시지는 무한 스트림으로 간주되며, 현재 디스크에 보관되어 있는 메시지 수에 대한 개별 값을 얻는 것은 관련이 없습니다.게다가, 모두 토픽의 메시지의 서브셋을 가지는 브로커 클러스터를 취급하면, 상황은 더욱 복잡해집니다.
★★ConsumerOffsetChecker
되지 않게 되었습니다.이토픽의 할 수 .다음 명령을 사용하여 토픽의 모든 메시지를 체크할 수 있습니다.
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--group my-group \
--bootstrap-server localhost:9092 \
--describe
서 ★★★★★LAG
파티션 내의 수 입니다.토픽 파티션의 수: "는 "메시지 수"입니다.
또한 카프카캣을 사용해 볼 수도 있습니다.이것은 토픽에서 메시지를 읽고 파티션을 분할하여 stdout으로 인쇄하는 데 도움이 되는 오픈 소스 프로젝트입니다.다음은 마지막 10개의 메시지를 읽은 샘플입니다.sample-kafka-topic
픽,, 료::
kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
저는 사실 이것을 POC 벤치마킹에 사용하고 있습니다.Consumer Offset Checker를 사용할 항목.아래와 같이 bash 스크립트를 사용하여 실행할 수 있습니다.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
아래는 결과입니다.빨간색 박스에서 볼 수 있듯이 999는 현재 토픽에 있는 메시지 수입니다.
업데이트: ConsumerOffsetChecker는 0.10.0 이후 더 이상 사용되지 않습니다. ConsumerGroupCommand를 사용할 수 있습니다.
커스텀 파티셔너를 테스트하는 경우 등, 각 파티션의 메시지 수를 파악하는 것이 중요한 경우가 있습니다.카프카 0.10.2.1-2.토픽을 하면, 「 」는 「 」를 참조해당 토픽은 다음과 같습니다.kt
다음 - and 、 음음음음 。
$ kafka-run-class kafka.tools.GetOffsetShell \
--broker-list host01:9092,host02:9092,host02:9092 --topic kt
그러면 다음 3개의 파티션에서 메시지 수를 나타내는 샘플 출력이 출력됩니다.
kt:2:6138
kt:1:6123
kt:0:6137
줄 수는 주제에 대한 파티션 수에 따라 많거나 적을 수 있습니다.
https://prestodb.io/docs/current/connector/kafka-tutorial.html 를 사용합니다.
Facebook이 제공하는 슈퍼 SQL 엔진으로 여러 데이터 소스(Cassandra, Kafka, JMX, Redis...)에서 연결합니다.
PrestoDB는 옵션 워커를 포함한 서버로서 동작하고 있습니다(추가 워커가 없는 스탠드 아론 모드가 있습니다).그 후 작은 실행 가능 파일 JAR(Presto CLI)를 사용하여 쿼리를 작성합니다.
Presto 서버를 올바르게 설정하면 기존 SQL을 사용할 수 있습니다.
SELECT count(*) FROM TOPIC_NAME;
Apache Kafka 명령: 토픽의 모든 파티션에서 처리되지 않은 메시지를 가져옵니다.
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group
인쇄:
Group Topic Pid Offset logSize Lag Owner
test_group test 0 11051 11053 2 none
test_group test 1 10810 10812 2 none
test_group test 2 11027 11028 1 none
열 6은 처리되지 않은 메시지입니다.다음과 같이 합산합니다.
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group 2>/dev/null | awk 'NR>1 {sum += $6}
END {print sum}'
awk는 행을 읽고 헤더 행을 건너뛰고 6번째 열을 합산하여 마지막에 합계를 인쇄합니다.
인쇄물
5
Kafka 2.11-1.0.0의 Java 클라이언트를 사용하여 다음 작업을 수행할 수 있습니다.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// after each message, query the number of messages of the topic
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for(TopicPartition partition : offsets.keySet()) {
System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
}
}
}
출력은 다음과 같습니다.
offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
다음을 실행합니다(전제를 전제로).kafka-console-consumer.sh
패스상에 있습니다).
kafka-console-consumer.sh --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
토픽에 대해 저장된 모든 메시지를 가져오려면 각 파티션의 스트림 시작과 끝에 있는 사용자를 검색하여 결과를 합산하면 됩니다.
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
Kotlin에 있는 Kafka Consumer에서 다음과 같은 질문을 받았습니다.
val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
.map {
it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
}.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
.first()
매우 개략적인 코드입니다.지금 막 작업을 시작했는데 기본적으로는 엔딩 오프셋에서 토픽의 시작 오프셋을 빼면 토픽의 현재 메시지 수가 됩니다.
항목에서 오래된 메시지가 삭제될 수 있는 다른 구성(정리 정책, 보존-ms 등) 때문에 끝 오프셋에만 의존할 수는 없습니다.오프셋은 앞으로만 "이동"하므로 끝 오프셋에 더 가깝게 앞으로 이동하는 간격띄우기(또는 토픽에 메시지가 없는 경우 현재 동일한 값으로 이동)입니다.
기본적으로 끝 오프셋은 해당 항목을 통과한 전체 메시지 수를 나타내며, 두 항목 간의 차이는 현재 항목에 포함된 메시지 수를 나타냅니다.
Kafka Manager의 최신 버전에는 Summed Recent Offsets라는 제목의 컬럼이 있습니다.
Kafka 문서에서 발췌한 내용
0.9.0.0의 폐지
kafka-consumer-offset-checker.sh (kafka.tools)ConsumerOffsetChecker)는 더 이상 사용되지 않습니다.앞으로 kafka-consumer-groups.sh(kafka.admin)을 사용해 주세요.Consumer Group Command)를 참조하십시오.
서버와 클라이언트 양쪽에서 SSL을 유효하게 한 Kafka 브로커를 실행하고 있습니다.아래 명령어 사용
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
여기서 /tmp/ssl_config는 다음과 같습니다.
security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
서버의 JMX 인터페이스에 액세스 할 수 있는 경우, 시작 오프셋과 종료 오프셋은 다음과 같습니다.
kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
)TOPICNAME
&PARTITIONNUMBER
특정 파티션의 각 복제본을 확인해야 합니다.그렇지 않으면 어떤 브로커가 특정 파티션의 리더인지 알아내야 합니다(이것은 시간이 지남에 따라 변경될 수 있습니다).
또는 Kafka Consumer 메서드를 사용할 수 있습니다.beginningOffsets
★★★★★★★★★★★★★★★★★」endOffsets
.
소비자 그룹의 모든 소비자(또는 다른 소비자 그룹)에 대한 결과를 계산해야 하는 경우, 또 다른 옵션은 관리 클라이언트를 사용하여 토픽/파티션 오프셋(Kotlin 코드 예시)에서 소비자 그룹 오프셋을 빼는 것입니다.
val topicName = "someTopic"
val groupId = "theGroupId"
val admin = Admin.create(kafkaProps.buildAdminProperties()) // Spring KafkaProperties
val parts = admin.describeTopics(listOf(topicName)).values()[topicName]!!.get().partitions()
val topicPartitionOffsets = admin.listOffsets(parts.associate { TopicPartition(topicName, it.partition()) to OffsetSpec.latest() }).all().get()
val consumerGroupOffsets = admin.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get()
val highWaterMark = topicPartitionOffsets.map { it.value.offset() }.sum()
val consumerPos = consumerGroupOffsets.map { it.value.offset() }.sum()
val unProcessedMessages = highWaterMark - consumerPos
또, 통상의(admin이 아닌) 클라이언트만을 사용하는 LeYAUable 의 샘플 코드의 동작 버전을 다음에 나타냅니다.
val partitions = consumer.partitionsFor("topicName")
.map { TopicPartition(it.topic(), it.partition()) }
val highWaterMark = consumer.endOffsets(partitions).values.sum()
val consumerPosition = consumer.beginningOffsets(partitions).values.sum()
val msgCount = highWaterMark - consumerPosition
이것은 특정 소비자를 위한 오프셋만 제공합니다!일반적인 경고는 주제가 압축될 때 부정확하다는 것입니다.
저도 먹어본 적은 없지만 말이 되는 것 같아요.
이 경우에도 하실 수 있습니다.kafka.tools.ConsumerOffsetChecker
(소스).
API를 입니다./topic/topicName
"Accept"
/ 값:"application/json"
JSON " " " " " " " 헤더 " " " " " " " 。
여기에 기재되어 있습니다.
언급URL : https://stackoverflow.com/questions/28579948/java-how-to-get-number-of-messages-in-a-topic-in-apache-kafka
'programing' 카테고리의 다른 글
오류: 테이블 " "에 대해 사용자 "@"에 대해 명령 거부됨 (0) | 2022.10.23 |
---|---|
플라스크에 http 헤더를 넣는 방법 (0) | 2022.10.23 |
MySQL 결과 주문 (0) | 2022.10.23 |
태그를 생략하고 태그의 내용을 렌더링하는 사용자 지정 Vue 지시문을 사용하시겠습니까? (0) | 2022.10.23 |
JavaScript에 RegExp.escape 함수가 있나요? (0) | 2022.10.23 |