无题
Netty
牢记四个点
NIO基础
非阻塞io - non-blocking io
- 为什么NIO 是非阻塞的.NIO使用了通道和通道的IO多路复用技术。
三大组件
Channel Buffer Selector
Channel & Buffer
最常用的 Buffer: ByteBuffer
其他的不是很清楚
Selector
Selector相当于一个中转站, 来分发channel 发来的数据给thread
把通道注册到选择器中, 第二步则是通过选择器所提供的事件 查询方法。来查看 这些注册的通道是否又已经就绪的IO事件
选择器只需要一个线程进行监控。
因为 一个 选择器 连接着多个 Channel, 然后一个 channel 又相当于 一个流, 所以 它的 效率是远远高于 oio ( 这个就是多路复用)
前面两个组件 channel selector 只是负责与应用程序的交互, 但是为了实现非阻塞的读写, 我们需要 使用 Buffer
buffer 是非线程安全的
Buffer类是一个抽象类,对应于Java的主要数据类型,在NIO中有8种缓冲区类,分别如下:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer、MappedByteBuffer。
buffer
- 在 buffer 的子类 中, 有一个byte[]类型的数组 作为 数据的 读写缓存区。 但是 这个 数组没有定义在 buffer 中。
为了 记录 读写的状态 它提供了下面三种重要的属性
capacity
capacity 一经 对象创建 之后 就不能修改。
(注意这个 容量 不是指 对象里面 byte数组的大小, 而是指 可以写入的字符多少)
position
position 的值 和 缓冲区的读写模式有关。 在不同 的模式下。 position属性值的含义是不同的。
在刚刚写入 模式的时候,代表当前 要写入的位置
当刚刚进入到 读模式的时候, pos 会被修改为 0 同上
调用 filp 方法实现 读写模式的转换
- 同时limit 被修改为 写入模式的pos
- pos 变为 0
limit
略, 就是最大容量
mark
在缓冲区操作过程中,可以将当前的 pos 的值临时存入到mark 属性中, 需要的时候, 可以在从mark中取出暂存的标记值。恢复到 pos 中,重新从pos 位置开始
- 调用 reset () 可以恢复 pos
读写模式
- 读和写 针对的缓冲区
- 写入到缓冲区 就要使用 写模式
- 从缓冲区读出来就要使用读模式
channel 的 Write 和 read
write 方法 Writes a sequence of bytes to this channel from the given buffer. (将缓冲区写入到通道)
read 方法 Reads a sequence of bytes from this channel into the given buffers.(读取通道到缓冲区)
详解 NIO Buffer 类
allocate 分配缓冲区
1 | public class UseBuffer |
put写入缓存中
- 你写入的对象类型要和 buffer 子类一个类型
1 | package com.crazymakercircle.bufferDemo; |
flip 翻转
这里的读 和 写 是针对于缓冲区
写模式到读模式
(1)首先,设置可读上限limit的属性值。将写入模式下的缓冲区中内容的最后写入位置position值,作为读取模式下的limit上限值。
(2)其次,把读的起始位置position的值设为0,表示从头开始读。
(3)最后,清除之前的mark标记,因为mark保存的是写入模式下的临时位置,发生模式翻转后,如果继续使用旧的mark标记,会造成位置混乱。
1 |
|
- 想要转换回来 也就是 读模式变成写模式
需要调用clear 方法 或者 compact方法
get从缓冲区读取
同上面,但是有几个需要注意的点
- pos 和 limit 上面讲过了
- 读完不能直接写, 需要调用 clear方法, 和compact 方法
- 可以重复读吗? 可以 调用 rewind , mark reset 方法
mark 和 reset 方法
- Buffer.mark()方法将当前position的值保存起来,放在mark属性中,让mark属性记住这个临时位置;之后,可以调用**Buffer.reset()**方法将mark的值恢复到position中。
clear 清空 缓冲区
上面讲过了
在读取模式下,调用clear()方法将缓冲区切换为写入模式。此方法的作用:
(1)会将position清零;
(2)limit设置为capacity最大容量值,可以一直写入,直到缓冲区写满。
使用Buffer类的步骤
总体来说,使用Java NIO Buffer类的基本步骤如下:
(1)使用创建子类实例对象的allocate( )方法,创建一个Buffer类的实例对象。
(2)调用put( )方法,将数据写入到缓冲区中。
(3)写入完成后,在开始读取数据前,调用Buffer.flip(
)方法,将缓冲区转换为读模式。
(4)调用get( )方法,可以从缓冲区中读取数据。
(5)读取完成后,调用Buffer.clear()方法或Buffer.compact()方法,将缓冲区转换为写入模式,可以继续写入。
详解 NIO Channel 类
- 之前认为 一个通道可以对应一个底层的文件描述符。实际上还可以更加的抽象,对应不同的协议类型
NIO全部通道类型进行过多的描述,仅仅聚焦于介绍其中最为重要的四种Channel(通道)实现:FileChannelSocketChannel、ServerSocketChannel、DatagramChannel。
对于以上四种通道,说明如下:
(1)FileChannel文件通道,用于文件的数据读写;
(2)SocketChannel套接字通道,用于Socket套接字TCP连接的数据读写;
(3)ServerSocketChannel服务器套接字通道(或服务器监听通道),允许我们监听TCP连接请求,为每个监听到的请求,创建一个SocketChannel套接字通道;
(4)DatagramChannel数据报通道,用于UDP协议的数据读写。这个四种通道,涵盖了文件IO、TCP网络、UDP
FileChannel
注意这是 阻塞的,不能设置为非阻塞的模式
写入代码
1 | FileInputStream fis = new FileInputStream(srcFile); |
- 读出模式
- 在大部分应用场景,从通道读取数据都会调用通道的int
read(ByteBufferbuf)方法,它从通道读取到数据写入到ByteBuffer缓冲区,并且返回读取到的数据量。
- 在大部分应用场景,从通道读取数据都会调用通道的int
1 | RandomAccessFile aFile = new RandomAccessFile(fileName, “rw”); |
1 | // 这里 就是 它的 读出 文件然后打印出来 |
- 写入 FileChannel 模式
1 | buf.flip(); |
通道使用完成后,需要像io操作那样将其关闭
在缓冲区写入通道之后,处于性能考量可能不会吧数据写入到此磁盘之中, 为了 保证 可以实现写入磁盘, 我们可以调用force 方法 来强制写入
需要注意的是 , 这个 force 需要填入 一个 参数, 如果是 true 的话, 就会把 元数据填入, 比如说文件的创建时间, 修改时间大小 写入, 如果是false 的话, 就只是 把 文件内容进来
FileChannel 来实现文件复制
1 | public static void changeATOB (String src, String desc){ |
SocketChannel 套接字通道
有两个负责网络连接的通道
- 一个是SocketChannel负责连接的数据传输
- 另一个是ServerSocketChannel负责连接的监听
ServerSocketChannel仅仅应用于服务器端,而SocketChannel则同时处于服务器端和客户端,所以,对应于一个连接,两端都有一个负责传输的SocketChannel传输通道。
无论是 上面两个通道的哪一个, 它们都支持两种阻塞模式
- socketChannel.configureBlocking(false)设置为非阻塞模式。
- socketChannel.configureBlocking(true)设置为阻塞模式。
发起连接
1 | //获得一个套接字传输通道 |
- 由于可能连接还没有完全连上, 所以我们需要自旋判断
1 | while(!socketChannel.finishConnect()){ |
- 服务端 连接
1 | //新连接事件到来,首先通过事件,获取服务器监听通道 |
读取通道
实际上类似于 前面的 文件的读取方式
但是有一个问题, 我们想要使用异步io,但是如何知道通道有数据了呢,这个时候就需要使用到Selector
写入通道
和前面的把数据写入到FileChannel文件通道一样,大部分应用场景都会调用通道的int
write(ByteBufferbuf)方法。
//写入前需要读取缓冲区,要求ByteBuffer是读取模式
buffer.flip();
socketChannel.write(buffer);
关闭通道
- 在关闭SocketChannel传输通道前,如果传输通道用来写入数据,则建议调用一次shutdownOutput()终止输出方法,向对方发送一个输出的结束标志(-1)。然后调用socketChannel.close()方法,关闭套接字连接。
1 | //调用终止输出方法,向对方发送一个输出的结束标志 |
DatagramChannel 数据报通道
- 调用open的静态方法, 获取到channel 通道
1 | //获取DatagramChannel数据报通道 |
- 详细的内容 可以看书上 p53
发送的话看下面
1 | //开启 数据包通道 |
接收的话 使用下面这个
1 | //开启 数据包通道 |
详解 NIO Selector 类
Selector 的作用
- 选择器的使命是完成IO的多路复用,其主要工作是通道的注册、监听、事件查询。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系,是监控和被监控的关系
介绍
- 一般来说 是一个 线程对应一个 选择器,一个选择器关联多个通道。一个单线程可以处理数百、数千、数万、甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销
选择器以及注册
Channel。register(Selector sel, int ops)
第一个参数是通道 , 第二个参数是通道IO 类型
1 | (1)可读:SelectionKey.OP_READ |
同时我们可以通过或运算的方式选择监听多个IO类型
int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE
- 注意我们后面遇到的IO事件, 它是指通道处于的一种状态
比方说某个SocketChannel传输通道,如果完成了和对端的三次握手过程,则会发生“连接就绪”(OP_CONNECT)的事件。再比方说某个ServerSocketChannel服务器连接监听通道,在监听到一个新连接的到来时,则会发生“接收就绪”(OP_ACCEPT)的事件。还比方说,一个SocketChannel通道有数据可读,则会发生“读就绪”(OP_READ)事件;一个等待写入数据的SocketChannel通道,会发生写就绪(OP_WRITE)事件。
SelectableChannel可选择通道
- 实际上不是所有的 通道都可以使用选择器,
- 一个通道是否可以使用取决于它是否继承了抽象类SelectableChannel
- 它提供了实现通道的可选择性所需要的公共方法。Java
NIO中所有网络链接Socket套接字通道,都继承了SelectableChannel类,都是可选择的。
SelectionKey选择键
定义:一旦在通道中发生了某些IO事件(就绪状态达成),并且是在选择器中注册过的IO事件,就会被选择器选中,并放入SelectionKey选择键的集合中。
使用这个不仅可以获取到通道发送的IO事件, 还可以获得IO事件所在的通道, 也可以选择出选择键的选择器实例
选择器的使用流程
(1)获取选择器实例;(Selector selector = Selector.open();)
(2)将通道注册到选择器中;
1 | // 2.获取通道 |
- 注册的通道必须是非阻塞的(所以FileChannel 是不能用到着些方法上面的)
- 一个通道不一定要支持所有的IO事件, 判断通道是否支持某一个事件是使用validOps 方法来实现
(3)轮询感兴趣的IO就绪事件(选择键集合)。
1 | while (selector.select() > 0) { |
注意在遍历选择键的时候, 当你处理完这个IO事件了 ,需要将它从迭代器中移除(不移除下次迭代还是这个信息)
select 方法
(1)select():阻塞调用,一直到至少有一个通道发生了注册的IO事件。
(2)select(long timeout):和select()一样,但最长阻塞时间为timeout指定的毫秒数。
(3)selectNow():非阻塞,不管有没有IO事件,都会立刻返回。
select()方法的返回值的是整数类型(int),表示发生了IO事件的数量。更准确地说,是从上一次select到这一次select之间,有多少通道发生了IO事件,更加准确地说,是指发生了选择器感兴趣(注册过)的IO事件数。
NIO的案例
NIO实现DisCard服务器的实践案例
在一开始监听管道注册之后
我们进入到遍历 选择键的过程
实际上, 如果 当选择键的isAcceptable 方法判断之后, 就说明存在新的连接通道, 我们就把socketChannel注册到选择器之中
如果是 isReadable状态, 我们就调用选择器的 channel 方法, 获取到 通道, 然后设置为 阻塞, 转换为读模式, 从里面 读取数据出来
然后每个循环结束后, 移除掉这个选择键
SocketChannel 在服务端接受文件
- 详细见 书上的写法
- 但是最后的程序没有进行粘 包和半包的处理
Reactor 模式
基础入门
构成
Reactor 线程的职责: 负责响应IO事件, 并且分发到Handlers处理器中。
Handlers处理器的职责, 非阻塞的执行业务处理逻辑
Reactor 线程负责多路IO事件的查询
多线程OIO的缺陷
直接 while 等待 数据, 会造成后面的数据阻塞
使用Handler 来接送数据, 会产生大量的线程, 消耗系统资源, 同时 即便你让一个线程去接受多个数据, 因为OIO的阻塞,性质,导致前面的数据没有完成之后, 后面的数据无法进行, 这就会导致无法并行
缺点: Connection per Thread 的模式就是 会消耗大量的系统资源。这在高并发的场景是致命的
单线程Reactor 模式
首先必须要介绍两个重要的方法
- attach 将对象附加到选择键
- attachment 从选择器获取附加对象
这两个 一个是 将Java POJO对象作为附件添加到 选择键实例中。
注意阅读代码的时候,直接调用 run 方法不会开启一个新的线程来执行, 而是直接执行, (需要使用start())
一个 Reactor , 对应了两个 Handler
- 第一个是进行IO事件的处理
- 另一个是处理业务逻辑
重要的 方法
- SelectionKey (选择键)
void attach(Object o)
将对象附加到选择键Object attachment()
从选择键获取附加对象
- SelectionKey (选择键)
多线程Reactor 模式
- 将IOHandler 的处理器执行放入到独立的线程池中, 这样, 业务处理线程和负责新连接的监听的反应器线程就可以实现相互隔离,避免服务器的连接监听收到阻塞。
- 将反应器线程拆分为多个子反应器, 然后引入多个选择器。,一个线程负责一个一个选择器
实现的机制
引入 一个 多个选择器
设计一个子反应器类, 子反应器负责查询一个选择器
开启多个处理线程, 一个处理线程负责一个子反应器
将IO事件进分类隔离,
对反应器 做了 分类 一个 是 bossReactor 一个是 workReactor
- 我们让主反应器 建立连接请求, 然后再连接请求的时候, 它就从, 从反应器里面轮询挑选一个 选择器, 然后唤醒它, 这样 哪个被选中的 从反应器 就 会 醒来, 往下面 执行 Handler 的 部分, 然后再 读取完数据之后, 把剩下 的 事务提交给子线程来做就好了
正式进入Netty
- 首先上来就是一堆乱起八糟的代码, 这个时候, 实际上就牵涉到我们刚刚学习 Reactor 模式。
方法和类
NioEventLoopGroup
这个就是Netty中的反应器
serverBootstrap 服务引导类,它的职责是将不同的Netty组件组装再一起。
ChannelInboundHandlerAdapter (入站适配器)
- 如果要实现自己的入站处理器Handler, 可以简单的继承ChannelInboundHandlerAdapter 入站处理器适配器,再写入自己的入站处理的业务逻辑(重写)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
NettyDiscardHandler extends ChannelInboundHandlerAdapter{
public void channelRead(ChannelHandlerContext ctx, Object msg){
ByteBuf in = (ByteBuf) msg;
try{
while(in.inReadable()){
sout((char)in.readByte());
}
} finally{
ReferenceCountUtils.release(msg);
}
}
}
解密Netty的Reactor 模式
通道注册
再netty中, 就是channel 注册到 EventLoop上面, 对应到底层就是NIO的Channel 注册到NIO的Selector 上。
查询事件
查询选择器Selector中的IO事件并记录再选择键上面
事件内容分发、数据读取 发射
反应器EventLoop 将事件分发和数据读取两个操作一起负责了。
他会从Unsafe成员中, 获取到信息到IO事件被触发的时候, EventLoop读取到数据后, 会把数据发射到Channel 内部的Pipeline通道。
流水线传播 和 业务处理
数据再通道的Pipeline上传播, 通道由Handler构成, 由Handler业务处理器负责。 处理完成之后, 又把任务交给下一个Handler 。这样就构成了一个责任链模式。
- 一般我们使用最多的就是NioSocketChannel 这个协议