前言
在上一篇的RPC实现,使用了Socket来进行通信,这种方式的缺点很明显,Socket的IO是同步阻塞的,即你的线程会一直等到有服务进来或者read()
有消息。这时候有其他线程想要链接,为了不被阻塞,只能再开一个线程去处理。
虽然可以通过线程池来复用线程,但是通信线程多起来还是会影响到整个服务的性能。所以这篇我们将使用NIO
来代替BIO来实现服务之间的通信。代码上传至github,链接。
NIO组件
Selector
可以说它是NIO中最关键的一个部分,Selector的作用就是用来轮询每个注册的Channel,一旦发现Channel有注册的事件发生,便获取事件然后进行处理。
Channel
传统IO中,Stream是单向的,比如InputStream只能进行读取操作,OutputStream只能进行写操作。而Channel是双向的,既可用来进行读操作,又可用来进行写操作。
Buffer
是NIO中非常重要的一个东西,实际上就是一个容器,是一个连续数组。在NIO中所有数据的读和写都离不开Buffer。在NIO中,读取的数据只能放在Buffer中。同样地,写入数据也是先写入到Buffer中。
打个比喻,一个小巷有5条街,一个巡检在这些街道不停的巡逻。当某个街道发生状况时,巡检会带着他的记录册子去记录事件状况。这个巡检也很懒,他要报案的人来记录这个状况。
先看BIO的情况:当巡检在某条街道看到有状况发生,让涉嫌人员把状况记录完毕(耗时操作),等记录完毕了,巡检查看状况作出相应措施。也就是说这个巡检是在等到记录完成之后才能执行下一步操作!那在这个时间段如果其他街道有事情发生不就不能及时解决了吗!
再看NIO的情况:当巡检在某条街道看到有状况发生,让涉嫌人员把状况记录下来,记录的时候巡检还可以继续巡逻街道。等小哥处理好了,给巡检发个消息说记录好了,巡检在来这个街道查看记录状况,然后对事件进行相应的处理。
对应上面的例子,Selector就是巡检,Buffer是记录的小册子,Channel就是街道。
核心代码
和第一版的主要差别是在通信部分变成NIO
NIOServer 核心
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41while(true){
System.out.println("Server: 等待触发事件...");
selector.select();//没有事情产生会被阻塞在这
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while(keyIterator.hasNext()){
SelectionKey skey = keyIterator.next();
if(skey.isAcceptable()){
System.out.println("Server: 有链接进来");
sc = ((ServerSocketChannel) skey.channel()).accept();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
System.out.println("Server: 与Client建立链接");
}
if(skey.isReadable()){
System.out.println("Server: 有可读操作进来");
sc = (SocketChannel) skey.channel();
byteBuffer.clear();
while(sc.read(byteBuffer)>0) {//因为有可能字节数很多,byteBuffer一次读不了
//读取channel中的数据
//byteBuffer.flip();
ByteArrayInputStream bInput = new ByteArrayInputStream(byteBuffer.array());
ObjectInputStream oInput = new ObjectInputStream(bInput);
ReqMessage reqMessage = (ReqMessage) oInput.readObject();
result = getResultByParams(reqMessage);
sc.register(selector, SelectionKey.OP_WRITE);
}
}
if(skey.isWritable()){
System.out.println("Server: 有可写操作进来");
sc = (SocketChannel) skey.channel();
ByteArrayOutputStream bOut = new ByteArrayOutputStream();
ObjectOutputStream oOut = new ObjectOutputStream(bOut);
oOut.writeObject(result);
sc.write(ByteBuffer.wrap(bOut.toByteArray()));
sc.close();
}
//移除当前监听的事件
keyIterator.remove();
}
}
NIOInvokeHandler 核心
1 | public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { |
缺点与改进
NIO的实现上比较繁琐,半包\粘包的问题也不好解决。ByteBuffer使用上也非常不方便,需要注意filp、compact调用时机,且不支持动态扩容。后面用Netty来实现通信,针对上面的问题Netty都要合适的解决途径。
服务不支持动态注册,下个版本使用Zookeeper来解决这个问题。