NIO基础
注意:推荐完成JavaSE篇、JavaWeb篇的学习再开启这一部分的学习,如果在这之前完成了JVM篇,那么看起来就会比较轻松了。
在JavaSE的学习中,我们了解了如何使用IO进行数据传输,Java IO是阻塞的,如果在一次读写数据调用时数据还没有准备好,或者目前不可写,那么读写操作就会被阻塞直到数据准备好或目标可写为止。Java NIO则是非阻塞的,每一次数据读写调用都会立即返回,并将目前可读(或可写)的内容写入缓冲区或者从缓冲区中输出,即使当前没有可用数据,调用仍然会立即返回并且不对缓冲区做任何操作。
NIO框架是在JDK1.4推出的,它的出现就是为了解决传统IO的不足,这一期视频,我
们就将围绕着NIO开始讲解。
缓冲区
一切的一切还要从缓冲区开始讲起,包括源码在内,其实这个不是很难,只是需要理清思路。
Buffer类及其实现
Buffer类是缓冲区的实现,类似于Java中的数组,也是用于存放和获取数据的。但是Buffer相比Java中的数组,功能就非常强大了,它包含一系列对于数组的快捷操作。
Buffer是一个抽象类,它的核心内容:
1 2 3 4 5 6 7 8 9 10
| public abstract class Buffer { private int mark = -1; private int position = 0; private int limit; private int capacity;
long address;
|
我们来看看Buffer类的子类,包括我们认识到的所有基本类型(除了boolean
类型之外):
- IntBuffer - int类型的缓冲区。
- ShortBuffer - short类型的缓冲区。
- LongBuffer - long类型的缓冲区。
- FloatBuffer - float类型的缓冲区。
- DoubleBuffer - double类型的缓冲区。
- ByteBuffer - byte类型的缓冲区。
- CharBuffer - char类型的缓冲区。
(注意我们之前在JavaSE中学习过的StringBuffer虽然也是这种命名方式,但是不属于Buffer体系,这里不会进行介绍)
这里我们以IntBuffer为例,我们来看看如何创建一个Buffer类:
1 2 3 4 5 6 7 8
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.allocate(10); int[] arr = new int[]{1, 2, 3, 4, 5, 6}; IntBuffer buffer = IntBuffer.wrap(arr); }
|
那么它的内部是本质上如何进行操作的呢?我们来看看它的源码:
1 2 3 4 5 6
| public static IntBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapIntBuffer(capacity, capacity); }
|
1 2 3 4 5 6 7 8 9 10 11 12
| public static IntBuffer wrap(int[] array, int offset, int length) { try { return new HeapIntBuffer(array, offset, length); } catch (IllegalArgumentException x) { throw new IndexOutOfBoundsException(); } }
public static IntBuffer wrap(int[] array) { return wrap(array, 0, array.length); }
|
那么这个HeapIntBuffer又是如何实现的呢,我们接着来看:
1 2 3 4
| HeapIntBuffer(int[] buf, int off, int len) { super(-1, off, off + len, buf.length, buf, 0); }
|
我们又来看看IntBuffer中的构造方法是如何定义的:
1 2 3 4 5 6 7 8 9 10 11
| final int[] hb; final int offset; boolean isReadOnly;
IntBuffer(int mark, int pos, int lim, int cap, int[] hb, int offset) { super(mark, pos, lim, cap); this.hb = hb; this.offset = offset; }
|
最后我们来看看Buffer中的构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
| Buffer(int mark, int pos, int lim, int cap) { if (cap < 0) throw new IllegalArgumentException("Negative capacity: " + cap); this.capacity = cap; limit(lim); position(pos); if (mark >= 0) { if (mark > pos) throw new IllegalArgumentException("mark > position: (" + mark + " > " + pos + ")"); this.mark = mark; } }
|
通过对源码的观察,我们大致可以得到以下结构了:
现在我们来总结一下上面这些结构的各自职责划分:
- Buffer:缓冲区的一些基本变量定义,比如当前的位置(position)、容量 (capacity)、最大限制 (limit)、标记 (mark)等,你肯定会疑惑这些变量有啥用,别着急,这些变量会在后面的操作中用到,我们逐步讲解。
- IntBuffer等子类:定义了存放数据的数组(只有堆缓冲区实现子类才会用到)、是否只读等,也就是说数据的存放位置、以及对于底层数组的相关操作都在这里已经定义好了,并且已经实现了Comparable接口。
- HeapIntBuffer堆缓冲区实现子类:数据存放在堆中,实际上就是用的父类的数组在保存数据,并且将父类定义的所有底层操作全部实现了。
这样,我们对于Buffer类的基本结构就有了一个大致的认识。
缓冲区写操作
前面我们了解了Buffer类的基本操作,现在我们来看一下如何向缓冲区中存放数据以及获取数据,数据的存放包括以下四个方法:
- public abstract IntBuffer put(int i); - 在当前position位置插入数据,由具体子类实现
- public abstract IntBuffer put(int index, int i); - 在指定位置存放数据,也是由具体子类实现
- public final IntBuffer put(int[] src); - 直接存放所有数组中的内容(数组长度不能超出缓冲区大小)
- public IntBuffer put(int[] src, int offset, int length); - 直接存放数组中的内容,同上,但是可以指定存放一段范围
- public IntBuffer put(IntBuffer src); - 直接存放另一个缓冲区中的内容
我们从最简的开始看,是在当前位置插入一个数据,那么这个当前位置是怎么定义的呢,我们来看看源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public IntBuffer put(int x) { hb[ix(nextPutIndex())] = x; return this; }
protected int ix(int i) { return i + offset; }
final int nextPutIndex() { int p = position; if (p >= limit) throw new BufferOverflowException(); position = p + 1; return p; }
|
所以put操作实际上是将底层数组hb
在position位置上的数据进行设定。
设定完成后,position自动后移:
我们可以编写代码来看看:
1 2 3 4 5 6 7 8
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.allocate(10); buffer .put(1) .put(2) .put(3); System.out.println(buffer); }
|
通过断点调试,我们来看看实际的操作情况:
可以看到我们不断地put操作,position会一直向后移动,当然如果超出最大长度,那么会直接抛出异常:
接着我们来看看第二个put操作是如何进行,它能够在指定位置插入数据:
1 2 3 4 5 6 7 8 9 10
| public IntBuffer put(int i, int x) { hb[ix(checkIndex(i))] = x; return this; }
final int checkIndex(int i) { if ((i < 0) || (i >= limit)) throw new IndexOutOfBoundsException(); return i; }
|
实际上这个比我们之前的要好理解一些,注意全程不会操作position的值,这里需要注意一下。
我们接着来看第三个put操作,它是直接在IntBuffer中实现的,是基于前两个put方法的子类实现来完成的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public IntBuffer put(int[] src, int offset, int length) { checkBounds(offset, length, src.length); if (length > remaining()) throw new BufferOverflowException(); int end = offset + length; for (int i = offset; i < end; i++) this.put(src[i]); return this; }
public final IntBuffer put(int[] src) { return put(src, 0, src.length); }
public final int remaining() { int rem = limit - position; return rem > 0 ? rem : 0; }
|
1 2 3 4 5
| static void checkBounds(int off, int len, int size) { if ((off | len | (off + len) | (size - (off + len))) < 0) throw new IndexOutOfBoundsException(); }
|
大致流程如下,首先来了一个数组要取一段数据全部丢进缓冲区:
在检查没有什么问题并且缓冲区有容量时,就可以开始插入了:
最后我们通过代码来看看:
1 2 3 4 5 6 7
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.allocate(10); int[] arr = new int[]{1,2,3,4,5,6,7,8,9}; buffer.put(arr, 3, 4);
System.out.println(Arrays.toString(buffer.array())); }
|
可以看到最后结果为:
当然我们也可以将一个缓冲区的内容保存到另一个缓冲区:
1 2 3 4 5 6 7 8 9 10 11 12
| public IntBuffer put(IntBuffer src) { if (src == this) throw new IllegalArgumentException(); if (isReadOnly()) throw new ReadOnlyBufferException(); int n = src.remaining(); if (n > remaining()) throw new BufferOverflowException(); for (int i = 0; i < n; i++) put(src.get()); return this; }
|
我们来看看效果:
1 2 3 4 5 6
| public static void main(String[] args) { IntBuffer src = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5}); IntBuffer buffer = IntBuffer.allocate(10); buffer.put(src); System.out.println(Arrays.toString(buffer.array())); }
|
但是如果是这样的话,会出现问题:
1 2 3 4 5 6 7
| public static void main(String[] args) { IntBuffer src = IntBuffer.allocate(5); for (int i = 0; i < 5; i++) src.put(i); IntBuffer buffer = IntBuffer.allocate(10); buffer.put(src); System.out.println(Arrays.toString(buffer.array())); }
|
我们发现,结果和上面的不一样,并没有成功地将数据填到下面的IntBuffer中,这是为什么呢?实际上就是因为remaining()
的计算问题,因为这个方法是直接计算postion的位置,但是由于我们在写操作完成之后,position跑到后面去了,也就导致remaining()
结果最后算出来为0。
因为这里不是写操作,是接下来需要从头开始进行读操作,所以我们得想个办法把position给退回到一开始的位置,这样才可以从头开始读取,那么怎么做呢?一般我们在写入完成后需要进行读操作时(后面都是这样,不只是这里),会使用flip()
方法进行翻转:
1 2 3 4 5 6
| public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
|
这样,再次计算remaining()
的结果就是我们需要读取的数量了,这也是为什么put方法中要用remaining()
来计算的原因,我们再来测试一下:
1 2 3 4 5 6 7 8 9
| public static void main(String[] args) { IntBuffer src = IntBuffer.allocate(5); for (int i = 0; i < 5; i++) src.put(i); IntBuffer buffer = IntBuffer.allocate(10);
src.flip(); buffer.put(src); System.out.println(Arrays.toString(buffer.array())); }
|
翻转之后再次进行转移,就正常了。
缓冲区读操作
前面我们看完了写操作,现在我们接着来看看读操作。读操作有四个方法:
public abstract int get();
- 直接获取当前position位置的数据,由子类实现
public abstract int get(int index);
- 获取指定位置的数据,也是子类实现
public IntBuffer get(int[] dst)
- 将数据读取到给定的数组中
public IntBuffer get(int[] dst, int offset, int length)
- 同上,加了个范围
我们还是从最简单的开始看,第一个get方法的实现在IntBuffer类中:
1 2 3 4 5 6 7 8 9 10 11
| public int get() { return hb[ix(nextGetIndex())]; }
final int nextGetIndex() { int p = position; if (p >= limit) throw new BufferUnderflowException(); position = p + 1; return p; }
|
可以看到每次读取操作之后,也会将postion+1,直到最后一个位置,如果还要继续读,那么就直接抛出异常。
我们来看看第二个:
1 2 3
| public int get(int i) { return hb[ix(checkIndex(i))]; }
|
我们来看看第三个和第四个:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public IntBuffer get(int[] dst, int offset, int length) { checkBounds(offset, length, dst.length); if (length > remaining()) throw new BufferUnderflowException(); int end = offset + length; for (int i = offset; i < end; i++) dst[i] = get(); return this; }
public IntBuffer get(int[] dst) { return get(dst, 0, dst.length); }
|
我们来看看效果:
1 2 3 4 5 6
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5}); int[] arr = new int[10]; buffer.get(arr, 2, 5); System.out.println(Arrays.toString(arr)); }
|
可以看到成功地将数据读取到了数组中。
当然如果我们需要直接获取数组,也可以使用array()
方法来拿到:
1 2 3 4 5 6 7
| public final int[] array() { if (hb == null) throw new UnsupportedOperationException(); if (isReadOnly) throw new ReadOnlyBufferException(); return hb; }
|
我们来试试看:
1 2 3 4
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5}); System.out.println(Arrays.toString(buffer.array())); }
|
当然,既然都已经拿到了底层的hb
了,我们来看看如果直接修改之后是不是读取到的就是我们的修改之后的结果了:
1 2 3 4 5 6
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5}); int[] arr = buffer.array(); arr[0] = 99999; System.out.println(buffer.get()); }
|
可以看到这种方式由于是直接拿到的底层数组,所有修改会直接生效在缓冲区中。
当然除了常规的读取方式之外,我们也可以通过mark()
来实现跳转读取,这里需要介绍一下几个操作:
public final Buffer mark()
- 标记当前位置
public final Buffer reset()
- 让当前的position位置跳转到mark当时标记的位置
我们首先来看标记方法:
1 2 3 4
| public final Buffer mark() { mark = position; return this; }
|
我们再来看看重置方法:
1 2 3 4 5 6 7
| public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; }
|
那比如我们在读取到1号位置时进行标记:
接着我们使用reset方法就可以直接回退回去了:
现在我们来测试一下:
1 2 3 4 5 6 7 8
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5}); buffer.get(); buffer.mark(); buffer.get(); buffer.reset(); System.out.println(buffer.get()); }
|
可以看到,读取的位置根据我们的操作进行了变化,有关缓冲区的读操作,就暂时讲到这里。
缓冲区其他操作
前面我们大致了解了一下缓冲区的读写操作,那么我们接着来看看,除了常规的读写操作之外,还有哪些其他的操作:
public abstract IntBuffer compact()
- 压缩缓冲区,由具体实现类实现
public IntBuffer duplicate()
- 复制缓冲区,会直接创建一个新的数据相同的缓冲区
public abstract IntBuffer slice()
- 划分缓冲区,会将原本的容量大小的缓冲区划分为更小的出来进行操作
public final Buffer rewind()
- 重绕缓冲区,其实就是把position归零,然后mark变回-1
public final Buffer clear()
- 将缓冲区清空,所有的变量变回最初的状态
我们先从压缩缓冲区开始看起,它会将整个缓冲区的大小和数据内容变成position位置到limit之间的数据,并移动到数组头部:
1 2 3 4 5 6 7 8 9 10 11
| public IntBuffer compact() { int pos = position(); int lim = limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); System.arraycopy(hb, ix(pos), hb, ix(0), rem); position(rem); limit(capacity()); discardMark(); return this; }
|
比如现在的状态是:
那么我们在执行 compact()
方法之后,会进行截取,此时limit - position = 6
,那么就会截取第4、5、6、7、8、9
这6个数据然后丢到最前面,接着position跑到7
表示这是下一个继续的位置:
现在我们通过代码来检验一下:
1 2 3 4 5 6 7 8 9
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); for (int i = 0; i < 4; i++) buffer.get(); buffer.compact();
System.out.println("压缩之后的情况:"+Arrays.toString(buffer.array())); System.out.println("当前position位置:"+buffer.position()); System.out.println("当前limit位置:"+buffer.limit()); }
|
可以看到最后的结果没有问题:
我们接着来看第二个方法,那么如果我们现在需要复制一个内容一模一样的的缓冲区,该怎么做?直接使用duplicate()
方法就可以复制了:
1 2 3 4 5 6 7 8
| public IntBuffer duplicate() { return new HeapIntBuffer(hb, this.markValue(), this.position(), this.limit(), this.capacity(), offset); }
|
那么各位猜想一下,如果通过这种方式创了一个新的IntBuffer,那么下面的例子会出现什么结果:
1 2 3 4 5 6 7
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5}); IntBuffer duplicate = buffer.duplicate();
System.out.println(buffer == duplicate); System.out.println(buffer.array() == duplicate.array()); }
|
由于buffer是重新new的,所以第一个为false,而底层的数组由于在构造的时候没有进行任何的拷贝而是直接传递,因此实际上两个缓冲区的底层数组是同一个对象。所以,一个发生修改,那么另一个就跟着变了:
1 2 3 4 5 6 7
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5}); IntBuffer duplicate = buffer.duplicate();
buffer.put(0, 66666); System.out.println(duplicate.get()); }
|
现在我们接着来看下一个方法,slice()
方法会将缓冲区进行划分:
1 2 3 4 5 6 7 8 9 10 11
| public IntBuffer slice() { int pos = this.position(); int lim = this.limit(); int rem = (pos <= lim ? lim - pos : 0); return new HeapIntBuffer(hb, -1, 0, rem, rem, pos + offset); }
|
虽然现在底层依然使用的是之前的数组,但是由于设定了offset值,我们之前的操作似乎变得不太一样了:
回顾前面我们所讲解的内容,在读取和存放时,会被ix
方法进行调整:
1 2 3 4 5 6 7
| protected int ix(int i) { return i + offset; }
public int get() { return hb[ix(nextGetIndex())]; }
|
当然,在逻辑上我们可以认为是这样的:
现在我们来测试一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); for (int i = 0; i < 4; i++) buffer.get(); IntBuffer slice = buffer.slice();
System.out.println("划分之后的情况:"+Arrays.toString(slice.array())); System.out.println("划分之后的偏移地址:"+slice.arrayOffset()); System.out.println("当前position位置:"+slice.position()); System.out.println("当前limit位置:"+slice.limit());
while (slice.hasRemaining()) { System.out.print(slice.get()+", "); } }
|
可以看到,最终结果:
最后两个方法就比较简单了,我们先来看rewind()
,它相当于是对position和mark进行了一次重置:
1 2 3 4 5
| public final Buffer rewind() { position = 0; mark = -1; return this; }
|
接着是clear()
,它相当于是将整个缓冲区回归到最初的状态了:
1 2 3 4 5 6
| public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
|
到这里,关于缓冲区的一些其他操作,我们就讲解到此。
缓冲区比较
缓冲区之间是可以进行比较的,我们可以看到equals方法和compareTo方法都是被重写了的,我们首先来看看equals
方法,注意,它是判断两个缓冲区剩余的内容是否一致:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public boolean equals(Object ob) { if (this == ob) return true; if (!(ob instanceof IntBuffer)) return false; IntBuffer that = (IntBuffer)ob; int thisPos = this.position(); int thisLim = this.limit(); int thatPos = that.position(); int thatLim = that.limit(); int thisRem = thisLim - thisPos; int thatRem = thatLim - thatPos; if (thisRem < 0 || thisRem != thatRem) return false; for (int i = thisLim - 1, j = thatLim - 1; i >= thisPos; i--, j--) if (!equals(this.get(i), that.get(j))) return false; return true; }
private static boolean equals(int x, int y) { return x == y; }
|
那么我们按照它的思路来验证一下:
1 2 3 4 5 6 7 8 9
| public static void main(String[] args) { IntBuffer buffer1 = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); IntBuffer buffer2 = IntBuffer.wrap(new int[]{6, 5, 4, 3, 2, 1, 7, 8, 9, 0}); System.out.println(buffer1.equals(buffer2)); buffer1.position(6); buffer2.position(6); System.out.println(buffer1.equals(buffer2)); }
|
可以看到结果就是我们所想的那样:
那么我们接着来看比较,compareTo
方法,它实际上是Comparable
接口提供的方法,它实际上比较的也是pos开始剩余的内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public int compareTo(IntBuffer that) { int thisPos = this.position(); int thisRem = this.limit() - thisPos; int thatPos = that.position(); int thatRem = that.limit() - thatPos; int length = Math.min(thisRem, thatRem); if (length < 0) return -1; int n = thisPos + Math.min(thisRem, thatRem); for (int i = thisPos, j = thatPos; i < n; i++, j++) { int cmp = compare(this.get(i), that.get(j)); if (cmp != 0) return cmp; } return thisRem - thatRem; }
private static int compare(int x, int y) { return Integer.compare(x, y); }
|
这里我们就不多做介绍了。
只读缓冲区
接着我们来看看只读缓冲区,只读缓冲区就像其名称一样,它只能进行读操作,而不允许进行写操作。
那么我们怎么创建只读缓冲区呢?
public abstract IntBuffer asReadOnlyBuffer();
- 基于当前缓冲区生成一个只读的缓冲区。
我们来看看此方法的具体实现:
1 2 3 4 5 6 7 8
| public IntBuffer asReadOnlyBuffer() { return new HeapIntBufferR(hb, this.markValue(), this.position(), this.limit(), this.capacity(), offset); }
|
那么这个HeapIntBufferR类跟我们普通的HeapIntBuffer有什么不同之处呢?
可以看到它是继承自HeapIntBuffer的,那么我们来看看它的实现有什么不同:
1 2 3 4 5 6 7
| protected HeapIntBufferR(int[] buf, int mark, int pos, int lim, int cap, int off) { super(buf, mark, pos, lim, cap, off); this.isReadOnly = true; }
|
可以看到在其构造方法中,除了直接调用父类的构造方法外,还会将isReadOnly
标记修改为true,我们接着来看put操作有什么不同之处:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean isReadOnly() { return true; }
public IntBuffer put(int x) { throw new ReadOnlyBufferException(); }
public IntBuffer put(int i, int x) { throw new ReadOnlyBufferException(); }
public IntBuffer put(int[] src, int offset, int length) { throw new ReadOnlyBufferException(); }
public IntBuffer put(IntBuffer src) { throw new ReadOnlyBufferException(); }
|
可以看到所有的put方法全部凉凉,只要调用就会直接抛出ReadOnlyBufferException异常。但是其他get方法依然没有进行重写,也就是说get操作还是可以正常使用的,但是只要是写操作就都不行:
1 2 3 4 5 6 7 8
| public static void main(String[] args) { IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); IntBuffer readBuffer = buffer.asReadOnlyBuffer();
System.out.println(readBuffer.isReadOnly()); System.out.println(readBuffer.get()); readBuffer.put(0, 666); }
|
可以看到结果为:
这就是只读状态下的缓冲区。
ByteBuffer和CharBuffer
通过前面的学习,我们基本上已经了解了缓冲区的使用,但是都是基于IntBuffer进行讲解,现在我们来看看另外两种基本类型的缓冲区ByteBuffer和CharBuffer,因为ByteBuffer底层存放的是很多单个byte字节,所以会有更多的玩法,同样CharBuffer是一系列字节,所以也有很多便捷操作。
我们先来看看ByteBuffer,我们可以直接点进去看:
1 2 3 4 5
| public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> { final byte[] hb; final int offset; boolean isReadOnly; ....
|
可以看到如果也是使用堆缓冲区子类实现,那么依然是一个byte[]
的形式保存数据。我们来尝试使用一下:
1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(10); buffer.putInt(Integer.MAX_VALUE); System.out.println("当前缓冲区剩余字节数:"+buffer.remaining());
buffer.flip(); while (buffer.hasRemaining()) { System.out.println(buffer.get()); } }
|
最后的结果为:
可以看到第一个byte为127、然后三个都是-1,我们来分析一下:
127
转换为二进制补码形式就是 01111111
,而-1
转换为二进制补码形式为11111111
那也就是说,第一个字节是01111111,而后续字节就是11111111,把它们拼接在一起:
- 二进制补码表示
01111111 11111111 11111111 11111111
转换为十进制就是2147483647
,也就是int的最大值。
那么根据我们上面的推导,各位能否计算得到下面的结果呢?
1 2 3 4 5 6 7 8 9 10
| public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(10); buffer.put((byte) 0); buffer.put((byte) 0); buffer.put((byte) 1); buffer.put((byte) -1);
buffer.flip(); System.out.println(buffer.getInt()); }
|
经过上面的计算,得到的结果就是:
- 上面的数据以二进制补码的形式表示为:
00000000 00000000 00000001 11111111
- 将其转换为十进制那么就是:256 + 255 = 511
好吧,再来个魔鬼问题,把第一个换成1呢:10000000 00000000 00000001 11111111
,自己算。
我们接着来看看CharBuffer,这种缓冲区实际上也是保存一大堆char类型的数据:
1 2 3 4 5
| public static void main(String[] args) { CharBuffer buffer = CharBuffer.allocate(10); buffer.put("lbwnb"); System.out.println(Arrays.toString(buffer.array())); }
|
但是正是得益于char数组,它包含了很多的字符串操作,可以一次性存放一整个字符串。我们甚至还可以将其当做一个String来进行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void main(String[] args) { CharBuffer buffer = CharBuffer.allocate(10); buffer.put("lbwnb"); buffer.append("!"); System.out.println("剩余容量:"+buffer.remaining());
buffer.flip(); System.out.println("整个字符串为:"+buffer); System.out.println("第3个字符是:"+buffer.charAt(2));
buffer .chars() .filter(i -> i < 'l') .forEach(i -> System.out.print((char) i)); }
|
当然除了一些常规操作之外,我们还可以直接将一个字符串作为参数创建:
1 2 3 4 5 6 7
| public static void main(String[] args) { CharBuffer buffer = CharBuffer.wrap("收藏等于学会~"); System.out.println(buffer);
buffer.put("111"); }
|
可以看到结果也是我们预料中的:
对于这两个比较特殊的缓冲区,我们就暂时讲解到这里。
直接缓冲区
注意:推荐学习完成JVM篇再来学习这一部分。
最后我们来看一下直接缓冲区,我们前面一直使用的都是堆缓冲区,也就是说实际上数据是保存在一个数组中的,如果你已经完成了JVM篇的学习,一定知道实际上占用的是堆内存,而我们也可以创建一个直接缓冲区,也就是申请堆外内存进行数据保存,采用操作系统本地的IO,相比堆缓冲区会快一些。
那么怎么使用直接缓冲区呢?我们可以通过allocateDirect
方法来创建:
1 2 3 4 5 6 7 8
| public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocateDirect(10); buffer.put((byte) 66); buffer.flip(); System.out.println(buffer.get()); }
|
我们来看看这个allocateDirect
方法是如何创建一个直接缓冲区的:
1 2 3
| public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); }
|
这个方法直接创建了一个新的DirectByteBuffer对象,那么这个类又是怎么进行创建的呢?
可以看到它并不是直接继承自ByteBuffer,而是MappedByteBuffer,并且实现了接口DirectBuffer,我们先来看看这个接口:
1 2 3 4 5
| public interface DirectBuffer { public long address(); public Object attachment(); public Cleaner cleaner(); }
|
1 2 3 4 5 6
| public abstract class MappedByteBuffer extends ByteBuffer { public final MappedByteBuffer load(); public final boolean isLoaded(); public final MappedByteBuffer force(); }
|
接着我们来看看DirectByteBuffer类的成员变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| protected static final Unsafe unsafe = Bits.unsafe();
private static final long arrayBaseOffset = (long)unsafe.arrayBaseOffset(byte[].class);
protected static final boolean unaligned = Bits.unaligned();
private final Object att;
|
接着我们来看看构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| DirectByteBuffer(int cap) { super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap);
long base = 0; try { base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { address = base + ps - (base & (ps - 1)); } else { address = base; } cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; }
|
可以看到在构造方法中,是直接通过Unsafe类来申请足够的堆外内存保存数据,那么当我们不使用此缓冲区时,内存会被如何清理呢?我们来看看这个Cleaner:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class Cleaner extends PhantomReference<Object>{
private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue<>(); private final Runnable thunk; static private Cleaner first = null;
private Cleaner next = null, prev = null; private static synchronized Cleaner add(Cleaner cl) { if (first != null) { cl.next = first; first.prev = cl; } first = cl; return cl; }
private Cleaner(Object referent, Runnable thunk) { super(referent, dummyQueue); this.thunk = thunk; }
public static Cleaner create(Object ob, Runnable thunk) { if (thunk == null) return null; return add(new Cleaner(ob, thunk)); } public void clean() { if (!remove(this)) return; try { thunk.run(); } catch (final Throwable x) { ... } }
|
那么我们先来看看具体的清理程序在做些什么,Deallocator是在直接缓冲区中声明的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private static class Deallocator implements Runnable {
private static Unsafe unsafe = Unsafe.getUnsafe();
private long address; private long size; private int capacity;
private Deallocator(long address, long size, int capacity) { assert (address != 0); this.address = address; this.size = size; this.capacity = capacity; }
public void run() { if (address == 0) { return; } unsafe.freeMemory(address); address = 0; Bits.unreserveMemory(size, capacity); } }
|
好了,现在我们可以明确在清理的时候实际上也是调用Unsafe类进行内存释放操作,那么,这个清理操作具体是在什么时候进行的呢?首先我们要明确,如果是普通的堆缓冲区,由于使用的数组,那么一旦此对象没有任何引用时,就随时都会被GC给回收掉,但是现在是堆外内存,只能我们手动进行内存回收,那么当DirectByteBuffer也失去引用时,会不会触发内存回收呢?
答案是可以的,还记得我们刚刚看到Cleaner是PhantomReference的子类吗,而DirectByteBuffer是被鬼引用的对象,而具体的清理操作是Cleaner类的clean方法,莫非这两者有什么联系吗?
你别说,还真有,我们直接看到PhantomReference的父类Reference,我们会发现这样一个类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private static class ReferenceHandler extends Thread { ... static { ensureClassInitialized(InterruptedException.class); ensureClassInitialized(Cleaner.class); } public void run() { while (true) { tryHandlePending(true); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| private T referent;
volatile ReferenceQueue<? super T> queue;
volatile Reference next;
transient private Reference<T> discovered;
private static Reference<Object> pending = null;
|
1 2 3 4 5 6 7 8 9 10 11 12
| static { ThreadGroup tg = Thread.currentThread().getThreadGroup(); for (ThreadGroup tgn = tg; tgn != null; tg = tgn, tgn = tg.getParent()); Thread handler = new ReferenceHandler(tg, "Reference Handler"); handler.setPriority(Thread.MAX_PRIORITY); handler.setDaemon(true); handler.start();
... }
|
那么也就是说Reference Handler线程是在一开始就启动了,那么我们的关注点可以放在tryHandlePending
方法上,看看这玩意到底在做个啥:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| static boolean tryHandlePending(boolean waitForNotify) { Reference<Object> r; Cleaner c; try { synchronized (lock) { if (pending != null) { r = pending; c = r instanceof Cleaner ? (Cleaner) r : null; pending = r.discovered; r.discovered = null; } else { if (waitForNotify) { lock.wait(); } return waitForNotify; } } } catch (OutOfMemoryError x) { Thread.yield(); return true; } catch (InterruptedException x) { return true; }
if (c != null) { c.clean(); return true; }
ReferenceQueue<? super Object> q = r.queue; if (q != ReferenceQueue.NULL) q.enqueue(r); return true; }
|
通过对源码的解读,我们就了解了直接缓冲区的内存加载释放整个流程。和堆缓冲区一样,当直接缓冲区没有任何强引用时,就有机会被GC正常回收掉并自动释放申请的内存。
我们接着来看看直接缓冲区的读写操作是如何进行的:
1 2 3
| public byte get() { return ((unsafe.getByte(ix(nextGetIndex())))); }
|
1 2 3
| private long ix(int i) { return address + ((long)i << 0); }
|
我们接着来看看写操作:
1 2 3 4
| public ByteBuffer put(byte x) { unsafe.putByte(ix(nextPutIndex()), ((x))); return this; }
|
可以看到无论是读取还是写入操作都是通过Unsafe类操作对应的内存地址完成的。
那么它的复制操作是如何实现的呢?
1 2 3 4 5 6 7 8
| public ByteBuffer duplicate() { return new DirectByteBuffer(this, this.markValue(), this.position(), this.limit(), this.capacity(), 0); }
|
1 2 3 4 5 6 7 8
| DirectByteBuffer(DirectBuffer db, int mark, int pos, int lim, int cap, int off) { super(mark, pos, lim, cap); address = db.address() + off; cleaner = null; att = db; }
|
可以看到,如果是进行复制操作,那么会直接会继续使用执行复制操作的DirectByteBuffer申请的内存空间。不知道各位是否能够马上联想到一个问题,我们知道,如果执行复制操作的DirectByteBuffer对象失去了强引用被回收,那么就会触发Cleaner并进行内存释放,但是有个问题就是,这段内存空间可能复制出来的DirectByteBuffer对象还需要继续使用,这时肯定是不能进行回收的,所以说这里使用了att变量将之前的DirectByteBuffer对象进行引用,以防止其失去强引用被垃圾回收,所以只要不是原来的DirectByteBuffer对象和复制出来的DirectByteBuffer对象都失去强引用时,就不会导致这段内存空间被回收。
这样,我们之前的未解之谜为啥有个att
也就得到答案了,有关直接缓冲区的介绍,就到这里为止。
通道
前面我们学习了NIO的基石——缓冲区,那么缓冲区具体用在什么地方呢,在本板块我们学习通道之后,相信各位就能知道了。那么,什么是通道呢?
在传统IO中,我们都是通过流进行传输,数据会源源不断从流中传出;而在NIO中,数据是放在缓冲区中进行管理,再使用通道将缓冲区中的数据传输到目的地。
通道接口层次
通道的根基接口是Channel
,所以的派生接口和类都是从这里开始的,我们来看看它定义了哪些基本功能:
1 2 3 4 5 6 7
| public interface Channel extends Closeable { public boolean isOpen();
public void close() throws IOException; }
|
我们接着来看看它的一些子接口,首先是最基本的读写操作:
1 2 3 4
| public interface ReadableByteChannel extends Channel { public int read(ByteBuffer dst) throws IOException; }
|
1 2 3 4
| public interface WritableByteChannel extends Channel { public int write(ByteBuffer src) throws IOException; }
|
有了读写功能后,最后整合为了一个ByteChannel接口:
1 2 3
| public interface ByteChannel extends ReadableByteChannel, WritableByteChannel{
}
|
在ByteChannel之下,还有更多的派生接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public interface SeekableByteChannel extends ByteChannel { ...
long position() throws IOException;
SeekableByteChannel position(long newPosition) throws IOException;
long size() throws IOException;
SeekableByteChannel truncate(long size) throws IOException; }
|
接着我们来看,除了读写之外,Channel还可以具有响应中断的能力:
1 2 3 4
| public interface InterruptibleChannel extends Channel { public void close() throws IOException; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public abstract class AbstractInterruptibleChannel implements Channel, InterruptibleChannel { private final Object closeLock = new Object(); private volatile boolean open = true;
protected AbstractInterruptibleChannel() { }
public final void close() throws IOException { synchronized (closeLock) { if (!open) return; open = false; implCloseChannel(); } }
protected abstract void implCloseChannel() throws IOException;
public final boolean isOpen() { return open; }
protected final void begin() { ... }
protected final void end(boolean completed) ... } ... }
|
而之后的一些实现类,都是基于这些接口定义的方法去进行实现的,比如FileChannel:
这样,我们就大致了解了一下通道相关的接口定义,那么我来看看具体是如何如何使用的。
比如现在我们要实现从输入流中读取数据然后打印出来,那么之前传统IO的写法:
1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) throws IOException { byte[] data = new byte[10]; InputStream in = System.in; while (true) { int len; while ((len = in.read(data)) >= 0) { System.out.print("读取到一批数据:"+new String(data, 0, len)); } } }
|
而现在我们使用通道之后:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void main(String[] args) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(10); ReadableByteChannel readChannel = Channels.newChannel(System.in); while (true) { readChannel.read(buffer); buffer.flip(); System.out.println("读取到一批数据:"+new String(buffer.array(), 0, buffer.remaining())); buffer.clear(); } }
|
乍一看,好像感觉也没啥区别,不就是把数组换成缓冲区了吗,效果都是一样的,数据也是从Channel中读取得到,并且通过缓冲区进行数据装载然后得到结果,但是,Channel不像流那样是单向的,它就像它的名字一样,一个通道可以从一端走到另一端,也可以从另一端走到这一端,我们后面进行介绍。
文件传输FileChannel
前面我们介绍了通道的基本情况,这里我们就来尝试实现一下文件的读取和写入,在传统IO中,文件的写入和输出都是依靠FileOutputStream和FileInputStream来完成的:
1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) throws IOException { try(FileOutputStream out = new FileOutputStream("test.txt"); FileInputStream in = new FileInputStream("test.txt")){ String data = "伞兵一号卢本伟准备就绪!"; out.write(data.getBytes()); out.flush();
byte[] bytes = new byte[in.available()]; in.read(bytes); System.out.println(new String(bytes)); } }
|
而现在,我们只需要通过一个FileChannel就可以完成这两者的操作,获取文件通道的方式有以下几种:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static void main(String[] args) throws IOException { FileInputStream in = new FileInputStream("test.txt"); FileChannel channel = in.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(128); channel.read(buffer); buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.remaining())); }
|
可以看到通过输入流获取的文件通道读取是没有任何问题的,但是写入操作:
1 2 3 4 5 6 7 8
| public static void main(String[] args) throws IOException { FileInputStream in = new FileInputStream("test.txt"); FileChannel channel = in.getChannel(); channel.write(ByteBuffer.wrap("伞兵一号卢本伟准备就绪!".getBytes())); }
|
直接报错,说明只支持读取操作,那么输出流呢?
1 2 3 4 5 6 7 8
| public static void main(String[] args) throws IOException { FileOutputStream out = new FileOutputStream("test.txt"); FileChannel channel = out.getChannel(); channel.write(ByteBuffer.wrap("伞兵一号卢本伟准备就绪!".getBytes())); }
|
可以看到能够正常进行写入,但是读取呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static void main(String[] args) throws IOException { FileOutputStream out = new FileOutputStream("test.txt"); FileChannel channel = out.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(128); channel.read(buffer); buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.remaining())); }
|
可以看到输出流生成的Channel又不支持读取,所以说本质上还是保持着输入输出流的特性,但是之前不是说Channel又可以输入又可以输出吗?这里我们来看看第二种方式:
1 2
| public class RandomAccessFile implements DataOutput, DataInput, Closeable {
|
我们可以通过RandomAccessFile来创建通道:
1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) throws IOException {
try(RandomAccessFile f = new RandomAccessFile("test.txt", "")){ } }
|
现在我们来测试一下它的读写操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static void main(String[] args) throws IOException {
try(RandomAccessFile f = new RandomAccessFile("test.txt", "rw"); FileChannel channel = f.getChannel()){ channel.write(ByteBuffer.wrap("伞兵二号马飞飞准备就绪!".getBytes()));
System.out.println("写操作完成之后文件访问位置:"+channel.position()); channel.position(0);
ByteBuffer buffer = ByteBuffer.allocate(128); channel.read(buffer); buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.remaining())); } }
|
可以看到,一个FileChannel既可以完成文件读取,也可以完成文件的写入。
除了基本的读写操作,我们也可以直接对文件进行截断:
1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) throws IOException { try(RandomAccessFile f = new RandomAccessFile("test.txt", "rw"); FileChannel channel = f.getChannel()){ channel.truncate(20);
ByteBuffer buffer = ByteBuffer.allocate(128); channel.read(buffer); buffer.flip(); System.out.println(new String(buffer.array(), 0, buffer.remaining())); } }
|
可以看到文件的内容直接被截断了,文件内容就只剩一半了。
当然,如果我们要进行文件的拷贝,也是很方便的,只需要使用通道就可以,比如我们现在需要将一个通道的数据写入到另一个通道,就可以直接使用transferTo方法:
1 2 3 4 5 6 7 8
| public static void main(String[] args) throws IOException { try(FileOutputStream out = new FileOutputStream("test2.txt"); FileInputStream in = new FileInputStream("test.txt")){
FileChannel inChannel = in.getChannel(); inChannel.transferTo(0, inChannel.size(), out.getChannel()); } }
|
可以看到执行后,文件的内容全部被复制到另一个文件了。
当然,反向操作也是可以的:
1 2 3 4 5 6 7 8
| public static void main(String[] args) throws IOException { try(FileOutputStream out = new FileOutputStream("test2.txt"); FileInputStream in = new FileInputStream("test.txt")){
FileChannel inChannel = in.getChannel(); out.getChannel().transferFrom(inChannel, 0, inChannel.size()); } }
|
当我们要编辑某个文件时,通过使用MappedByteBuffer类,可以将其映射到内存中进行编辑,编辑的内容会同步更新到文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| try(RandomAccessFile f = new RandomAccessFile("test.txt", "rw"); FileChannel channel = f.getChannel()){
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 4, 10);
buffer.put("yyds".getBytes());
buffer.force(); }
|
可以看到,文件的某一个区域已经被我们修改了,并且这里实际上使用的就是DirectByteBuffer直接缓冲区,效率还是很高的。
文件锁FileLock
我们可以创建一个跨进程文件锁来防止多个进程之间的文件争抢操作(注意这里是进程,不是线程)FileLock是文件锁,它能保证同一时间只有一个进程(程序)能够修改它,或者都只可以读,这样就解决了多进程间的同步文件,保证了安全性。但是需要注意的是,它进程级别的,不是线程级别的,他可以解决多个进程并发访问同一个文件的问题,但是它不适用于控制同一个进程中多个线程对一个文件的访问。
那么我们来看看如何使用文件锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void main(String[] args) throws IOException, InterruptedException { RandomAccessFile f = new RandomAccessFile("test.txt", "rw"); FileChannel channel = f.getChannel(); System.out.println(new Date() + " 正在尝试获取文件锁..."); FileLock lock = channel.lock(0, 6, false); System.out.println(new Date() + " 已获取到文件锁!"); Thread.sleep(5000); System.out.println(new Date() + " 操作完毕,释放文件锁!"); lock.release(); }
|
有关共享锁和独占锁:
- 进程对文件加独占锁后,当前进程对文件可读可写,独占此文件,其它进程是不能读该文件进行读写操作的。
- 进程对文件加共享锁后,进程可以对文件进行读操作,但是无法进行写操作,共享锁可以被多个进程添加,但是只要存在共享锁,就不能添加独占锁。
现在我们来启动两个进程试试看,我们需要在IDEA中配置一下两个启动项:
现在我们依次启动它们:
可以看到确实是两个进程同一时间只能有一个进行访问,而另一个需要等待锁释放。
那么如果我们申请的是文件的不同部分呢?
1 2 3 4
| FileLock lock = channel.lock(0, 6, false);
FileLock lock = channel.lock(6, 6, false);
|
可以看到,两个进程这时就可以同时进行加锁操作了,因为它们锁的是不同的段落。
那么要是交叉呢?
1 2 3 4
| FileLock lock = channel.lock(0, 6, false);
FileLock lock = channel.lock(3, 6, false);
|
可以看到交叉的情况下也是会出现阻塞的。
接着我们来看看共享锁,共享锁允许多个进程同时加锁,但是不能进行写操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static void main(String[] args) throws IOException, InterruptedException { RandomAccessFile f = new RandomAccessFile("test.txt", "rw"); FileChannel channel = f.getChannel(); System.out.println(new Date() + " 正在尝试获取文件锁..."); FileLock lock = channel.lock(0, Long.MAX_VALUE, true); System.out.println(new Date() + " 已获取到文件锁!"); channel.write(ByteBuffer.wrap(new Date().toString().getBytes())); System.out.println(new Date() + " 操作完毕,释放文件锁!"); lock.release(); }
|
当我们进行写操作时:
可以看到直接抛出异常,说另一个程序已锁定文件的一部分,进程无法访问(某些系统或是环境实测无效,比如UP主的arm架构MacOS就不生效,这个异常是在Windows环境下运行得到的)
当然,我们也可以测试一下多个进行同时加共享锁:
1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) throws IOException, InterruptedException { RandomAccessFile f = new RandomAccessFile("test.txt", "rw"); FileChannel channel = f.getChannel(); System.out.println(new Date() + " 正在尝试获取文件锁...");
FileLock lock = channel.lock(0, Long.MAX_VALUE, true); System.out.println(new Date() + " 已获取到文件锁!"); Thread.sleep(5000); System.out.println(new Date() + " 操作完毕,释放文件锁!"); lock.release(); }
|
可以看到结果是多个进程都能加共享锁:
当然,除了直接使用lock()
方法进行加锁之外,我们也可以使用tryLock()
方法以非阻塞方式获取文件锁,但是如果获取锁失败会得到null:
1 2 3 4 5 6 7 8 9 10 11
| public static void main(String[] args) throws IOException, InterruptedException { RandomAccessFile f = new RandomAccessFile("test.txt", "rw"); FileChannel channel = f.getChannel(); System.out.println(new Date() + " 正在尝试获取文件锁...");
FileLock lock = channel.tryLock(0, Long.MAX_VALUE, false); System.out.println(lock); Thread.sleep(5000);
lock.release(); }
|
可以看到,两个进程都去尝试获取独占锁:
第一个成功加锁的进程获得了对应的锁对象,而第二个进程直接得到的是null
。
到这里,有关文件锁的相关内容就差不多了。
多路复用网络通信
前面我们已经介绍了NIO框架的两大核心:Buffer和Channel,我们接着来看看最后一个内容。
传统阻塞I/O网络通信
说起网络通信,相信各位并不陌生,正是因为网络的存在我们才能走进现代化的社会,在JavaWeb阶段,我们学习了如何使用Socket建立TCP连接进行网络通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) { try(ServerSocket server = new ServerSocket(8080)){ System.out.println("正在等待客户端连接..."); Socket socket = server.accept(); System.out.println("客户端已连接,IP地址为:"+socket.getInetAddress().getHostAddress()); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.print("接收到客户端数据:"); System.out.println(reader.readLine()); OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream()); writer.write("已收到!"); writer.flush(); }catch (IOException e){ e.printStackTrace(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) { try (Socket socket = new Socket("localhost", 8080); Scanner scanner = new Scanner(System.in)){ System.out.println("已连接到服务端!"); OutputStream stream = socket.getOutputStream(); OutputStreamWriter writer = new OutputStreamWriter(stream); System.out.println("请输入要发送给服务端的内容:"); String text = scanner.nextLine(); writer.write(text+'\n'); writer.flush(); System.out.println("数据已发送:"+text); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println("收到服务器返回:"+reader.readLine()); }catch (IOException e){ System.out.println("服务端连接失败!"); e.printStackTrace(); }finally { System.out.println("客户端断开连接!"); } }
|
当然,我们也可以使用前面讲解的通道来进行通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void main(String[] args) { try (ServerSocketChannel serverChannel = ServerSocketChannel.open()){ serverChannel.bind(new InetSocketAddress(8080)); SocketChannel socket = serverChannel.accept(); System.out.println("客户端已连接,IP地址为:"+socket.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(128); socket.read(buffer); buffer.flip(); System.out.print("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
socket.write(ByteBuffer.wrap("已收到!".getBytes()));
socket.close(); } catch (IOException e) { throw new RuntimeException(e); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) { try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080)); Scanner scanner = new Scanner(System.in)){ System.out.println("已连接到服务端!"); System.out.println("请输入要发送给服务端的内容:"); String text = scanner.nextLine(); channel.write(ByteBuffer.wrap(text.getBytes()));
ByteBuffer buffer = ByteBuffer.allocate(128); channel.read(buffer); buffer.flip(); System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining())); } catch (IOException e) { throw new RuntimeException(e); } }
|
虽然可以通过传统的Socket进行网络通信,但是我们发现,如果要进行IO操作,我们需要单独创建一个线程来进行处理,比如现在有很多个客户端,服务端需要同时进行处理,那么如果我们要处理这些客户端的请求,那么我们就只能单独为其创建一个线程来进行处理:
虽然这样看起来比较合理,但是随着客户端数量的增加,如果要保持持续通信,那么就不能摧毁这些线程,而是需要一直保留(但是实际上很多时候只是保持连接,一直在阻塞等待客户端的读写操作,IO操作的频率很低,这样就白白占用了一条线程,很多时候都是站着茅坑不拉屎),但是我们的线程不可能无限制的进行创建,总有一天会耗尽服务端的资源,那么现在怎么办呢,关键是现在又有很多客户端源源不断地连接并进行操作,这时,我们就可以利用NIO为我们提供的多路复用编程模型。
我们来看看NIO为我们提供的模型:
服务端不再是一个单纯通过accept()
方法来创建连接的机制了,而是根据客户端不同的状态,Selector会不断轮询,只有客户端在对应的状态时,比如真正开始读写操作时,才会创建线程或进行处理(这样就不会一直阻塞等待某个客户端的IO操作了),而不是创建之后需要一直保持连接,即使没有任何的读写操作。这样就不会因为占着茅坑不拉屎导致线程无限制地创建下去了。
通过这种方式,甚至单线程都能做到高效的复用,最典型的例子就是Redis了,因为内存的速度非常快,多线程上下文的开销就会显得有些拖后腿,还不如直接单线程简单高效,这也是为什么Redis单线程也能这么快的原因。
因此,我们就从NIO框架的第三个核心内容:Selector,开始讲起。
选择器与I/O多路复用
前面我们大概了解了一下选择器,我们知道,选择器是当具体有某一个状态(比如读、写、请求)已经就绪时,才会进行处理,而不是让我们的程序主动地进行等待。
既然我们现在需要实现IO多路复用,那么我们来看看常见的IO多路复用模型,也就是Selector的实现方案,比如现在有很多个用户连接到我们的服务器:
- select:当这些连接出现具体的某个状态时,只是知道已经就绪了,但是不知道详具体是哪一个连接已经就绪,每次调用都进行线性遍历所有连接,时间复杂度为
O(n)
,并且存在最大连接数限制。
- poll:同上,但是由于底层采用链表,所以没有最大连接数限制。
- epoll:采用事件通知方式,当某个连接就绪,能够直接进行精准通知(这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的,只要就绪会会直接回调callback函数,实现精准通知,但是只有Linux支持这种方式),时间复杂度
O(1)
,Java在Linux环境下正是采用的这种模式进行实现的。
好了,既然多路复用模型了解完毕了,那么我们就来看看如何让我们的网络通信实现多路复用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public static void main(String[] args) { try (ServerSocketChannel serverChannel = ServerSocketChannel.open(); Selector selector = Selector.open()){ serverChannel.bind(new InetSocketAddress(8080)); serverChannel.configureBlocking(false); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int count = selector.select(); System.out.println("监听到 "+count+" 个事件"); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if(key.isAcceptable()) { SocketChannel channel = serverChannel.accept(); System.out.println("客户端已连接,IP地址为:"+channel.getRemoteAddress()); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } else if(key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(128); channel.read(buffer); buffer.flip(); System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
channel.write(ByteBuffer.wrap("已收到!".getBytes())); } iterator.remove(); } } } catch (IOException e) { throw new RuntimeException(e); } }
|
接着我们来编写一下客户客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) { try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080)); Scanner scanner = new Scanner(System.in)){ System.out.println("已连接到服务端!"); while (true) { System.out.println("请输入要发送给服务端的内容:"); String text = scanner.nextLine(); channel.write(ByteBuffer.wrap(text.getBytes())); System.out.println("已发送!"); ByteBuffer buffer = ByteBuffer.allocate(128); channel.read(buffer); buffer.flip(); System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining())); } } catch (IOException e) { throw new RuntimeException(e); } }
|
我们来看看效果:
可以看到成功实现了,当然各位也可以跟自己的室友一起开客户端进行测试,现在,我们只用了一个线程,就能够同时处理多个请求,可见多路复用是多么重要。
实现Reactor模式
前面我们简单实现了多路复用网络通信,我们接着来了解一下Reactor模式,对我们的服务端进行优化。
现在我们来看看如何进行优化,我们首先抽象出两个组件,Reactor线程和Handler处理器:
- Reactor线程:负责响应IO事件,并分发到Handler处理器。新的事件包含连接建立就绪、读就绪、写就绪等。
- Handler处理器:执行非阻塞的操作。
实际上我们之前编写的算是一种单线程Reactor的朴素模型(面向过程的写法),我们来看看标准的写法:
客户端还是按照我们上面的方式连接到Reactor,并通过选择器走到Acceptor或是Handler,Acceptor主要负责客户端连接的建立,Handler负责读写操作,代码如下,首先是Handler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Handler implements Runnable{
private final SocketChannel channel;
public Handler(SocketChannel channel) { this.channel = channel; }
@Override public void run() { try { ByteBuffer buffer = ByteBuffer.allocate(128); channel.read(buffer); buffer.flip(); System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining())); channel.write(ByteBuffer.wrap("已收到!".getBytes())); }catch (IOException e){ e.printStackTrace(); } } }
|
接着是Acceptor,实际上就是把上面的业务代码搬个位置罢了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public class Acceptor implements Runnable{
private final ServerSocketChannel serverChannel; private final Selector selector;
public Acceptor(ServerSocketChannel serverChannel, Selector selector) { this.serverChannel = serverChannel; this.selector = selector; }
@Override public void run() { try{ SocketChannel channel = serverChannel.accept(); System.out.println("客户端已连接,IP地址为:"+channel.getRemoteAddress()); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ, new Handler(channel)); }catch (IOException e){ e.printStackTrace(); } } }
|
这里我们在注册时丢了一个附加对象进去,这个附加对象会在选择器选择到此通道上时,可以通过attachment()
方法进行获取,对于我们简化代码有大作用,一会展示,我们接着来看看Reactor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class Reactor implements Closeable, Runnable{
private final ServerSocketChannel serverChannel; private final Selector selector; public Reactor() throws IOException{ serverChannel = ServerSocketChannel.open(); selector = Selector.open(); }
@Override public void run() { try { serverChannel.bind(new InetSocketAddress(8080)); serverChannel.configureBlocking(false); serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverChannel, selector)); while (true) { int count = selector.select(); System.out.println("监听到 "+count+" 个事件"); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { this.dispatch(iterator.next()); iterator.remove(); } } }catch (IOException e) { e.printStackTrace(); } }
private void dispatch(SelectionKey key){ Object att = key.attachment(); if(att instanceof Runnable) { ((Runnable) att).run(); } }
@Override public void close() throws IOException { serverChannel.close(); selector.close(); } }
|
最后我们编写一下主类:
1 2 3 4 5 6 7 8
| public static void main(String[] args) { try (Reactor reactor = new Reactor()){ reactor.run(); }catch (IOException e) { e.printStackTrace(); } }
|
这样,我们就实现了单线程Reactor模式,注意全程使用到的都只是一个线程,没有创建新的线程来处理任何事情。
但是单线程始终没办法应对大量的请求,如果请求量上去了,单线程还是很不够用,接着我们来看看多线程Reactor模式,它创建了多个线程处理,我们可以将数据读取完成之后的操作交给线程池来执行:
其实我们只需要稍微修改一下Handler就行了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class Handler implements Runnable{ private static final ExecutorService POOL = Executors.newFixedThreadPool(10); private final SocketChannel channel; public Handler(SocketChannel channel) { this.channel = channel; }
@Override public void run() { try { ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); buffer.flip(); POOL.submit(() -> { try { System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining())); channel.write(ByteBuffer.wrap("已收到!".getBytes())); }catch (IOException e){ e.printStackTrace(); } }); } catch (IOException e) { throw new RuntimeException(e); } } }
|
这样,在数据读出之后,就可以将数据处理交给线程池执行。
但是这样感觉还是划分的不够,一个Reactor需要同时处理来自客户端的所有操作请求,显得有些乏力,那么不妨我们将Reactor做成一主多从的模式,让主Reactor只负责Accept操作,而其他的Reactor进行各自的其他操作:
现在我们来重新设计一下我们的代码,Reactor类就作为主节点,不进行任何修改,我们来修改一下其他的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public class SubReactor implements Runnable, Closeable { private final Selector selector; private static final ExecutorService POOL = Executors.newFixedThreadPool(4); private static final SubReactor[] reactors = new SubReactor[4]; private static int selectedIndex = 0; static { for (int i = 0; i < 4; i++) { try { reactors[i] = new SubReactor(); POOL.submit(reactors[i]); } catch (IOException e) { e.printStackTrace(); } } } public static Selector nextSelector(){ Selector selector = reactors[selectedIndex].selector; selectedIndex = (selectedIndex + 1) % 4; return selector; }
private SubReactor() throws IOException { selector = Selector.open(); }
@Override public void run() { try { while (true) { int count = selector.select(); System.out.println(Thread.currentThread().getName()+" >> 监听到 "+count+" 个事件"); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { this.dispatch(iterator.next()); iterator.remove(); } } }catch (IOException e) { e.printStackTrace(); } }
private void dispatch(SelectionKey key){ Object att = key.attachment(); if(att instanceof Runnable) { ((Runnable) att).run(); } }
@Override public void close() throws IOException { selector.close(); } }
|
我们接着来修改一下Acceptor类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class Acceptor implements Runnable{
private final ServerSocketChannel serverChannel;
public Acceptor(ServerSocketChannel serverChannel) { this.serverChannel = serverChannel; }
@Override public void run() { try{ SocketChannel channel = serverChannel.accept(); System.out.println(Thread.currentThread().getName()+" >> 客户端已连接,IP地址为:"+channel.getRemoteAddress()); channel.configureBlocking(false); Selector selector = SubReactor.nextSelector(); selector.wakeup(); channel.register(selector, SelectionKey.OP_READ, new Handler(channel)); }catch (IOException e){ e.printStackTrace(); } } }
|
现在,SocketChannel相关的操作就由从Reactor进行处理了,而不是一律交给主Reactor进行操作。
至此,我们已经了解了NIO的三大组件:Buffer、Channel、Selector,有关NIO基础相关的内容,就讲解到这里。下一章我们将继续讲解基于NIO实现的高性能网络通信框架Netty。