advanced java (补三)NIO

Java NIO

Java 在 jdk1.4 版本加入了 java.nio 包,提供了一系列改进的输入/输出处理的新特性,被统称为NIO( 即 New I/O ) 。其中 nio 将IO处理抽象成 Buffer 缓冲区,Channel 管道 和 Selector选择器 三个概念。三者组成了这个核心的API。Buffer 表示缓存的数据,nio模型都是基于Buffer 读写数据。Channel 用于表示各种类型 IO 的数据读写,将数据读入或写在 Buffer 上。Selector 用于响应处理 Channel 的事件,其中多个 Channel 可以注册在同一个 Selector 上。然后就是 Buffer 的 Scatter / Gather 的概念。以及用于各种 Channel ,包括用于文件IO的 FileChannel ,网络IO 包括 TCP 的 ServerSocketChannel ,UDP 的 DataGramChannel ,以及异步文件通道 AsynchronousFileChannel 。最后就是关于管道,路径和文件的帮助类。

核心部分

NIO主要有三个核心部分组成:

Scatter / Gather

Scattering read指的是从通道读取的操作能把数据写入多个buffer

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };

channel.read(bufferArray);

gathering write 表示的是从多个buffer把数据写入到一个channel中

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

//write data into buffers

ByteBuffer[] bufferArray = { header, body };

channel.write(bufferArray);

FileChannel (文件IO)

首先 磁盘IO 由于磁盘文件(也就是regular file)是没有就绪这过程的,它们随时是就绪的,模型上是这样的,因此不适用同步非阻塞(non-blocking io)。同步非阻塞适用于网络IO但是对硬盘IO不起作用,磁盘只能通过阻塞传输数据。

使用传统的 java.io 来读写文件数据,是面向 数据流 的输入输出,同时需要 OutputStream 和 InputStream 两种流 (但是实际上,旧的io包已经使用NIO重新实现过,同样具有效率)

String filePath = "/tmp/test.txt";
File file = new File(filePath);
//输出流写入数据
try {
    OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(file));
    BufferedWriter br = new BufferedWriter(writer);
    //写入数据
    writer.write("aaaaaaa\n".toCharArray());
    writer.flush();

    writer.write("bbbbbbb\n".toCharArray());
    writer.flush();

    writer.write("ccccccc\n".toCharArray());
    writer.flush();
    writer.close();

} catch (IOException e) {
    e.printStackTrace();
}

//输入流读取数据
try {
    InputStreamReader reader = new InputStreamReader(new FileInputStream(file));
    BufferedReader br = new BufferedReader(reader);
    //读取数据
    char data[] = new char[8];
    while (br.read(data)  !=  -1){
        System.out.print(data);
    }
} catch (IOException e) {
    e.printStackTrace();
}

而如果使用 java.nio ,是面向 缓冲区 的输入和输出,只需要一个缓冲就可以同时读写数据

//缓冲区,需要注意的是 HeapByteBuffer 是堆外内存。如果没有一个良好的设计,可能存在内存泄漏的问题
ByteBuffer bf = ByteBuffer.allocate(8);
try {
    // 通过缓冲写入数据
    FileChannel outFc  = new FileOutputStream("/tmp/test.txt",true).getChannel();
    bf.put("aaaaaaa\n".getBytes());
    bf.flip();
    outFc.write(bf);
    bf.clear();

    bf.put("bbbbbbb\n".getBytes());
    bf.flip();
    outFc.write(bf);
    bf.clear();

    bf.put("ccccccc\n".getBytes());
    bf.flip();
    outFc.write(bf);
    bf.clear();
    outFc.close();

    //通过缓冲读取数据
    FileChannel inFc = new FileInputStream("/tmp/test.txt").getChannel();
    while (inFc.read(bf)  !=  -1){
        System.out.print(new String(bf.array()));
        bf.clear();
    }
    outFc.close();

} catch (IOException e) {
    e.printStackTrace();
}

ServerSocketChannel (网络IO,TCP)

下面是一个模拟HTTP请求,访问 http://127.0.0.1:18081/ , 返回 Hello World 的例子

