Java程式設計方法論-Spring WebFlux篇 Reactor-Netty下TcpServer的功能實現 1
本系列為本人Java程式設計方法論 響應式解讀系列的Webflux
部分,現分享出來,前置知識Rxjava2 ,Reactor的相關解讀已經錄製分享視訊,併發布在b站,地址如下:
Rxjava原始碼解讀與分享:https://www.bilibili.com/video/av34537840
Reactor原始碼解讀與分享:https://www.bilibili.com/video/av35326911
NIO原始碼解讀相關視訊分享: https://www.bilibili.com/video/av43230997
NIO原始碼解讀視訊相關配套文章:
BIO到NIO原始碼的一些事兒之NIO 下 之 Selector
BIO到NIO原始碼的一些事兒之NIO 下 Buffer解讀 上
BIO到NIO原始碼的一些事兒之NIO 下 Buffer解讀 下
Java程式設計方法論-Spring WebFlux篇 01 為什麼需要Spring WebFlux 上
Java程式設計方法論-Spring WebFlux篇 01 為什麼需要Spring WebFlux 下
Java程式設計方法論-Spring WebFlux篇 Reactor-Netty下HttpServer 的封裝
其中,Rxjava與Reactor作為本人書中內容將不對外開放,大家感興趣可以花點時間來觀看視訊,本人對著兩個庫進行了全面徹底細緻的解讀,包括其中的設計理念和相關的方法論,也希望大家可以留言糾正我其中的錯誤。
本書主要針對Netty
伺服器來講,所以讀者應具備有關Netty
的基本知識和應用技能。接下來,我們將對Reactor-netty
從設計到實現的細節一一探究,讓大家真的從中學習到好的封裝設計理念。本書在寫時所參考的最新版本是Reactor-netty 0.7.8.Release
這個版本,但現在已有0.8
版本,而且0.7
與0.8
版本在原始碼細節有不小的變動,這點給大家提醒下。我會針對0.8
版本進行全新的解讀並在未來出版的書中進行展示。
TcpServer的功能實現
這裡,我們會首先解讀Reactor Netty
是如何針對Netty
中Bootstrap
的ChildHandler
進行封裝以及響應式拓展等一些細節探究。接著,我們會涉及到HttpHandler
的引入,以此來對接我們上層web服務。
針對Bootstrap的ChildHandler的封裝
因為這是我們切入自定義邏輯的地方,所以,我們首先來關注下與其相關的ChannelHandler
,以及前文並未提到的,伺服器到底是如何啟動以及如何通過響應式來做到優雅的關閉,首先我們會接觸關閉伺服器的設定。
ChannelHandler引入與使用響應式優雅關閉伺服器
我們再回到reactor.ipc.netty.http.server.HttpServer#HttpServer
這個構造器中,由上一章我們知道請求是HTTP
層面的(應用層),必須依賴於TCP
的連線實現,所以這裡就要有一個TCPServer
的實現,其實就是Channel
上Pipeline
的操作。
//reactor.ipc.netty.http.server.HttpServer#HttpServer private HttpServer(HttpServer.Builder builder) { HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder(); ... this.options = serverOptionsBuilder.build(); this.server = new TcpBridgeServer(this.options); } 複製程式碼
這裡的話在DiscardServer Demo
中,TCPServer
我們主要針對childHandler
的內容的封裝,也就是如下內容:
b.group(bossGroup, workerGroup) ... .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) ... 複製程式碼
那childHandler
到底代表什麼型別,我們可以在io.netty.bootstrap.ServerBootstrap
找到其相關定義:
//io.netty.bootstrap.ServerBootstrap public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class); private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>(); private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>(); private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); private volatile EventLoopGroup childGroup; private volatile ChannelHandler childHandler; public ServerBootstrap() { } private ServerBootstrap(ServerBootstrap bootstrap) { super(bootstrap); childGroup = bootstrap.childGroup; childHandler = bootstrap.childHandler; synchronized (bootstrap.childOptions) { childOptions.putAll(bootstrap.childOptions); } synchronized (bootstrap.childAttrs) { childAttrs.putAll(bootstrap.childAttrs); } } ... } 複製程式碼
由欄位定義可知,childHandler
代表的是ChannelHandler
,顧名思義,是關於Channel
的一個處理類,這裡通過檢視其定義可知它是用來攔截處理Channel
中的I/O
事件,並通過Channel
下的ChannelPipeline
將處理後的事件轉發到其下一個處理程式中。
那這裡如何實現DiscardServer Demo
中的b.childHandler(xxx)
行為,通過DiscardServer Demo
我們可以知道,我們最關注的其實是ch.pipeline().addLast(new DiscardServerHandler());
中的DiscardServerHandler
實現,但是我們發現,這個核心語句是包含在ChannelInitializer
內,其繼承了ChannelInboundHandlerAdapter
,它的最頂層的父類介面就是ChannelHandler
,也就對應了io.netty.bootstrap.ServerBootstrap
在執行b.childHandler(xxx)
方法時,其需要傳入ChannelHandler
型別的設定。這裡就可以分拆成兩步來做,一個是b.childHandler(xxx)
行為包裝,一個是此ChannelHandler
的定義拓展實現。
那麼,為了API
的通用性,我們先來看Netty的客戶端的建立的一個Demo(摘自本人RPC專案的一段程式碼):
private Channel createNewConChannel() { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class) .group(new NioEventLoopGroup(1)) .handler(new ChannelInitializer<Channel>() { protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)) .addLast(new RpcDecoder(10 * 1024 * 1024)) .addLast(new RpcEncoder()) .addLast(new RpcClientHandler()) ; } }); try { final ChannelFuture f = bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.TCP_NODELAY, true) .connect(ip, port).sync(); // <1> f.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { LOGGER.info("Connect success {} ", f); } }); final Channel channel = f.channel(); channel.closeFuture().addListener((ChannelFutureListener) future -> LOGGER.info("Channel Close {} {}", ip, port)); return channel; } catch (InterruptedException e) { e.printStackTrace(); } return null; } 複製程式碼
將Netty
的客戶端與服務端的建立進行對比,我們可以發現b.childHandler(xxx)
與相應的啟動(Server
端的話是serverBootstrap.bind(port).sync();
,客戶端的話是上述Demo中<1>
處的內容)都可以抽取出來作為一個介面來進行功能的聚合,然後和相應的Server
(如TcpServer
)或Client
(如TcpClient
)進行其特有的實現。在Reactor Netty
內的話,就是定義一個reactor.ipc.netty.NettyConnector
介面,除了做到上述的功能之外,為了適配響應式的理念,也進行了響應式的設計。即在netty
客戶端與服務端在啟動時,可以儲存其狀態,以及提供結束的對外介面方法,這種在響應式中可以很優雅的實現。接下來,我們來看此reactor.ipc.netty.NettyConnector
的介面定義:
//reactor.ipc.netty.NettyConnector public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> { Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler); ... default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>> BlockingNettyContext start(T handler) { return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName()); } } ... default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>> void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) { BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName()); facade.installShutdownHook(); if (onStart != null) { onStart.accept(facade); } facade.getContext() .onClose() .block(); } 複製程式碼
其中,newHandler
可以是我們上層web處理,裡面包含了INBOUND, OUTBOUND
,具體的話就是request,response
,後面會專門來涉及到這點。
接著就是提供了一個啟動方法start
,其內建立了一個BlockingNettyContext
例項,而邏輯的核心就在其構造方法內,就是要將配置好的伺服器啟動,整個啟動過程還是放在newHandler(handler)
中,其返回的Mono<? extends NettyContext>
中的NettyContext
型別元素是管理io.netty.channel.Channel
上下文資訊的一個物件,這個物件更多的是一些無狀態的操作,並不會對此物件做什麼樣的改變,也是通過對此物件的一個Mono<? extends NettyContext>
包裝然後通過block
產生訂閱,來做到sync()
的效果,通過,通過block
產生訂閱後返回的NettyContext
物件,可以使中斷關閉伺服器的操作也可以做到更優雅:
public class BlockingNettyContext { private static final Logger LOG = Loggers.getLogger(BlockingNettyContext.class); private final NettyContext context; private final String description; private Duration lifecycleTimeout; private Thread shutdownHook; public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description) { this(contextAsync, description, Duration.ofSeconds(45)); } public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description, Duration lifecycleTimeout) { this.description = description; this.lifecycleTimeout = lifecycleTimeout; this.context = contextAsync .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms"))) .doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address())) .block(); } ... /** * Shut down the {@link NettyContext} and wait for its termination, up to the * {@link #setLifecycleTimeout(Duration) lifecycle timeout}. */ public void shutdown() { if (context.isDisposed()) { return; } removeShutdownHook(); //only applies if not called from the hook's thread context.dispose(); context.onClose() .doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e)) .doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address())) .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms"))) .block(); } ... } 複製程式碼
這裡,我們來接觸下在Reactor中並沒有深入接觸的blockXXX()
操作,其實整個邏輯還是比較簡單的,這裡拿reactor.core.publisher.Mono#block()
來講,就是獲取並返回這個下發的元素:
//reactor.core.publisher.Mono#block() @Nullable public T block() { BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>(); onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber)); return subscriber.blockingGet(); } //reactor.core.publisher.BlockingMonoSubscriber final class BlockingMonoSubscriber<T> extends BlockingSingleSubscriber<T> { @Override public void onNext(T t) { if (value == null) { value = t; countDown(); } } @Override public void onError(Throwable t) { if (value == null) { error = t; } countDown(); } } //reactor.core.publisher.BlockingSingleSubscriber abstract class BlockingSingleSubscriber<T> extends CountDownLatch implements InnerConsumer<T>, Disposable { Tvalue; Throwable error; Subscription s; volatile boolean cancelled; BlockingSingleSubscriber() { super(1); } ... @Nullable final T blockingGet() { if (Schedulers.isInNonBlockingThread()) { throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName()); } if (getCount() != 0) { try { await(); } catch (InterruptedException ex) { dispose(); throw Exceptions.propagate(ex); } } Throwable e = error; if (e != null) { RuntimeException re = Exceptions.propagate(e); //this is ok, as re is always a new non-singleton instance re.addSuppressed(new Exception("#block terminated with an error")); throw re; } return value; } ... @Override public final void onComplete() { countDown(); } } 複製程式碼
可以看到,此處使用的CountDownLatch
的一個特性,在元素下發賦值之後,等待數值減1,這裡剛好也就這一個限定(由super(1)
定義),解除所呼叫的blockingGet
中的等待,得到所需的值,這裡,為了保證block()
的語義,其onComplete
方法也呼叫了countDown();
,即當上遊為Mono<Void>
時,做到匹配。