【分布式-5】dubbo

article/2024/5/23 1:58:54

基础使用就不说了,看一下dubbo的SPI,以及基于SPI的一些常用技术。在此之前先看看jdk中的spi。

jdk的SPI:

jdk内置的一种服务发现机制,用法:在META-INF/service下创建一个文件,名称是接口全限定名,内容是实现类全限定名,通过ServiceLoader加载到jvm,实现类须有无参构造。 

示例:

public interface ITest {void saySomething();
}
public class ITestImpl1 implements ITest {public void saySomething() {System.out.println("Hi, mia.");}
}
public class ITestImpl2 implements ITest {@Overridepublic void saySomething() {System.out.println("Hello, world.");}
}

测试:使用ServiceLoader加载所有的扩展点(接口实现类),迭代选择想要那一个 

import java.util.Iterator;
import java.util.ServiceLoader;public class TestServiceLoader {public static void main(String[] args) {ServiceLoader<ITest> serviceLoader = ServiceLoader.load(ITest.class);Iterator<ITest> iTests = serviceLoader.iterator();while (iTests.hasNext()) {ITest iTest = iTests.next();System.out.printf("loading %s\n", iTest.getClass().getName());iTest.saySomething();}}
}

存在问题:

jdk的SPI会一次加载所有扩展点,包括不用的,浪费资源,有一个加载失败,所有的都不能用;也不能动态选择想要的实现,只能迭代器遍历。

dubbo的SPI:

因为jdk spi存在的问题,dubbo实现了自己的spi,可以动态选择想要的扩展点。

  • 普通使用(像jdk一样加载所有的遍历):

各项目导入dubbo依赖

接口HelloService,标注@SPI

@SPI
public interface HelloService {String  sayHello();
}

接口实现类:

public class DogHelloService implements HelloService{@Overridepublic String sayHello() {return "wang wang";}
}public class HumanHelloService implements HelloService{@Overridepublic String sayHello() {return "hello 你好";}
}

在resource目录下创建META-INF/dubbo目录,新建文件.(这里可以配置key,方便动态加载; 如果只是像此例中的普通使用方式,可以不要配置)

使用:

public class DubboSpiMain {public static void main(String[] args) {// 获取扩展加载器ExtensionLoader<HelloService>  extensionLoader  = ExtensionLoader.getExtensionLoader(HelloService.class);// 遍历所有的支持的扩展点 META-INF.dubboSet<String>  extensions = extensionLoader.getSupportedExtensions();for (String extension : extensions){String result = extensionLoader.getExtension(extension).sayHello();System.out.println(result);}}
}

当然,对于上面的普通使用来说,也和jdk中一样,只能加载出全部的,然后遍历。可以石红Adaptive动态选择想要的扩展点。

  • Adaptive动态加载

使用Adaptive,需要在接口方法上添加Adaptive注解,并配合org.apache.dubbo.common.URL参数的方式实现动态选择,如下面第二个方法:

@SPI()
public interface HelloService {String  sayHello();@AdaptiveString  sayHello(URL  url);
}
public class HumanHelloService implements HelloService{@Overridepublic String sayHello() {return "hello 你好";}@Overridepublic String sayHello(URL url) {return  "hello url";}
}
public class DogHelloService implements HelloService{@Overridepublic String sayHello() {return "wang wang";}@Overridepublic String sayHello(URL url) {return "wang url";}
}
public class DubboAdaptiveMain {public static void main(String[] args) {URL   url  = URL.valueOf("test://localhost/hello?hello.service=dog");HelloService  adaptiveExtension = ExtensionLoader.getExtensionLoader(HelloService.class).getAdaptiveExtension();String  msg = adaptiveExtension.sayHello(url);System.out.println(msg);}
}

如上,getAdaptiveExtension()可以动态选择扩展点。 URL根据业务来写(乱写也没关系),但是参数hello.service=dog不能乱写,hello.service就是接口HelloService名字的驼峰变种,dog就是META-INF/dubbo目录下,接口配置文件中指定的key。

如果url中没有指定hello.service参数,那需要在接口的注解@SPI指定默认值:@SPI("dog")。  

