如何解决气流:从卡夫卡获取消息
我尝试使用python-kafka
软件包在Airflow中从Kafka接收消息。
仅使用Python脚本即可。但是在Airflow中,我有来自Kafka Consumer的消息。而且没有来自卡夫卡的消息。
[2020-09-07 17:51:08,046] {logging_mixin.py:112} INFO - [2020-09-07 17:51:08,046] {parser.py:166} DEBUG - Processing response MetadataResponse_v1
[2020-09-07 17:51:08,047] {logging_mixin.py:112} INFO - [2020-09-07 17:51:08,047] {conn.py:1071} DEBUG - <BrokerConnection node_id=2 host=10.1.25.112:9092 <connected> [IPv4 ('10.1.25.112',9092)]> Response 28 (55.747270584106445 ms): MetadataResponse_v1(brokers=[(node_id=2,host='10.1.25.112',port=9092,rack=None),(node_id=3,host='10.1.25.113',(node_id=1,host='10.1.25.111',rack=None)],controller_id=1,topics=[(error_code=0,topic='dev.tracking.nifi.rmsp.monthly.flow.downloading',is_internal=False,partitions=[(error_code=0,partition=0,leader=3,replicas=[3],isr=[3])])])
[2020-09-07 17:51:08,048] {logging_mixin.py:112} INFO - [2020-09-07 17:51:08,047] {cluster.py:325} DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 3,topics: 1,groups: 0)
[2020-09-07 17:51:08,048] {fetcher.py:296} DEBUG - Stale metadata was raised,and we now have an updated metadata. Rechecking partition existance
[2020-09-07 17:51:08,048] {fetcher.py:299} DEBUG - Removed partition TopicPartition(topic='dev.tracking.nifi.rmsp.monthly.flow.downloading',partition=(0,)) from offsets retrieval
[2020-09-07 17:51:08,049] {logging_mixin.py:112} INFO - [2020-09-07 17:51:08,048] {fetcher.py:247} DEBUG - Could not find offset for partition TopicPartition(topic='dev.tracking.nifi.rmsp.monthly.flow.downloading',)) since it is probably deleted
[2020-09-07 17:51:08,107] {logging_mixin.py:112} INFO - [2020-09-07 17:51:08,107] {group.py:453} DEBUG - Closing the KafkaConsumer.
呼叫时发生此错误
# Issue #1780
# Recheck partition existence after after a successful metadata refresh
if refresh_future.succeeded() and isinstance(future.exception,Errors.StaleMetadata):
log.debug("Stale metadata was raised,and we now have an updated metadata. Rechecking partition existance")
unknown_partition = future.exception.args[0] # TopicPartition from StaleMetadata
if self._client.cluster.leader_for_partition(unknown_partition) is None:
log.debug("Removed partition %s from offsets retrieval" % (unknown_partition,))
timestamps.pop(unknown_partition)
为什么我不能成为主题负责人?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。