概述 RSocket 官网对 RSocket 定义:
RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron.RSocket provides a protocol for Reactive Streams semantics between client-server, and server-server communication.
RSocket 是一个构建在字节流传输协议,如 TCP, WebSocket 和 Aeron(UDP) 之上的二进制协议,同时,它也是一个为client-server,server-server 之间通信提供了反应式流语义的协议。本质上来说,RSocket 是基于反应式编程的一个二进制协议。一个反应式系统应该具备如下特征:
Responsive: 只要有可能,系统就会及时地作出反应;
Resilient: 系统出现 failure 时仍然保持响应性,并能够独立恢复;
Elastic: 系统在不断变化的工作负载之下依然能保持即时响应性;
Message Driven: 异步非阻塞消息驱动架构。
RSocket 作为一个应用层的协议,提供了丰富的功能:
Multiplexed, Binary Protocol:多路复用的二进制协议;
Bidirectional Streaming: 双向流;
Flow Control: 流控制;
Socket Resumption: 连接恢复;
Message passing: 消息传递模型;
Transport independent: 与传输层解耦的应用层协议。
在这篇文章,主要讲述与流相关的功能。
Reactive Streams Flux & Mono 在 RSocket 中,使用 Flux 和 Mono 两个 Reactive Streams 框架来处理流数据,其中 Flux 处理一个流中 0 个或多个消息的场景,而 Mono 处理 0 个或最多 1 个消息的场景。
Flux: A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
Mono: A Reactive Streams Publisher with basic rx operators that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).
Flux 和 Mono 都继承自 Publisher 接口,Publisher 接口只有一个方法 subscribe() 方法,这个方法主要是用来订阅 Subscriber 对象,而 Subscriber 对象用来消息处理数据。
Publisher 使用了观察者模式,消费者订阅观察者对象 Subscriber,生产者通过 Publisher 对象发布数据,再通知给 Subscriber 对象。
Subscriber 接口是消息最终处理接口,其定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public interface Subscriber <T> { public void onSubscribe (Subscription s) ; public void onNext (T t) ; public void onError (Throwable t) ; public void onComplete () ; }
Subscriber 接口的定义和功能与 gRPC 中 StreamObserver 是类似的。
Frame 格式 RSocket 使用二进制帧进行通信,其格式如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |0 | Stream ID | +-----------+-+-+---------------+-------------------------------+ |Frame Type |I|M| Flags | +-------------------------------+-------------------------------+ | Metadata Length | +---------------------------------------------------------------+ | Metadata Payload ... +---------------------------------------------------------------+ | Payload of Frame ... +---------------------------------------------------------------+
字段描述:
Stream ID: (31 bits = max value 2^31-1 = 2,147,483,647), 用 31 位无符号整数表示 StreamId, 0 专门用来描述整个连接;
Frame Type:(6 bits = max value 63) 帧类型,下文会介绍;
Flags: (10 bits), 标志位,具体值取决于帧类型,0 表示不设置。协议要求,所有帧要带两个标志位: (I)gnore: 如果不识别该帧是否忽略 ,(M)etadata: 是否带有 Metadata;
Metadata Length: (24 bits = max value 16,777,215), 24 位无符号整数表示 Metadata 长度, 该部分可选,取决于是否有 Metadata;
Metadata: Metadata 元数据;
Payload: 数据。
帧类型:
Type
Value
Description
RESERVED
0x00
Reserved
SETUP
0x01
Setup: Sent by client to initiate protocol processing.
LEASE
0x02
Lease: Sent by Responder to grant the ability to send requests.
KEEPALIVE
0x03
Keepalive: Connection keepalive.
REQUEST_RESPONSE
0x04
Request Response: Request single response.
REQUEST_FNF
0x05
Fire And Forget: A single one-way message.
REQUEST_STREAM
0x06
Request Stream: Request a completable stream.
REQUEST_CHANNEL
0x07
Request Channel: Request a completable stream in both directions.
REQUEST_N
0x08
Request N: Request N more items with Reactive Streams semantics.
CANCEL
0x09
Cancel Request: Cancel outstanding request.
PAYLOAD
0x0A
Payload: Payload on a stream. For example, response to a request, or message on a channel.
ERROR
0x0B
Error: Error at connection or application level.
METADATA_PUSH
0x0C
Metadata: Asynchronous Metadata frame
RESUME
0x0D
Resume: Replaces SETUP for Resuming Operation (optional)
RESUME_OK
0x0E
Resume OK : Sent in response to a RESUME if resuming operation possible (optional)
EXT
0x3F
Extension Header: Used To Extend more frame types as well as extensions.
在建立好连接之后,发送的第一帧是 SETUP 或 RESUME,主要用于协商客户端与服务器相关的信息,其中 RESUME 用于连接的重建。SETUP 帧的格式如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Stream ID = 0 | +-----------+-+-+-+-+-----------+-------------------------------+ |Frame Type |0 |M|R|L| Flags | +-----------+-+-+-+-+-----------+-------------------------------+ | Major Version | Minor Version | +-------------------------------+-------------------------------+ |0 | Time Between KEEPALIVE Frames | +---------------------------------------------------------------+ |0 | Max Lifetime | +---------------------------------------------------------------+ | Token Length | Resume Identification Token ... +---------------+-----------------------------------------------+ | MIME Length | Metadata Encoding MIME Type ... +---------------+-----------------------------------------------+ | MIME Length | Data Encoding MIME Type ... +---------------+-----------------------------------------------+ Metadata & Setup Payload
协商的内容包括协议的版本、KEEPALIVE 间隔时间(单位:ms)、服务保活最大时长(单位:ms)、用于 Resume 的 token(恢复连接时进行比对)、Metadata 编码类型及 Data 编码类型。一旦协商成功,就可以发送后续的请求帧。
通信模型 在 RSocket 中支持四种通信模型,分别是:
Fire-Forget: 只发送 Request,不用 Response;
Request-Response: 一个 Request,一个 Response;
Request-Stream: 一个 Request, 响应为 Stream;
Channel:双向流。
四种通信模型对应的帧请求分别为:REQUEST_FNF,REQUEST_RESPONSE,REQUEST_STREAM 及 REQUEST_CHANNEL, 响应结果使用 PAYLOAD 帧,PAYLOAD 用来传输数据。另外,响应结束的 COMPLETE 标志位在 PAYLOAD 帧中定义。 PAYLOAD 定义如下:
1 2 3 4 5 6 7 8 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Stream ID | +-----------+-+-+-+-+-+---------+-------------------------------+ |Frame Type |0 |M|F|C|N| Flags | +-------------------------------+-------------------------------+ Metadata & Data
Frame Type: (6 bits) 0x0A;
Flags: (10 bits):
(M)etadata: 带有 Metadata 数据;
(F)ollows: 表示是否将数据分包(fragments),请求数据大于一个 Frame 长度时使用;
(C)omplete: 流结束标志位,如果设置,onComplete() 方法会被调用;
(N)ext: 传递数据,如果设置,onNext(Payload) 方法会被调用;
Payload Data: 数据
REQUEST_FNF, REQUEST_RESPONSE, REQUEST_STREAM 及 REQUEST_CHANNEL 定义比较类似,也相对简单,以 REQUEST_RESPONSE 为例:
1 2 3 4 5 6 7 8 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Stream ID | +-----------+-+-+-+-------------+-------------------------------+ |Frame Type |0 |M|F| Flags | +-------------------------------+ Metadata & Request Data
Frame Type: (6 bits) 0x04;
Flags: (10 bits):
(M)etadata: 带有 Metadata 数据;
(F)ollows: 表示是否将数据分包(fragments),请求数据大于一个 Frame 长度时使用; Request Data: 请求的数据
说完帧的格式,我们来看下 Requester 和 Responder 之间的通信流程,以 Request-Response 为例,先作以下约定:
“RQ -> RS” refers to Requester sending a frame to a Responder. And “RS -> RQ” refers to Responder. “*” refers to 0 or more and “+” refers to 1 or more.
Request-Response:
1 2 3 4 5 6 7 8 9 10 1. RQ -> RS: REQUEST_RESPONSE2. RS -> RQ: PAYLOAD with COMPLETEor 1. RQ -> RS: REQUEST_RESPONSE2. RS -> RQ: ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID]or 1. RQ -> RS: REQUEST_RESPONSE2. RQ -> RS: CANCEL
有三种情况:
请求正常,Requester 发送一个 REQUEST_RESPONSE 帧,Responder 响应一个带 COMPLETE 标志位的 PAYLOAD;
请求出错,Requester 发送一个 REQUEST_RESPONSE 帧,Responder 响应一个 ERROR 帧,帧中带有错误码及错误信息;
请求取消,Requester 发送一个 REQUEST_RESPONSE 帧,紧接着发送一个 CANCEL, 取消请求。
对于 Stream 的情况,则会发送多个 PAYLOAD 帧。
实例 引入依赖 1 2 3 4 5 6 7 8 9 10 11 12 <dependencies > <dependency > <groupId > io.rsocket</groupId > <artifactId > rsocket-core</artifactId > <version > 0.11.13</version > </dependency > <dependency > <groupId > io.rsocket</groupId > <artifactId > rsocket-transport-netty</artifactId > <version > 0.11.13</version > </dependency > </dependencies >
实例底层的传输使用的是 TCP, 框架采用的是 Netty.
服务器端 1 2 3 4 5 6 7 8 9 10 11 12 13 public interface RSocket extends Availability , Closeable { Mono<Void> fireAndForget (Payload payload) ; Mono<Payload> requestResponse (Payload payload) ; Flux<Payload> requestStream (Payload payload) ; Flux<Payload> requestChannel (Publisher<Payload> payloads) ; Mono<Void> metadataPush (Payload payload) ; }
在服务端需要实现 RSocket 接口,并将其添加到服务器中。
1 2 3 4 5 6 7 8 9 10 11 public Server () { this .server = RSocketFactory.receive() .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl ())) .transport(TcpServerTransport.create("localhost" , TCP_PORT)) .start() .doOnNext(x -> LOG.info("Server started" )) .subscribe(); initService(); }
服务端的 RSocketImpl 代码逻辑如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 private class RSocketImpl extends AbstractRSocket { @Override public Mono<Payload> requestResponse (Payload payload) { try { LOG.info("payload:{}" , payload); return Mono.just(payload); } catch (Exception x) { return Mono.error(x); } } @Override public Mono<Void> fireAndForget (Payload payload) { try { dataPublisher.publish(payload); return Mono.empty(); } catch (Exception x) { return Mono.error(x); } } @Override public Flux<Payload> requestStream (Payload payload) { String streamName = payload.getDataUtf8(); if (DATA_STREAM_NAME.equals(streamName)) { return Flux.from(dataPublisher); } return Flux.error(new IllegalArgumentException (streamName)); } @Override public Flux<Payload> requestChannel (Publisher<Payload> payloads) { Flux.from(payloads) .subscribe(gameController::processPayload); Flux<Payload> channel = Flux.from(gameController); return channel; } } public class DataPublisher implements Publisher <Payload> { private List<Subscriber<? super Payload>> subscribers = new ArrayList <>(); @Override public void subscribe (Subscriber<? super Payload> subscriber) { this .subscribers.add(subscriber); } public void publish (Payload payload) { subscribers.stream().forEach(subscriber -> subscriber.onNext(payload)); } public void complete () { subscribers.stream().forEach(subscriber -> subscriber.onComplete()); } }
客户端代码 Fire-and-Forget 客户端定时 50s 发送一次数据到服务器,不用服务器响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private final RSocket socket;private final List<Float> data;public FireNForgetClient () { this .socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost" , TCP_PORT)) .start() .block(); this .data = Collections.unmodifiableList(generateData()); } public void sendData () { Flux.interval(Duration.ofMillis(50 )) .take(data.size()) .map(this ::createFloatPayload) .flatMap(socket::fireAndForget) .blockLast(); }
Request-Response
1 2 3 4 5 6 7 8 9 10 11 12 public String callBlocking (String string) { return socket .requestResponse(DefaultPayload.create(string)) .map(Payload::getDataUtf8) .onErrorReturn(ERROR_MSG) .block(); }
Request-Stream
1 2 3 4 5 6 7 8 9 10 11 public Flux<Float> getDataStream () { return socket .requestStream(DefaultPayload.create(DATA_STREAM_NAME)) .map(Payload::getData) .map(buf -> buf.getFloat()) .onErrorReturn(null ); }
Stream-Stream 通过 GameController 类,既实现了数据在发送,也实现了数据的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private final RSocket socket;private final GameController gameController;public ChannelClient () { this .socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost" , TCP_PORT)) .start() .block(); this .gameController = new GameController ("Client Player" ); } public void playGame () { socket.requestChannel(Flux.from(gameController)) .doOnNext(gameController::processPayload) .blockLast(); }
GameController 处理逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public class GameController implements Publisher <Payload> { private static final Logger LOG = LoggerFactory.getLogger(GameController.class); private final String playerName; private final List<Long> shots; private Subscriber<? super Payload> subscriber; private boolean truce = false ; public GameController (String playerName) { this .playerName = playerName; this .shots = generateShotList(); } private List<Long> generateShotList () { return Flux.range(1 , SHOT_COUNT) .map(x -> (long ) Math.ceil(Math.random() * 1000 )) .collectList() .block(); } @Override public void subscribe (Subscriber<? super Payload> subscriber) { this .subscriber = subscriber; fireAtWill(); } private void fireAtWill () { new Thread (() -> { for (Long shotDelay : shots) { try { Thread.sleep(shotDelay); } catch (Exception x) {} if (truce) { break ; } LOG.info("{}: bang!" , playerName); subscriber.onNext(DefaultPayload.create("bang!" )); } if (!truce) { LOG.info("{}: I give up!" , playerName); subscriber.onNext(DefaultPayload.create("I give up" )); } subscriber.onComplete(); }).start(); } public void processPayload (Payload payload) { String message = payload.getDataUtf8(); switch (message) { case "bang!" : String result = Math.random() < 0.5 ? "Haha missed!" : "Ow!" ; LOG.info("{}: {}" , playerName, result); break ; case "I give up" : truce = true ; LOG.info("{}: OK, truce" , playerName); break ; } } }
说明: 流的操作主要是通过 Publisher 类来实现的。
测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 @Test public void whenSendingAString_thenRevceiveTheSameString () { ReqResClient client = new ReqResClient (); String string = "Hello RSocket" ; assertEquals(string, client.callBlocking(string)); client.dispose(); } @Test public void whenSendingFireForget () { FireNForgetClient fnfClient = new FireNForgetClient (); List<Float> data = fnfClient.getData(); data.stream().forEach(System.out::println); fnfClient.sendData(); try { TimeUnit.SECONDS.sleep(10 ); } catch (InterruptedException ex) { ex.printStackTrace(); } } @Test public void whenSendingStream_thenReceiveTheSameStream () { FireNForgetClient fnfClient = new FireNForgetClient (); ReqStreamClient streamClient = new ReqStreamClient (); List<Float> data = fnfClient.getData(); List<Float> dataReceived = new ArrayList <>(); Disposable subscription = streamClient.getDataStream() .index() .subscribe( tuple -> { assertEquals("Wrong value" , data.get(tuple.getT1().intValue()), tuple.getT2()); LOG.info("index:{},data:{}" ,tuple.getT1(),tuple.getT2()); dataReceived.add(tuple.getT2()); }, err -> LOG.error(err.getMessage()) ); fnfClient.sendData(); try { Thread.sleep(5000 ); } catch (Exception x) { } subscription.dispose(); fnfClient.dispose(); assertEquals("Wrong data count received" , data.size(), dataReceived.size()); } @Test public void whenRunningChannelGame_thenLogTheResults () { ChannelClient client = new ChannelClient (); client.playGame(); client.dispose(); }
总结 通过分析 RSocket 通信协议,我们理解了 RSocket 四种通信模型的基本原理,不过 RSocket 提供的功能不仅限于这些,它还提供了更多高级的功能,如文章开头介绍的那样。RSocket 官网提供了翔实的文档,如果感兴趣的话,可以深入理解下。
参考:
1. rsocket 官方文档 2. RSocket 基于消息传递的反应式应用层网络协议 3. 反应式宣言 4. Introduction to RSocket 5. Flux API 6. Mono API 7. Reactive service to service communication with RSocket — Introduction 8. Rsocket protocol 9. 响应式服务通信协议RSocket