  • @Activate

学习了Adaptive的动态选择,感觉确实方便了一些。 不过也会发现,每一次只能主动手写代码,去选择一个扩展点。 如果有多个扩展点都想使用呢? @Activate注解可以同时激活多个扩展点(当多个扩展点都满足@Activate中指定的条件时,都能使用),该注解有3个选项:

  1. group分组(筛选条件):比如指定扩展点在提供方还是消费方使用
  2. key值(筛选条件):注解中key值指定后,通常也是主动选择时使用,在url中指定相同的参数。
  3. 排序:多个扩展点满足时使用顺序。

在dubbo的过滤器中,就是使用的这个技术, 多个拦截器可以同时被激活,下面自定义一个拦截器:

@Activate(group = {CommonConstants.CONSUMER,CommonConstants.PROVIDER})
public class DubboInvokeFilter   implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {long   startTime  = System.currentTimeMillis();try {// 执行方法return  invoker.invoke(invocation);} finally {System.out.println("invoke time:"+(System.currentTimeMillis()-startTime) + "毫秒");}}
}

然后在META-INF/dubbo下配置好。

当定义了group的范围是消费方和提供方,那么业务方法被调用时,consumer和provider的项目中,都会打印方法执行时间。

下面再介绍几个dubbo中的技术:

负载均衡:

dubbo默认实现了几种负载均衡,如随机(默认),轮询,一致性hash… 它也是使用的spi技术,配置方式可以在方法,接口或者全局配置文件; 可以在客户端配置,也可以在服务端配置。

  1. 方法级优先,接口级次之,全局配置再次之。
  2. 如果级别一样,则消费方优先,提供方次之。

//在服务消费者一方配置负载均衡策略

@Reference(check = false,loadbalance = "random")

 

//在服务提供者一方配置负载均衡

@Service(loadbalance = "random")

public class HelloServiceImpl implements HelloService {

        public String sayHello(String name) {

                return "hello " + name;

        }

}

如果要自定义,只需实现org.apache.dubbo.rpc.cluster.LoadBalance即可,并以spi方式注入;

  • 异步调用

配置:

注解:@Reference(methods = {@Method(name = "dsd",async = true)}); xml:<dubbo:reference id="helloService" interface="com.lagou.service.HelloService"> 
<dubbo:method name="sayHello" async="true" /> 
</dubbo:reference> 

异步调用时,可以用RpcContext.getContext().getFuture()获取结果

  • 线程池

fix:   表示创建固定大小的线程池。也是Dubbo默认的使用方式,默认创建的执行线程数为200,并且是没有任何等待队列的。所以再极端的情况下可能会存在问题,比如某个操作大量执行时,可能存在堵塞的情况。

cache:  创建非固定大小的线程池,当线程不足时,会自动创建新的线程。但是使用这种的时候需要注意,如果突然有高TPS的请求过来,方法没有及时完成,则会造成大量的线程创建,对系统的CPU和负载都是压力,执行越多反而会拖慢整个系统。

自定义线程池: 真实使用时,可能使用比较多的是fix,但是当发生线程池满了,产生问题时去查看,可能就有点晚了。   所以在创建线程池时,通过某些手段进行监控,可以提前预警。 如下面的自定义线程池:

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;
import java.util.concurrent.*;public class WachingThreadPool  extends FixedThreadPool  implements  Runnable{private  static  final Logger  LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);// 定义线程池使用的阀值,达到90%就报警private  static  final  double  ALARM_PERCENT = 0.90;private  final Map<URL, ThreadPoolExecutor>    THREAD_POOLS = new ConcurrentHashMap<>();public  WachingThreadPool(){// 当前类既是一个线程池,也是一个线程任务。// 在构造函数中创建一个单线程的线程池A(不是自己),然后将自己作为任务,提交到线程池A。// 线程池A每隔3秒就会执行提交的任务,也就是执行此类(自己)的run方法// run方法就会打印自身这个线程池中的线程情况Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS);}// 通过父类创建线程池,项目启动时会调用@Overridepublic Executor getExecutor(URL url) {final  Executor executor = super.getExecutor(url);if(executor instanceof  ThreadPoolExecutor){THREAD_POOLS.put(url,(ThreadPoolExecutor)executor);}return  executor;}@Overridepublic void run() {// 遍历线程池for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){final   URL  url = entry.getKey();final   ThreadPoolExecutor  executor = entry.getValue();// 计算相关指标final  int  activeCount  = executor.getActiveCount();final  int  poolSize = executor.getCorePoolSize();double  usedPercent = activeCount / (poolSize*1.0);LOGGER.info("线程池执行状态:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100);if (usedPercent > ALARM_PERCENT){LOGGER.error("超出警戒线! host:{} 当前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url);}}}
}

定义好线程池后,做SPI声明,创建文件 :

META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool

内容:watching=包名.线程池名

在provider中引入此模块,并配置使用该线程池。

dubbo.provider.threadpool=watching

在consumer中调用provider的方法时,就能使用该线程池了

问题来了,创建线程池时候的URL是什么: 

上面知道了怎么创建线程池,可是创建线程池的时候,需要一个参数URL。把FixedThreadPool的代码贴出来:

public class FixedThreadPool implements ThreadPool {public FixedThreadPool() {}public Executor getExecutor(URL url) {String name = url.getParameter("threadname", "Dubbo");int threads = url.getParameter("threads", 200);int queues = url.getParameter("queues", 0);return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, (BlockingQueue)(queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues))), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}
}

