Day189.Stream消息驱动、Sleuth分布式请求链路追踪 -SpringCloud

SpringCloud

十五、SpringCloud Stream消息驱动

消费驱动概述

是什么

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

官网:https://spring.io/projects/spring-cloud-stream#overview

在这里插入图片描述

在这里插入图片描述


API:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/

中文指导手册:https://m.wang1314.com/doc/webapp/topic/20971999.html

设计思想

在这里插入图片描述


标准MQ:

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

为什么用Cloud Stream?

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

Spring Cloud Stream标准流程套路

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

编码API和常用注解

在这里插入图片描述

案例说明

在这里插入图片描述


消息驱动之生产者

  1. 新建模块cloud-stream-rabbitmq-provider8801

  2. pom

    <dependencies>
        
        <!--stream rabbit -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        
        <!--eureka client-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--监控-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--热部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
  3. yml

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: #在此处配置要绑定的rabbitmq的服务信息
            defaultRabbit: #表示定义的名称,用于binding整合
              type: rabbit #消息组件类型
              environment: #设置rabbitmq的相关环境配置
                spring:
                  rabbitmq:
                    host: 47.110.247.184  #RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                    port: 5672
                    username: guest
                    password: guest
          bindings: #服务的整合处理
            output: #这个名字是一个通道的名称,MQ里面的目的地名字
              destination: studyExchange #表示要使用的Exchange名称定义
              content-type: application/json #设置消息类型,本次为json,本文要设置为“text/plain”
              binder: defaultRabbit #设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
    
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30S)
        lease-expiration-duration-in-seconds: 5 #如果超过5S间隔就注销节点 默认是90s
        instance-id: send-8801.com #在信息列表时显示主机名称
        prefer-ip-address: true #访问的路径变为IP地址
    
    
  4. 主启动类

    @SpringBootApplication
    public class StreamMQMain8801 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8801.class, args);
        }
    }
    
  5. 业务类

    在这里插入图片描述

    1. 新建service.IMessageProvider接口

      public interface IMessageProvider {
          public String send();
      }
      
    2. 在service下新建impl.IMessageProviderImpl实现类

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LFuOlDJU-1613029654716)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210211132356328.png)]

      下面需要在实现类上表名指定@EnableBinding(Source.class)

      import com.achang.springcloud.service.IMessageProvider;
      import org.springframework.cloud.stream.annotation.EnableBinding;
      import org.springframework.cloud.stream.messaging.Source;
      import org.springframework.messaging.MessageChannel;
      import org.springframework.messaging.support.MessageBuilder;
      
      import javax.annotation.Resource;
      import java.util.UUID;
      
      @EnableBinding(Source.class) //定义指定消息的推送管道(Source是spring的)
      public class IMessageProviderImpl implements IMessageProvider {
      
          @Resource
          private MessageChannel output;  //消息发送管道
      
          @Override
          public String send() {
              String serial = UUID.randomUUID().toString();
      
              //MessageBuilder是spring的integration.support.MessageBuilder
              output.send(MessageBuilder.withPayload(serial).build());
              System.out.println("====serial: " + serial);
              return null;
      
          }
      
      }
      
    3. 新建controller.SendMessageController

      @RestController
      public class sendMessageController {
      
          @Resource
          private IMessageProvider iMessageProvider;
      
          @GetMapping("/sendMessage")
          public String sendMessage(){
              return iMessageProvider.send();
          }
      
      }
      
  6. 测试
    启动7001,RabbitMQ,启动8801

    然后在RabbitMQ后台可以看到新生成的交换机(在yml定义的)

    在这里插入图片描述


    然后在浏览器输入:http://localhost:8801/sendMessage,多次刷新,后台打印的数据:

    在这里插入图片描述


    RabbitMQ后台:

    在这里插入图片描述

我以上操作都可以完成,但是idea报错:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Ul39cYg-1613029654719)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210211135027368.png)]

但是交换机上的主题和访问都可以完成。