// 创建网络服务端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 设置为非阻塞模式 - 默认都是阻塞的
serverSocketChannel.socket().bind(new InetSocketAddress(18081)); // 绑定端口
System.out.println("启动成功");
while (true) {
    SocketChannel socketChannel = serverSocketChannel.accept(); // 获取新tcp连接通道
    // tcp请求 读取/响应
    if (socketChannel != null) {
        System.out.println("收到新连接 : " + socketChannel.getRemoteAddress());
        socketChannel.configureBlocking(false); // 默认是阻塞的,一定要设置为非阻塞
        try {
            ByteBuffer requestBuffer = ByteBuffer.allocateDirect(1024);
            while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
                if (requestBuffer.position() > 0) {
                    break;
                }
            }
            if (requestBuffer.position() == 0) {
                continue;
            } // 如果没数据了, 则不继续后面的处理

            requestBuffer.flip();
            byte[] content = new byte[requestBuffer.limit()];
            requestBuffer.get(content);
            System.out.println(new String(content));
            System.out.println("收到数据,来自:" + socketChannel.getRemoteAddress());

            // 响应结果 200
            String response = "HTTP/1.1 200 OK\r\n" +
                "Content-Length: 11\r\n\r\n" +
                "Hello World";
            ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
            while (buffer.hasRemaining()) {
                socketChannel.write(buffer);// 非阻塞
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
直接内存

之前提到过网络非阻塞io的 epoll 过程中可以通过用户空间和内核空间的地址映射到同一块物理内存上,减少内核态和用户态切换的开销,这也就是直接内存 allocateDirect的作用 。直接内存只适用于网络IO,文件IO如果使用,会抛出 UnsupportedOperationException 的异常。需要注意的是,直接内存属于堆外内存,不受 JVM GC 管理,如果使用了不恰当的处理方法,可能会存在内存泄漏的问题。

DataGramChannel(UDP)

DatagramChannel 是一个可以发送、接收UDP数据包的通道。通过 send() 方法从 DatagramChannel 发送数据

ByteBuffer buf = ByteBuffer.allocateDirect(1024);
DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(38081));
channel.receive(buf);

int byteSent = channel.send(buf, new InetSocketAddress("example.com", 80));
channel.connect(new InetSocketAddress(80));

AsynchronousFileChannel (异步文件通道)

通过Future读取数据
ByteBuffer buf = ByteBuffer.allocate(1024);
Path path = Paths.get("/tmp/test.txt");

AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
Future<Integer> operation = fileChannel.read(buf, 0);
while(!operation.isDone());
buf.flip();
byte[] data = new byte[buf.limit()];
buf.get(data);
System.out.println(new String(data));
buf.clear();
通过Future写数据
ByteBuffer buf = ByteBuffer.allocate(1024);
Path path = Paths.get("/tmp/test.txt");

AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
buf.put("test data\n".getBytes());
buf.flip();

Future<Integer> operation = fileChannel.write(buf, 0);
buf.clear();

while(!operation.isDone());
System.out.println("Write done");

同时也可以通过 CompletionHandler ,重写方法来读取和写数据

管道,路径和文件帮助类

Pipe管道

管道指的是两个线程间单向传输数据的连接。

// 创建管道
ByteBuffer buf = ByteBuffer.allocate(48);
Pipe pipe = Pipe.open();
//写入数据
Pipe.SinkChannel sinkChannel = pipe.sink();
String newData = "New String to write to file...";

buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()) {
    sinkChannel.write(buf);
}

//读取数据
buf.clear();
Pipe.SourceChannel sourceChannel = pipe.source();
int bytesRead = sourceChannel.read(buf);
System.out.println(new String(buf.array()));
路径

nio 提供了java.nio.file.Paths 辅助类,用于建立绝对路径和相对路径

Path absolutePath = Paths.get("/tmp/test.txt");
Path relativePath = Paths.get("/tmp","test.txt");
文件

同时 nio 提供了java.nio.flie.Files 类 ,来更方便实现文件路径的操作和对于文件的创建、移动、删除、获取信息等操作。

判断文件是否存在

Path path = Paths.get("data/logging.properties");

boolean pathExists =
        Files.exists(path,
            new LinkOption[]{ LinkOption.NOFOLLOW_LINKS});

创建目录

Path path = Paths.get("/tmp/subdir");

try {
    Path newDir = Files.createDirectory(path);
} catch(FileAlreadyExistsException  e){
    // the directory already exists.
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

复制文件

Path sourcePath      = Paths.get("data/logging.properties");
Path destinationPath = Paths.get("data/logging-copy.properties");

try {
    Files.copy(sourcePath, destinationPath);
} catch(FileAlreadyExistsException e) {
    //destination file already exists
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

覆盖文件

Path sourcePath      = Paths.get("data/logging.properties");
Path destinationPath = Paths.get("data/logging-copy.properties");

try {
    Files.copy(sourcePath, destinationPath,
            StandardCopyOption.REPLACE_EXISTING);
} catch(FileAlreadyExistsException e) {
    //destination file already exists
} catch (IOException e) {
    //something else went wrong
    e.printStackTrace();
}

移动文件

Path sourcePath      = Paths.get("data/logging-copy.properties");
Path destinationPath = Paths.get("data/subdir/logging-moved.properties");

try {
    Files.move(sourcePath, destinationPath,
            StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
    //moving file failed.
    e.printStackTrace();
}

删除文件

Path path = Paths.get("data/subdir/logging-moved.properties");

try {
    Files.delete(path);
} catch (IOException e) {
    //deleting file failed
    e.printStackTrace();
}

以及可以通过 Files.walkFileTree() 遍历目录。