如何在客户端使用Java读取gRPC中的元数据

我正在使用 Java和Protoc 3.0编译器,我的proto文件在下面提到.
https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
syntax = "proto3";

package Telemetry;

// Interface exported by Agent
service OpenConfigTelemetry {
    // Request an inline subscription for data at the specified path.
    // The device should send telemetry data back on the same
    // connection as the subscription request.
    rpc telemetrySubscribe(SubscriptionRequest)                     returns (stream OpenConfigData) {}

    // Terminates and removes an exisiting telemetry subscription
    rpc cancelTelemetrySubscription(CancelSubscriptionRequest)      returns (CancelSubscriptionReply) {}

    // Get the list of current telemetry subscriptions from the
    // target. This command returns a list of existing subscriptions
    // not including those that are established via configuration.
    rpc getTelemetrySubscriptions(GetSubscriptionsRequest)          returns (GetSubscriptionsReply) {}

    // Get Telemetry Agent Operational States
    rpc getTelemetryOperationalState(GetOperationalStateRequest)    returns (GetOperationalStateReply) {}

    // Return the set of data encodings supported by the device for
    // telemetry data
    rpc getDataEncodings(DataEncodingRequest)                       returns (DataEncodingReply) {}
}

// Message sent for a telemetry subscription request
message SubscriptionRequest {
    // Data associated with a telemetry subscription
    SubscriptionInput input                                 = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;

    // The below configuration is not defined in Openconfig RPC.
    // It is a proposed extension to configure additional
    // subscription request features.
    SubscriptionAdditionalConfig additional_config          = 3;
}

// Data associated with a telemetry subscription
message SubscriptionInput {
    // List of optional collector endpoints to send data for
    // this subscription.
    // If no collector destinations are specified,the collector
    // destination is assumed to be the requester on the rpc channel.
    repeated Collector  collector_list                      = 1;
}

// Collector endpoints to send data specified as an ip+port combination.
message Collector {
    // IP address of collector endpoint
    string address                                          = 1;

    // Transport protocol port number for the collector destination.
    uint32 port                                             = 2;
}

// Data model path
message Path {
    // Data model path of interest
    // Path specification for elements of OpenConfig data models
    string path                                             = 1;

    // Regular expression to be used in filtering state leaves
    string filter                                           = 2;

    // If this is set to true,the target device will only send
    // updates to the collector upon a change in data value
    bool suppress_unchanged                                 = 3;

    // Maximum time in ms the target device may go without sending
    // a message to the collector. If this time expires with
    // suppress-unchanged set,the target device must send an update
    // message regardless if the data values have changed.
    uint32 max_silent_interval                              = 4;

    // Time in ms between collection and transmission of the
    // specified data to the collector platform. The target device
    // will sample the corresponding data (e.g,. a counter) and
    // immediately send to the collector destination.
    //
    // If sample-frequency is set to 0,then the network device
    // must emit an update upon every datum change.
    uint32 sample_frequency                                 = 5;
}

// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
    // limit the number of records sent in the stream
    int32 limit_records                                     = 1;

    // limit the time the stream remains open
    int32 limit_time_seconds                                = 2;
}

// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
//    subscription request.

// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
    // Response message to a telemetry subscription creation or
    // get request.
    SubscriptionResponse response                           = 1;

    // List of data models paths and filters
    // which are used in a telemetry operation.
    repeated Path path_list                                 = 2;
}

// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
    // Unique id for the subscription on the device. This is
    // generated by the device and returned in a subscription
    // request or when listing existing subscriptions
    uint32 subscription_id = 1;
}

// 2. Telemetry data send back on the same connection as the
//    subscription request.
message OpenConfigData {
    // router name:export IP address
    string system_id                                        = 1;

    // line card / RE (slot number)
    uint32 component_id                                     = 2;

    // PFE (if applicable)
    uint32 sub_component_id                                 = 3;

    // Path specification for elements of OpenConfig data models
    string path                                             = 4;

    // Sequence number,monotonically increasing for each
    // system_id,component_id,sub_component_id + path.
    uint64 sequence_number                                  = 5;

    // timestamp (milliseconds since epoch)
    uint64 timestamp                                        = 6;

    // List of key-value pairs
    repeated KeyValue kv                                    = 7;
}

// Simple Key-value,where value could be one of scalar types
message KeyValue {
    // Key
    string key                                              =  1;

    // One of possible values
    oneof value {
        double double_value                                 =  5;
        int64  int_value                                    =  6;
        uint64 uint_value                                   =  7;
        sint64 sint_value                                   =  8;
        bool   bool_value                                   =  9;
        string str_value                                    = 10;
        bytes  bytes_value                                  = 11;
    }
}

// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
    // Return code
    ReturnCode code                                         = 1;

    // Return code string
    string     code_str                                     = 2;
};

// Result of the operation
enum ReturnCode {
    SUCCESS                                                 = 0;
    NO_SUBSCRIPTION_ENTRY                                   = 1;
    UNKNOWN_ERROR                                           = 2;
}

