如何解决通过kafka mqtt-proxy的Protobuf消息
我正在将protobuf消息发送到Kafka MQTT代理,但是我无法获得Kafka解析的消息值。我已经将原型消息注册到Schema注册表中,可以看到传入的消息,但是值显示为乱码。
以下行描述了我当前的架构
移动-> Kafka MQTT代理-> Kafka Broker->主题
还有this is the output I see。我不知道是否缺少配置,但是找不到有关它的任何信息。
原始消息
syntax = "proto2";
message Location {
optional double latitude = 1;
optional double longitude = 2;
}
用于将原型消息注册到架构注册表的Maven pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.confluent</groupId>
<artifactId>rest-utils-parent</artifactId>
<version>5.5.1</version>
</parent>
<artifactId>register-schema</artifactId>
<properties>
<confluent.version>5.5.0</confluent.version>
</properties>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-maven-plugin</artifactId>
<version>5.5.0</version>
<configuration>
<schemaRegistryUrls>
<param>http://localhost:8081</param>
</schemaRegistryUrls>
<subjects>
<location-value>location.proto</location-value>
</subjects>
<schemaTypes>
<location-value>PROTOBUF</location-value>
</schemaTypes>
</configuration>
<goals>
<goal>register</goal>
</goals>
</plugin>
</plugins>
</build>
</project>
用于发布消息的Flutter代码段
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
final builder = MqttClientPayloadBuilder();
final loc = locationPb.Location();
loc.latitude = location.latitude;
loc.longitude = location.longitude;
final buf = new Uint8Buffer();
buf.addAll(loc.writeToBuffer());
builder.addBuffer(buf);
this.client.publishMessage(
"android/location",MqttQos.exactlyOnce,builder.payload);
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。