消息驱动之消费者

  1. 新建模块cloud-stream-rabbitmq-consumer8802

  2. pom

    <dependencies>
        <!--stream rabbit -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        
        
        <!--eureka client-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--监控-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--热部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
  3. yml

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: #在此处配置要绑定的rabbitmq的服务信息
            defaultRabbit: #表示定义的名称,用于binding整合
              type: rabbit #消息组件类型
              environment: #设置rabbitmq的相关环境配置
                spring:
                  rabbitmq:
                    host: 47.110.247.184  #RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                    port: 5672
                    username: guest
                    password: guest
          bindings: #服务的整合处理
            input: #这个名字是一个通道的名称,指定从topic中收到的内容
              destination: studyExchange #表示要使用的Exchange名称定义
              content-type: application/json #设置消息类型,本次为json,本文要设置为“text/plain”
              binder: defaultRabbit #设置要绑定的消息服务的具体设置(爆红不影响使用,位置没错)
    
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30S)
        lease-expiration-duration-in-seconds: 5 #如果超过5S间隔就注销节点 默认是90s
        instance-id: receive-8802.com #在信息列表时显示主机名称
        prefer-ip-address: true #访问的路径变为IP地址
    
  4. 主启动类

    @SpringBootApplication
    public class StreamMQMain8802 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8802.class,args);
        }
    }
    
  5. 新建controller.ReceiveMessageListenerController

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iie3mWWi-1613029654722)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210211141232758.png)]

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Controller;
    
    @EnableBinding(Sink.class)
    @Controller
    public class ReceiveMessageListenerController {
    
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(Sink.INPUT)//通过这个注解,监听Sink的输入,上面的图
        public void input(Message<String> message){
            System.out.println("消费者1号------> 收到的消息: "+message.getPayload()+ "\t port: "+serverPort);
        }
    
    }
    
  6. 测试
    启动7001,8801,8802

http://localhost:8801/sendMessage(8801发送消息)

在这里插入图片描述


8802接收到消息:

在这里插入图片描述


分组消费与持久化

在这里插入图片描述


按照8802,新建8803。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6od25oiU-1613029654726)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210211143649129.png)]


消费

在这里插入图片描述


在这里插入图片描述


8801发送后,8802和8803都能接收到数据(重复消费);正常情况下应该是有一个消费者消费了8801的消息后,另外的其他消费者就不能消费。这里要把8802和8803看成一个集群,如果8802和8803都接收到了,就都会去做业务,然后本来8801只想让这个集群来做一次消费的,就会变成每个消费者都来消费一次。这是因为8802和8803不是在同一个组(队列)里,不同组可以重复消费,而同一个组里,只有一个消费者能消费,所以需要对消费者进行分组,把所有相同的消费者分到一个组里。(主题会给每个队列发送消息,而每个队列只有一个消费者可以获得消息(同组广播,不同组轮询))

在这里插入图片描述

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

在这里插入图片描述


分组(队列)

在这里插入图片描述

设置不同分组

在这里插入图片描述


修改8802的yml

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0m8SN1Jc-1613029654728)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210211145155072.png)]

修改8803的yml

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j9lFy9Qq-1613029654731)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210211145208773.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JkOvP6L7-1613029654732)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210211145322237.png)]

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

结论:

在这里插入图片描述


设置相同分组

在这里插入图片描述


修改8803的yml中group为achangA,然后重启8003。

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

将 _广播模型 变 为工作模型_


持久化

在这里插入图片描述

  1. 停掉8802和8803,去掉8802的group: angeninA
  2. 然后8801发送4条消息。

    在这里插入图片描述

  3. 启动8802,8802并没有去拿取消息。(因为8802去掉了group: achangA,所以启动后会再新建一个队列)

    在这里插入图片描述

  4. 启动8803,启动后获取到8801的消息。(因为8803没删除group: achangA,achangA队列是在8801发送消息前存在的,所以当8803停机后再启动,就可以获取到停机时8801发送的信息(如果此时同组(队列)里有别的消费者,那么消息会被别的消费者消费掉))

    在这里插入图片描述


十六、SpringCloud Sleuth分布式请求链路追踪

在这里插入图片描述


在这里插入图片描述


https://github.com/spring-cloud/spring-cloud-sleuth

https://cloud.spring.io/spring-cloud-sleuth/reference/html/

在这里插入图片描述


搭建链路监控步骤

在这里插入图片描述


在这里插入图片描述

下载jar包:http://dl.bintray.com/openzipkin/maven/io/zipkin/java/zipkin-server/

在这里插入图片描述


在这里插入图片描述

下载完后,cmd命令行终端jar包的目录里,然后输入:java -jar zipkin-server-2.12.9-exec.jar运行。

在这里插入图片描述

浏览器输入:http://localhost:9411/zipkin/

在这里插入图片描述


原理

在这里插入图片描述


在这里插入图片描述

单链路

在这里插入图片描述


服务提供者cloud-provider-payment8001

  1. 在pom中添加:

       <!--包含了sleuth+zipkin-->
       <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-starter-zipkin</artifactId>
       </dependency>
    
  2. 在yml中添加:

    spring:
     ...
     
     zipkin:
        base-url: http://localhost:9411 # 指定zipkin的url
     sleuth:
        sampler:
          probability: 1  #采样率值介于0到1之间,1则表示全部采集(一般不为1,不然高并发性能会有影响)
    