// Message sent for a telemetry get request
message GetSubscriptionsRequest {
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers
    uint32 subscription_id                                  = 1;
}

// Reply to telemetry subscription get request
message GetSubscriptionsReply {
    // List of current telemetry subscriptions
    repeated SubscriptionReply subscription_list            = 1;
}

// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
    // Per-subscription_id level operational state can be requested.
    //
    // Subscription identifier as returned by the device when
    // subscription was requested
    // --- or ---
    // 0xFFFFFFFF for all subscription identifiers including agent-level
    // operational stats
    // --- or ---
    // If subscription_id is not present then sent only agent-level
    // operational stats
    uint32 subscription_id                                  = 1;

    // Control verbosity of the output
    VerbosityLevel verbosity                                = 2;
}

// Verbosity Level
enum VerbosityLevel {
    DETAIL                                                  = 0;
    TERSE                                                   = 1;
    BRIEF                                                   = 2;
}

// Reply to telemetry agent operational states request
message GetOperationalStateReply {
    // List of key-value pairs where
    //     key      = operational state definition
    //     value    = operational state value
    repeated KeyValue kv                                    = 1;
}

// Message sent for a data encoding request
message DataEncodingRequest {
}

// Reply to data encodings supported request
message DataEncodingReply {
    repeated EncodingType  encoding_list                    = 1;
}

// Encoding Type Supported
enum EncodingType {
    UNDEFINED                                               = 0;
    XML                                                     = 1;
    JSON_IETF                                               = 2;
    PROTO3                                                  = 3;
}

为了进行服务调用(rpc TelemetrySubscribe),我首先需要读取具有订阅ID的头,然后开始阅读消息.现在,使用Java我能够连接服务,我确实介绍了拦截器但是当我打印/检索头时它是null.我的调用拦截器代码如下,

ClientInterceptor interceptor = new HeaderClientInterceptor();
      originChannel = OkHttpChannelBuilder.forAddress(host,port)
        .usePlaintext(true)
        .build();
     Channel channel =  ClientInterceptors.intercept(originChannel,interceptor);
      telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);

这是读取元数据的拦截器代码.

