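How do I add headers to a Kafka message?
I need additional headers for special requirements such as logging, and I'm a bit confused about how to add them. I believe they should be added at the point where I send the message directly via the send() method. How can I set custom headers on my Kafka messages in a Spring app when I send like this: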
final var streamMessage = StreamMessage
        .builder()
        .payload(Event.builder().eventId(eventId).result(result).build())
        .build();
ListenableFuture<SendResult<String, StreamMessage<?>>> future = kafkaTemplate.send(TOPIC, streamMessage);
My Kafka configuration is as follows:
@Configuration
public class KafkaConfiguration {

    private final Map<String, Object> producerProps;
    private final Map<String, Object> consumerProps;

    @Autowired
    public KafkaConfiguration(@Value("${kafka.bootstrap.servers}") String bootstrapServers) {
        this.producerProps = producerProps(bootstrapServers);
        this.consumerProps = consumerProps(bootstrapServers);
    }

    private Map<String, Object> producerProps(String bootstrapServers) {
        final Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return props;
    }

    private Map<String, Object> consumerProps(String bootstrapServers) {
        final Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return props;
    }

    @Bean
    public ConsumerFactory<String, StreamMessage<?>> consumerFactory() {
        JsonDeserializer<StreamMessage<?>> deserializer = new JsonDeserializer<>(StreamMessage.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);
        return new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), deserializer);
    }

    @Bean
    public ProducerFactory<String, StreamMessage<?>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public KafkaTemplate<String, StreamMessage<?>> kafkaProducerTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StreamMessage<?>> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, StreamMessage<?>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
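Solution
The latest answer by Pim (Adding custom header using Spring Kafka) worked for me:
I was looking for an answer when I stumbled upon this question. However, I'm using the ProducerRecord<?,?> class rather than Message<?>, so the header mapper doesn't seem relevant.
Here is how I add a custom header: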
var record = new ProducerRecord<String,String>(topicName,"Hello World");
record.headers().add("foo","bar".getBytes());
kafkaTemplate.send(record);
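Kafka header values are raw byte arrays, so it helps to make the encoding explicit on the producer side. The following is only a minimal sketch, assuming UTF-8 text and big-endian numbers are what the consumers expect; `HeaderValues` and its methods are hypothetical helpers introduced for illustration, not part of Kafka or Spring.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Kafka or Spring): converts common value
// types into the byte[] form that a Kafka header value requires.
final class HeaderValues {
    private HeaderValues() {}

    // Encode text with an explicit charset rather than relying on the JVM default.
    static byte[] utf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Encode a long (for example a timestamp) as 8 big-endian bytes.
    static byte[] ofLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
}
```

With such a helper, the header could be added as record.headers().add("foo", HeaderValues.utf8("bar")), and the consumer would decode it with the same charset.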
Now, to read the headers (before the records are consumed), I added a custom interceptor.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    // Called before the records are handed to the listener; returns them unchanged.
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        Set<TopicPartition> partitions = records.partitions();
        partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));
        return records;
    }

    private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
        records.forEach(record -> {
            // Collect every header with the key "MyHeader"; use the key the producer set (e.g. "foo").
            var myHeaders = new ArrayList<Header>();
            record.headers().headers("MyHeader").forEach(myHeaders::add);
            log.info("My Headers: {}", myHeaders);
            // Do with header as you see fit
        });
    }

    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
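Since a header value arrives as a byte array, logging it directly tends to be hard to read. The sketch below shows one way to turn a value back into text, assuming the producer encoded it as UTF-8; `HeaderText` is a hypothetical helper, not a Kafka or Spring class.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: renders raw header bytes as text for log output.
final class HeaderText {
    private HeaderText() {}

    // Decode a header value as UTF-8; a missing (null) value stays null.
    static String decode(byte[] value) {
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    // Build a "key=value" string suitable for a log line.
    static String format(String key, byte[] value) {
        return key + "=" + decode(value);
    }
}
```

Inside the interceptor this could be used as HeaderText.format(header.key(), header.value()) for each Header, instead of logging the Header objects themselves.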
The last step is to register this interceptor with the Kafka consumer container using the following (Spring Boot) configuration:
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class MessagingConfiguration {

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }
}