在这里插入图片描述

  1. 在PaymentController中添加:

       @GetMapping("/payment/zipkin")
       public String paymentZipkin(){
           return "paymentZipkin...";
       }
    

服务消费者cloud-consumer-order80

  1. 在pom中添加(和提供者一样)

  2. 在yml中添加(和提供者一样)

  3. 在OrderController中添加:

        @GetMapping("/consumer/payment/zipkin")
        public String paymentZipkin(){
            String result = restTemplate.getForObject("http://localhost:8001" + "/payment/zipkin", String.class);
            return result;
        }
    

测试

启动7001,8001,80。

浏览器输入:http://localhost/consumer/payment/zipkin

在这里插入图片描述

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

原文地址:https://blog.csdn.net/qq_43284469/article/details/113790579

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Nacos 中的参数有很多,如:命名空间、分组名、服务名、保护阈值、服务路由类型、临时实例等,那这些参数都是什么意思?又该如何设置?接下来我们一起来盘它。 1.命名空间 在 Nacos 中通过命名空间(Namespace)+ 分组(Group)+服务名(Name)可以定位到一个唯一的服务实例。 命名
Nacos 支持两种 HTTP 服务请求,一个是 REST Template,另一个是 Feign Client。之前的文章咱们介绍过 Rest Template 的调用方式,主要是通过 Ribbon(负载均衡) + RestTemplate 实现 HTTP 服务调用的,请求的核心代码是这样的: @
Nacos 是 Spring Cloud Alibaba 中一个重要的组成部分,它提供了两个重要的功能:服务注册与发现和统一的配置中心功能。 服务注册与发现功能解决了微服务集群中,调用者和服务提供者连接管理和请求转发的功能,让程序的开发者无需过多的关注服务提供者的稳定性和健康程度以及调用地址,因为这
Spring Cloud Alibaba 是阿里巴巴提供的一站式微服务开发解决方案,目前已被 Spring Cloud 官方收录。而 Nacos 作为 Spring Cloud Alibaba 的核心组件之一,提供了两个非常重要的功能:服务注册中心(服务注册和发现)功能,和统一配置中心功能。 Nac
在 Nacos 的路由策略中有 3 个比较重要的内容:权重、保护阈值和就近访问。因为这 3 个内容都是彼此独立的,所以今天我们就单独拎出“保护阈值”来详细聊聊。 保护阈值 保护阈值(ProtectThreshold):为了防止因过多实例故障,导致所有流量全部流入剩余健康实例,继而造成流量压力将剩余健
前两天遇到了一个问题,Nacos 中的永久服务删除不了,折腾了一番,最后还是顺利解决了。以下是原因分析和解决方案,建议先收藏,以备不时之需。 临时实例和持久化实例是 Nacos 1.0.0 中新增了一个特性。临时实例和持久化实例最大的区别是健康检查的方式:临时实例使用客户端主动上报的健康检查模式,而
Spring Cloud Alibaba 技术体系中的 Nacos,提供了两个重要的功能:注册中心(服务注册与发现)功能和配置中心功能。 其中注册中心解决了微服务调用中,服务提供者和服务调用者的解耦,让程序开发者可以无需过多的关注服务提供者和调用者的运行细节,只需要通过 Nacos 的注册中心就可以
负载均衡通器常有两种实现手段,一种是服务端负载均衡器,另一种是客户端负载均衡器,而我们今天的主角 Ribbon 就属于后者——客户端负载均衡器。 服务端负载均衡器的问题是,它提供了更强的流量控制权,但无法满足不同的消费者希望使用不同负载均衡策略的需求,而使用不同负载均衡策略的场景确实是存在的,所以客
本篇文章为大家展示了如何解决Spring Cloud 服务冲突问题,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。一、背景...
本篇内容主要讲解“spring cloud服务的注册与发现怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spri...
本篇内容介绍了“Dubbo怎么实现Spring Cloud服务治理 ”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处...
本篇内容主要讲解“SpringCloud相关面试题有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SpringCloud相...
如何分析Spring Cloud Ribbon、Spring Cloud Feign以及断路器,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希
这篇文章主要讲解了“springcloud微服务的组成部分有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“s...
这篇文章主要讲解了“SpringCloud的OpenFeign项目怎么创建”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习...
本篇内容主要讲解“spring cloud oauth3整合JWT后获取用户信息不全怎么办”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带...
怎样解析微服务架构SpringCloud,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。...
这篇文章主要介绍spring cloud中API网关的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!一、服务网关简介1、外观模式客户端...
本篇内容介绍了“Spring Cloud微服务的相关问题有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处...
本文小编为大家详细介绍“spring cloud config整合gitlab如何搭建分布式的配置中心”,内容详细,步骤清晰,细节处理妥当,希望这篇“spring cloud config整合gi...