@Override
  public <ReqT,RespT> ClientCall<ReqT,RespT> interceptCall(MethodDescriptor<ReqT,RespT> method,CallOptions callOptions,Channel next) {
    return new SimpleForwardingClientCall<ReqT,RespT>(next.newCall(method,callOptions)) {

      @Override
      public void start(Listener<RespT> responseListener,Metadata headers) {

        super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
          @Override
          public void onHeaders(Metadata headers) {

             Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY",Metadata.ASCII_STRING_MARSHALLER);

            System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));

想知道有没有其他方法来读取元数据或第一个有订阅ID的消息?所有我需要阅读第一条有订阅ID的消息,并将相同的订阅ID返回给服务器以便流可以启动我使用相同的原型文件使用相同的Python代码,它通过下面的代码提供与服务器通信仅供参考:

sub_req = SubscribeRequestMsg("host",port)
     data_itr = stub.telemetrySubscribe(sub_req,_TIMEOUT_SECONDS)
     metadata = data_itr.initial_metadata()

                   if metadata[0][0] == "responseKey":
                    metainfo = metadata[0][1]
                    print metainfo

                    subreply = agent_pb2.SubscriptionReply()
                    subreply.SetInParent()
                    google.protobuf.text_format.Merge(metainfo,subreply)

                    if subreply.response.subscription_id:
                    SUB_ID = subreply.response.subscription_id

从上面的python代码我可以轻松检索元数据对象,不知道如何使用Java检索它?

在阅读metaData之后,我得到的是:元数据({content-type = [application / grpc],grpc-encoding = [identity],grpc-accept-encoding = [identity,deflate,gzip]})

但我知道从元数据到它还有一条线,也就是说

response {
  subscription_id: 2
}

如何从Header中提取包含订阅ID的最后一个响应.我确实尝试了很多选项,我迷失在这里.

解决方法

您使用的方法是请求元数据,而不是响应元数据:
public void start(Listener<RespT> responseListener,Metadata headers) {

对于响应元数据,您将需要ClientCall.Listener并等待onHeaders回调:

public void onHeaders(Metadata headers)

我觉得你提到的元数据的使用似乎很奇怪.元数据通常用于其他错误详细信息或不特定于RPC方法的交叉功能(如身份验证,跟踪等).

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


gRPC 前言 为什么使用gRPC 传输协议 传输效率 性能消耗 gRPC入门 gRPC流 证书认证 使用根证书 gRPC实现token认证 和Web服务共存 验证器 REST接口 grpcurl工具
参考文章: 1.&#160;https://www.cnblogs.com/kaixinyufeng/p/9651513.html 2.&#160;http://jia-shun.cn/2018/08
今天给大家翻译一篇由ASP.NET首席开发工程师 &quot;James Newton King&quot; 前几天发表的一篇博客,文中带来了一个实验性的产品gRPC Web。大家可以点击文末的讨论帖
上一篇文章我带着大家体验了一把《 &quot;ASP.NET Core 3.0 上的gRPC服务模板初体验(多图)&quot; 》,如果有兴趣的可以点击链接进行查看,相信跟着做的你,也是可以跑起来的。
早就听说ASP.NET Core 3.0中引入了gRPC的服务模板,正好趁着家里电脑刚做了新系统,然后装了VS2019的功夫来体验一把。同时记录体验的过程。如果你也想按照本文的步骤体验的话,那你得先安
这篇笔记主要是记录学习历程而不是怎么用~,以及protobuffers 和 gprc 各种文档的地址,等过上大半年后通过这篇笔记帮助自己快速重新掌握这个技术点 一、Protocolbuffers 关于
最近GRPC很火,感觉整RPC不用GRPC都快跟不上时髦了。 gRPC设计 gRPC是一种与语言无关的高性能远程过程调用 (RPC) 框架。刚好需要使用一个的RPC应用系统,自然而然就盯上了它,但是它
   gRPC是google开源提供的一个RPC软件框架,它的特点是极大简化了传统RPC的开发流程和代码量,使用户可以免除许多陷阱并聚焦于实际应用逻辑中。作为一种google的最新RPC解决方案,gRPC具备了以下这些强项: 1、gRPC在HTTP/2协议上用protobuf取代了json实现了最佳效率 2、用IDL(Interface Definition Language),一种简单的描述语言
  接着上期讨论的gRPC unary服务我们跟着介绍gRPC streaming,包括: Server-Streaming, Client-Streaming及Bidirectional-Streaming。我们首先在.proto文件里用IDL描述Server-Streaming服务: /* * responding stream of increment results */ servi
我已经设法通过GRPC使用流媒体模式的服务帐户为我的 Android应用程序运行Google Cloud Speech.但是,根据我所读到的内容,出于安全原因,我不应该在其中部署具有这些凭据的Android应用程序(当前存储为资源中的JSON文件).正确的是创建一个API密钥,如下所述: https://cloud.google.com/speech/docs/common/auth 这允许我限制
  安装protobuf go get -u github.com/golang/protobuf/proto go get -u github.com/golang/protobuf/protoc-gen-go 此时会生成protoc-gen-go,protoc一般是获取已经编译好的可执行文件(https://github.com/google/protobuf/releases)   安装gR
一、grpc安装 将 https://github.com/google/go-genproto 放到 $GOPATH/src/google.golang.org/genproto 将 https://github.com/grpc/grpc-go 放到 $GOPATH/src/google.golang.org/grpc 将 https://github.com/golang/t
参考URL: https://segmentfault.com/a/1190000015220713?utm_source=channel-hottest gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, P
我试图在电子应用程序中要求grpc,但我收到以下错误: Error: dlopen(/srv/node_modules/grpc/src/node/extension_binary/grpc_node.node, 1): Symbol not found: _GENERAL_NAME_free Referenced from: /srv/node_modules/grpc/src/node/e
我试图调用GRPC端点,但我想提供客户身份验证标头.我在哪里指定这个? var client = new proto.Publisher('127.0.0.1:50051', grpc.credentials.createInsecure()); var customHeader = { 'authorization': 'secret' } client.publish(d
我正在尝试创建一个 java grpc客户端来与go中的服务器通信.我是grpc的新手所以遵循本教程 gRPC Java Tutorial.在这些示例中,它们指的是阻塞和非阻塞存根,它们似乎是从 github的其他地方导入的. import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideBlockingStub; import io.gr
我正在尝试做类似下面的事情(即使用流式grpc调用从客户端向服务器发送数据).代码参考取自官方网站上给出的grpc示例,用于解释目的: 客户端代码: ClientContext context; context.AddMetadata("authorization", "abcd"); context.set_deadline(...); std::unique_ptr<ClientWriter
什么是gRPC gRPC是google开源的一个高性能、跨语言的RPC框架,基于HTTP2协议,采用ProtoBuf 定义的IDL。 gRPC 的主要优点是: 现代高性能轻量级 RPC 框架。 协定优先 API 开发,默认使用协议缓冲区,允许与语言无关的实现。 可用于多种语言的工具,以生成强类型服务器和客户端。 支持客户端、服务器和双向流式处理调用。 使用 Protobuf 二进制序列化减少对网络
一.简介 gRPC 是一个由Google开源的,跨语言的,高性能的远程过程调用(RPC)框架。 gRPC使客户端和服务端应用程序可以透明地进行通信,并简化了连接系统的构建。它使用HTTP/2作为通信协议,使用 Protocol Buffers 作为序列化协议。 它的主要优点: 现代高性能轻量级 RPC 框架。 约定优先的 API 开发,默认使用 Protocol Buffers 作为描述语言,允许
目录 ASP.NET Core 3.0 使用gRPC ASP.NET Core 3.0 gRPC 双向流 ASP.NET Core 3.0 gRPC 认证授权 一.前言 在前一文 《ASP.NET Core 3.0 使用gRPC》中有提到 gRPC 支持双向流调用,支持实时推送消息,这也是 gRPC的一大特点,且 gRPC 在对双向流的控制支持上也是非常强大的。 二. 什么是 gRPC 流 gRP