概述
Dubbo 官网对 Dubbo 的描述:
Apache Dubbo 是一款微服务开发框架,它提供了 RPC通信 与 微服务治理
两大关键能力。这意味着,使用 Dubbo
开发的微服务,将具备相互之间的远程发现与通信能力, 同时利用 Dubbo
提供的丰富服务治理能力,可以实现诸如服务发现、负载均衡、流量调度等服务治理诉求。同时
Dubbo
是高度可扩展的,用户几乎可以在任意功能点去定制自己的实现,以改变框架的默认行为来满足自己的业务需求。
Dubbo 目前有两个大的版本:2.x 和 3.0,2.x
版本也是我们在项目中使用的版本,这是一个同步的 Request-Response 模型的
RPC 框架,而 3.0 版本通过引入 triple 协议,支持了更多的通信模型:
- 消费端异步请求(Client Side Asynchronous Request-Response)
- 提供端异步执行(Server Side Asynchronous Request-Response)
- 消费端请求流(Request Streaming)
- 提供端响应流(Response Streaming)
- 双向流式通信(Bidirectional Streaming)
Dubbo 官网将 triple 定义为下一代 RPC 通信协议,是这样描述的:
定义了全新的 RPC 通信协议 – Triple,一句话概括 Triple:它是基于
HTTP/2 上构建的 RPC 协议,完全兼容
gRPC,并在此基础上扩展出了更丰富的语义。 使用 Triple
协议,用户将获得以下能力:
- 更容易到适配网关、Mesh架构,Triple 协议让 Dubbo
更方便的与各种网关、Sidecar 组件配合工作。
- 多语言友好,推荐配合 Protobuf 使用 Triple 协议,使用 IDL
定义服务,使用 Protobuf 编码业务数据。
- 流式通信支持。Triple 协议支持 Request Stream、Response
Stream、Bi-direction Stream。
从同步/异步和通信模型两个维度来总结:
- Dubbo 2.x 是一个同步的、只支持 Request-Response 模型的 RPC
框架;
- Dubbo 3.0 支持异步、支持 Request Stream、Response
Stream、Bi-direction Stream 多种通信模型的下一代 RPC 通信框架。
后面以一个 Java 实例来演示下它的功能。
Dubbo 服务模型
一个简化模型
在 Dubbo 服务模型中有几个重要的概念:
- Invoker:它是 Dubbo
的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可以向它发起调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现;在客户端,它作为服务的调用方,向服务端发起调用,在服务端,它封装了服务实现类,代表了最终的服务提供方;
- Exporter:它封装了服务端的 Invoker 对象,将服务发布出去;
- Protocol: 它负责 Exporter 和 Invoker 对象的生命周期管理;
- Invocation:它持有调用过程中的变量,比如方法名,参数等。
一个简化的模型如下图所示:
发布服务
在服务端,将服务实现封装成为一个 Invoker 对象,通过 Exporter
发布出去。
引用服务
在客户端,将一次远程 RPC 调用转化为本地 Invoker 对象的调用,再通过
Invoker 对象发起远程的调用。
Java Dubbo 实例
引入依赖
Triple 协议依赖 Protocol Buffers 及 gRPC, Protocol Buffers
用来作为数据的序列化及服务的定义,在 Java 实例里没有使用 Protocol
Buffers 来进行服务定义,如果是其它语言,官方建议使用 Protocol Buffers
作为 IDL 来描述服务,从而获得跨平台的能力。另外,Dubbo 扩展了 grpc-java
代码生成插件,从这点来看,Dubbo Triple 协议是否也可以理解为 gRPC
协议的扩展?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.9</maven.compiler.source> <maven.compiler.target>1.9</maven.compiler.target> <source.level>1.9</source.level> <target.level>1.9</target.level> <dubbo.version>3.0.4</dubbo.version> <grpc.version>1.40.1</grpc.version> <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> <maven-failsafe-plugin.version>2.21.0</maven-failsafe-plugin.version> <protoc.version>3.7.1</protoc.version> <dubbo.compiler.version>0.0.4-SNAPSHOT</dubbo.compiler.version> </properties>
<dependencies>
<dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>${dubbo.version}</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.14.0</version> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-dependencies-zookeeper</artifactId> <version>${dubbo.version}</version> <type>pom</type> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-all</artifactId> <version>${grpc.version}</version> </dependency>
<dependency> <groupId>org.testcontainers</groupId> <artifactId>testcontainers</artifactId> <version>1.12.3</version> <scope>test</scope> </dependency> </dependencies>
<build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.6.1</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> <protocPlugins> <protocPlugin> <id>dubbo</id> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-compiler</artifactId> <version>${dubbo.compiler.version}</version> <mainClass>org.apache.dubbo.gen.dubbo.Dubbo3Generator</mainClass> </protocPlugin> </protocPlugins> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>test-compile</goal> <goal>compile-custom</goal> <goal>test-compile-custom</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>${maven-compiler-plugin.version}</version> <configuration> <source>${source.level}</source> <target>${target.level}</target> </configuration> </plugin> </plugins> </build>
|
接口定义
接口定义分为两个部分,一个是接口描述,另外一个是请求及响应参数描述。在这里,接口使用
Java 语言描述,而参数使用 Protocol Buffers 来描述。
接口描述 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public interface IGreeter {
HelloReply sayHello(HelloRequest request);
StreamObserver<HelloRequest> bidiHello(StreamObserver<HelloReply> replyStream);
void lotsOfReplies(HelloRequest request, StreamObserver<HelloReply> replyStream);
StreamObserver<HelloRequest> lotsOfGreetings(StreamObserver<HelloReply> replyStream); }
|
参数描述 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| syntax = "proto3";
option java_multiple_files = true; option java_package = "org.apache.dubbo.hello"; option java_outer_classname = "HelloWorldProto"; option objc_class_prefix = "HLW";
package helloworld;
message HelloRequest { string name = 1; }
message HelloReply { string message = 1; }
|
执行 mvn package 生成代码。
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
| class ApiConsumer { private final IGreeter delegate;
ApiConsumer() { ReferenceConfig<IGreeter> ref = new ReferenceConfig<>(); ref.setInterface(IGreeter.class); ref.setCheck(false); ref.setInterface(IGreeter.class); ref.setCheck(false); ref.setProtocol(CommonConstants.TRIPLE); ref.setLazy(true); ref.setTimeout(100000);
ApplicationConfig applicationConfig = new ApplicationConfig("demo-consumer"); applicationConfig.setQosPort(33333);
DubboBootstrap bootstrap = DubboBootstrap.getInstance(); bootstrap.application(applicationConfig) .registry(new RegistryConfig(TriSampleConstants.ZK_ADDRESS)) .reference(ref) .start();
this.delegate = ref.get(); }
public void lotsOfReplies() { delegate.lotsOfReplies(HelloRequest.newBuilder() .setName("allen") .build(), new StdoutStreamObserver<>("serverStream")); }
public void lotsOfGreetings() { final StreamObserver<HelloRequest> request = delegate.lotsOfGreetings(new StdoutStreamObserver<>("lotsOfGreetings")); for (int i = 0; i < 10; i++) { request.onNext(HelloRequest.newBuilder() .setName("allen") .build()); } request.onCompleted(); }
public void bidiHello() { final StreamObserver<HelloRequest> request = delegate.bidiHello(new StdoutStreamObserver<>("stream")); for (int i = 0; i < 10; i++) { request.onNext(HelloRequest.newBuilder() .setName("allen") .build()); } request.onCompleted(); }
public void sayHello() { try { final HelloReply reply = delegate.sayHello(HelloRequest.newBuilder() .setName("allen") .build()); TimeUnit.SECONDS.sleep(1); System.out.println("Reply:" + reply); } catch (Throwable t) { t.printStackTrace(); } }
public static void main(String[] args) throws IOException { final ApiConsumer consumer = new ApiConsumer(); System.out.println("dubbo triple consumer started");
consumer.sayHello(); consumer.bidiHello(); consumer.lotsOfReplies(); consumer.lotsOfGreetings();
System.in.read(); }
}
|
服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| public class Greeter1Impl implements IGreeter { @Override public HelloReply sayHello(HelloRequest request) {
return HelloReply.newBuilder() .setMessage(request.getName()) .build(); }
public HelloReply sayHelloException(HelloRequest request) { RpcContext.getServerContext().setAttachment("str", "str") .setAttachment("integer", 1) .setAttachment("raw", new byte[]{1, 2, 3, 4}); throw new RuntimeException("Biz Exception"); }
@Override public StreamObserver<HelloRequest> bidiHello(StreamObserver<HelloReply> replyStream) { return new StreamObserver<HelloRequest>() { @Override public void onNext(HelloRequest data) { replyStream.onNext(HelloReply.newBuilder() .setMessage(data.getName()) .build()); }
@Override public void onError(Throwable throwable) { throwable.printStackTrace(); replyStream.onError(new IllegalStateException("Stream err")); }
@Override public void onCompleted() { replyStream.onCompleted(); } }; }
@Override public void lotsOfReplies(HelloRequest request, StreamObserver<HelloReply> replyStream) { for (int i = 0; i < 10; i++) { replyStream.onNext(HelloReply.newBuilder() .setMessage(request.getName()) .build()); } replyStream.onCompleted(); }
@Override public StreamObserver<HelloRequest> lotsOfGreetings(StreamObserver<HelloReply> replyStream) { StdoutStreamObserver stdoutStreamObserver = new StdoutStreamObserver("lotsOfGreetings");
return new StreamObserver<HelloRequest>() { @Override public void onNext(HelloRequest data) { stdoutStreamObserver.onNext(data.getName()); }
@Override public void onError(Throwable throwable) { throwable.printStackTrace(); stdoutStreamObserver.onError(new IllegalStateException("Stream err")); }
@Override public void onCompleted() { HelloReply reply = HelloReply.newBuilder().setMessage("completed,lotsOfGreetings").build();
replyStream.onNext(reply); replyStream.onCompleted();
stdoutStreamObserver.onCompleted(); } }; } }
class ApiProvider { public static void main(String[] args) { new EmbeddedZooKeeper(TriSampleConstants.ZK_PORT, false).start();
ServiceConfig<IGreeter> service = new ServiceConfig<>(); service.setInterface(IGreeter.class); service.setRef(new Greeter1Impl()); service.setToken(true);
DubboBootstrap bootstrap = DubboBootstrap.getInstance(); bootstrap.application(new ApplicationConfig("demo-provider")) .registry(new RegistryConfig(TriSampleConstants.ZK_ADDRESS)) .protocol(new ProtocolConfig(CommonConstants.TRIPLE, TriSampleConstants.SERVER_PORT)) .service(service) .start() .await();
} }
|
总结
可以发现,不管在客户端还是服务端,对于异步和 stream 操作都是通过
StreamObserver 对象来实现,它跟 gRPC 中的是一致,用过 gRPC
的话,可以无缝衔接 Dubbo 的 Triple,使用方式及相关的类都是一样的。
参考:
1. dubbo 官方文档