可以看到,url中可以指定线程名称和核心线程数,否则使用默认值。   那么这个url到底是什么?

URL主要包含以下内容:

  • protocol: 协议,一般像我们的 provider 或者 consumer 在这里都是人为具体的协议
  • host: 当前 provider 或者其他协议所具体针对的地址,比较特殊的像 override 协议所指定的
  • host就是 0.0.0.0 代表所有的机器都生效
  • port: 和上面相同,代表所处理的端口号
  • path: 服务路径,在 provider 或者 consumer 等其他中代表着我们真实的业务接口
  • key=value: 这些则代表具体的参数,这里我们可以理解为对这个地址的配置。比如我们 provider中需要具体机器的服务应用名,就可以是一个配置的方式设置上去。

既然path等有关(也就是具体的接口),那我们的provider服务中有很多接口,难不成每个URL对应创建一个线程池? 那不得炸了啊………对于这个问题我纠结了很久,网上也没找到相关的说法(可能要慢慢拔源码吧,但是我还没看)。 于是我做了测试:

  1. 在项目中增加了几个service接口和实现,并且每个service有几个方法。  
  2. 在WachingThreadPool创建线程池的方法 getExecutor(URL url) 中,打印出url。 看看provider项目启动时,用哪个url创建的线程池。
  3. 在WachingThreadPool的 run方法中,也打印url。 看看consumer端调用时,使用的url与 上一步启动时的url,有何差别?

结果发现,启动时的url里面的接口的信息,就是几个service中的其中一个(就像是随机选择的),并没有一个service对应创建一个线程池。  另外,consumer端调用时,即使调用了另一个service的方法,在run方法中打印出来的URL,和项目启动创建线程池时候的URL一模一样(连时间戳都一样)。   如下:

dubbo://10.128.7.87:20885/com.test.service.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=10.128.7.87&bind.port=20885&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=com.test.service.HelloService&methods=sayHello&pid=1468&release=2.7.5&side=provider&threadname=DubboServerHandler-10.128.7.87:20885&threadpool=watching&timestamp=1679991335736

结论:

虽然项目启动创建线程池的时候,和URL有关,并且URL中附带了某个接口,参数,或者其他一些信息。  但它不会创建多个,整个项目会共用一个线程池。   至于说,此url中带的接口和方法等信息,我感觉是随机选择的。(具体我不知道按什么规则选的,有大神知道可以说一下)

  • 服务降级 

降级是防止分布式服务发生雪崩效应,什么是雪崩?就是蝴蝶效应,当一个请求发生超时,一直等待着服务响应,那么在高并发情况下,很多请求都是因为这样一直等着响应,直到

服务资源耗尽产生宕机,而宕机之后会导致分布式其他服务调用该宕机的服务也会出现资源耗尽宕机, 这样下去将导致整个分布式服务都瘫痪,这就是雪崩。

dubbo的降级配置方式有两种:

1、屏蔽降级:mock=force:return null,不会真正去调用接口,直接返回null。

2、容错降级:mock=fail:return null,会调用接口,失败后返回null。

(mock=return null,默认为第二种)

配置的地方:

1、管理端配置,如下:

 2、xml中,比如在某个接口的调用里面配置:

<dubbo:reference id="xxService" check="false" interface="com.xx.XxService"

timeout="3000" mock="return null" />

3、注解中

@Reference(mock="force:return null")    或者   @Reference(mock="return null")

注意,除了返回null,也可以返回其他指定的默认值。


http://www.ngui.cc/article/show-1020401.html

相关文章

Executor执行器

Executor接口有两个实现&#xff0c;BaseExecutor和CachingExecutor&#xff08;装饰者模式&#xff0c;二级缓存时候用到&#xff09;。 其中BaseExecutor有四个子类&#xff1a; 1.SimpleExecutor&#xff1a;简单类型的执行器 2.ReuseExecutor&#xff1a;可重用的执行器…

@Linux搭建LDAP认证服务

文章目录1.Ldap概述2.Ldap的用途3.Ldap的数据模型1》数据模型叙述2》目录树简述3》Ldap目录树说明2.搭建LDAP服务器1》环境准备2》Ldap服务搭建3.Ldap Web-UI安装1》安装PHPLDAPAdmin1.Ldap概述 LDAP(Light Directory Access Portocol)轻量级目录访问协议&#xff1a; LDAP是一…

minimax定理证明

本文目录 本文目录证明minimax定理概述 主要使用布劳威尔不动点定理主要使用哈恩-巴拿赫定理主要使用海涅-博雷尔定理 参考文献 #minimax定理成立与纳什均衡的等价性 选取 p∗p^*p∗ 使得 min⁡qU(p∗,q)max⁡pmin⁡qU(p,q)\min\limits_q U(p^*,q) \max\limits_p \min\limit…

汽车车灯的发展趋势

汽车车灯的发展和光源的发展也是息息相关的,光源从煤油灯——》乙炔灯——》白炽灯——》卤素灯——》放电灯——》LED灯——》激光灯——》像素化光源,可以说一步步的越来越进步, 而车灯的发展也从白炽灯 ——》卤素灯——》氙气灯——》LED灯。比如有一款车使用的Matrix矩…

Django DRF - 反序列化的验证

前言 使用序列化器进行反序列化时&#xff0c;需要对数据进行验证后&#xff0c;才能获取验证成功的数据或保存成模型类对象。在获取反序列化的数据前&#xff0c;必须调用is_valid()方法进行验证&#xff0c;验证成功返回True&#xff0c;否则返回False。验证失败&#xff0c…

[LeetCode] 142. 环形链表 II

题目&#xff1a;给定一个链表的头节点 head &#xff0c;返回链表开始入环的第一个节点。 如果链表无环&#xff0c;则返回 null。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评…

有限状态机设计及function、task对比

有限状态机的设计 作用—描述一个规律的状态变化与对应输出 状态机类型 mearly型&#xff1a;输出由当前状态和输入共同决定moore型&#xff1a;输出只与当前状态有关 设计步骤 根据设计需要&#xff0c;确定各状态与状态转移关系—状态转换图的设计采取合理的格式进行代码编…

算法刷题打卡037 | 动态规划5

LeetCode 1049 最后一块石头的重量II 题目链接&#xff1a;1049. 最后一块石头的重量 II - 力扣&#xff08;Leetcode&#xff09; 看题目首先想到的是将所有石头放到一个有序集合里&#xff0c;不断取出两块重量接近的石头两相抵消&#xff0c;剩余部分放入集合中继续重复“…

性能分析调优摘要

性能分析调优 00_总论 性能调优的常规手段 空间换时间 时间换空间 分而治之 异步处理 并行 离用户更近一点 一切可拓展,业务模块化,服务化,良好的水平扩展能力 01_性能分析方法 可以抽象为两大类 自底向上: 通过监控硬件即操作系统性能指标来分析性能问题 自顶向下:…

【极客技术】ColossalChat用完整RLHF技术克隆ChatGPT的开源解决方案

原文&#xff1a;ColossalChat: An Open-Source Solution for Cloning ChatGPT With a Complete RLHF Pipeline​​​​​​​ 作者&#xff1a;Yang You&#xff0c;新加坡国立大学青年教授。他在加州大学伯克利分校获得计算机科学博士学位。 ColossalChat:一个用完整RLHF管道…