Java NIO

Table of Contents

1 Java NIO

Java 1.4中引入了Java New I/O (NIO),其目的在于提高I/O处理速度。这样,Java程序员可以实现高速I/O而无需编写自定义的本机代码。NIO可将最耗时的I/O操作(即填充和提取缓冲区)转移到了操作系统,因而可以极大地提高速度。

本文主要参考:Java NIO, by Ron Hitchens

1.1 I/O基本概念(操作系统层面)

必须掌握操作系统层面的处理细节,才能更好地理解NIO模型。

1.1.1 缓冲区

缓冲区,以及缓冲区如何工作,是所有I/O的基础。所谓“输入/输出”讲的无非就是把数据移进或移出缓冲区。

进程执行I/O操作,归结起来,也就是向操作系统发出请求,让它要么把缓冲区里的数据排干,要么用数据把缓冲区填满。图 1 简单描述了数据从外部磁盘向运行中的进程的内存区域移动的过程。进程使用read()系统调用,要求其缓冲区被填满。内核随即向磁盘控制硬件发出命令,要求其从磁盘读取数据。磁盘控制器把数据直接写入内核内存缓冲区,这一步通过DMA完成,无需主CPU协助。一旦磁盘控制器把缓冲区装满,内核即把数据从内核空间的临时缓冲区拷贝到进程执行read()调用时指定的缓冲区。

java_nio_simplified_io_buffer_handling.jpg

Figure 1: Simplified I/O Buffer Handling

看了图 1 ,您可能会觉得,把数据从内核空间拷贝到用户空间似乎有些多余。为什么不直接让磁盘控制器把数据送到用户空间的缓冲区呢?这样做有几个问题。首先,硬件通常不能直接访问用户空间。其次,像磁盘这样基于块存储的硬件设备操作的是固定大小的数据块,而用户进程请求的可能是任意大小的或非对齐的数据块。在数据往来于用户空间与存储设备的过程中,内核负责数据的分解、再组合工作,因此充当着中间人的角色。

1.1.2 虚拟内存

所有现代操作系统都使用虚拟内存。虚拟内存意为使用虚假(或虚拟)地址取代物理(硬件RAM)内存地址。这样做好处颇多,总结起来主要有:

  1. 一个以上的虚拟地址可指向同一个物理内存地址。
  2. 虚拟内存空间可大于实际可用的硬件内存。

前一节提到,设备控制器不能通过DMA直接存储到用户空间,但通过利用上面提到的第一项,则可以达到相同效果。把内核空间地址与用户空间的虚拟地址映射到同一个物理地址,这样,DMA硬件(只能访问物理内存地址)就可以填充对内核与用户空间进程同时可见的缓冲区,如图 2 所示。

java_nio_multiply_mapped_memory_space.jpg

Figure 2: Multiply mapped memory space

这样真是太好了,省去了内核与用户空间的往来拷贝,但前提条件是,内核与用户缓冲区必须使用相同的页对齐,缓冲区的大小还必须是磁盘控制器块大小(通常为 512 字节磁盘扇区)的倍数。操作系统把内存地址空间划分为页,即固定大小的字节组。内存页的大小总是磁盘块大小的倍数,通常为2次幂(这样可简化寻址操作)。典型的内存页为1,024、2,048和4,096字节。虚拟和物理内存页的大小总是相同的。

1.1.3 文件I/O

文件I/O属文件系统范畴,文件系统与磁盘迥然不同。磁盘把数据存在扇区上,通常一个扇区512 字节。磁盘属硬件设备,对何谓文件一无所知,它只是提供了一系列数据存取窗口。在这点上,磁盘扇区与内存页颇有相似之处:都是统一大小,都可作为大的数组被访问。

文件系统是更高层次的抽象,是安排、解释磁盘(或其他随机存取块设备)数据的一种独特方式。您所写代码几乎无一例外地要与文件系统打交道,而不是直接与磁盘打交道。文件系统定义了文件名、路径、文件、文件属性等抽象概念。

1.1.3.1 内存映射文件

传统的文件I/O是通过用户进程发布read()和write()系统调用来传输数据的。为了在内核空间的文件系统页与用户空间的内存区之间移动数据,一次以上的拷贝操作几乎总是免不了的。

大多数操作系统都支持的特殊类型的I/O操作,允许用户进程最大限度地利用面向页的系统I/O特性,并完全摒弃缓冲区拷贝。这就是内存映射I/O,如图 3 所示。

java_nio_user_memory_mapped_to_filesystem_pages.jpg

Figure 3: User memory mapped to filesystem pages

内存映射I/O使用文件系统建立从用户空间直到可用文件系统页的虚拟内存映射(通过mmap系统调用)。这样做有下面好处:

  • 用户进程把文件数据当作内存,所以无需调用read()或write()系统调用。
  • 当用户进程碰触到映射内存空间,页错误会自动产生,从而将文件数据从磁盘读进内存。如果用户修改了映射内存空间,相关页会自动标记为脏,随后刷新到磁盘,文件得到更新。

2 Java NIO核心组件之一:缓冲区

一个 Buffer 对象是固定数量的数据的容器。 其作用是一个存储器,或者分段运输区,在这里数据可被存储并在之后用于检索。图 4 是Buffer的类层次图。

java_nio_buffer_family_tree.jpg

Figure 4: The Buffer family tree

2.1 缓冲区四个属性

所有的缓冲区都具有四个属性来提供关于其所包含的数据元素的信息。它们是:

容量(Capacity)
缓冲区能够容纳的数据元素的最大数量。这一容量在缓冲区创建时被设定,并且永远不能被改变。
上界(Limit)
缓冲区的第一个不能被读或写的元素。
位置(Position)
下一个要被读或写的元素的索引。位置会自动由相应的get()和put()函数更新。
标记(Mark)
一个备忘位置。调用mark()来设定mark=postion。调用reset()设定position=mark。标记在设定前是未定义的(undefined)。

这四个属性之间总是遵循以下关系:0 <= mark <= position <= limit <= capacity

5 展示了一个新创建的容量为10的ByteBuffer逻辑视图。

java_nio_newly_created_bytebuffer.jpg

Figure 5: A newly created ByteBuffer

6 是在新建缓冲区上执行5次put()操作后的逻辑视图。

java_nio_buffer_after_5put.jpg

Figure 6: Buffer after five put()s

2.2 缓冲区API

Buffer类部分方法的签名如下:

package java.nio;

public abstract class Buffer {
    // This is a partial API listing
    public final int capacity();
    public final int position();
    public final Buffer position (int newPositio);
    public final int limit();
    public final Buffer limit (int newLimit);
    public final Buffer mark();
    public final Buffer reset();
    public final Buffer clear();
    public final Buffer flip();
    public final Buffer rewind();
    public final int remaining();
    public final boolean hasRemaining();
    public abstract boolean isReadOnly();
    public abstract boolean isDirect();
}

上面的Buffer API并没有包括get()或put()函数。不过每一个Buffer子类都有这两个函数,由于它们所采用的参数类型,以及它们返回的数据类型,对每个子类(Buffer的子类有ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer)来说都是唯一的,所以它们不能在顶层Buffer类中被抽象地声明。

2.2.1 分配Buffer(allocate)

要获取一个Buffer对象,我们首先需要分配内存空间。每个类型的Buffer都有一个allocate()方法。我们可以通过这个方法分配Buffer。如:

import java.nio.ByteBuffer;

public abstract class BufferTest {

    public static void main(String[] args) {

        ByteBuffer buf = ByteBuffer.allocate(16);

        buf.put((byte) 'A');
        buf.put((byte) 'B');
        buf.put((byte) 'C');

        System.out.println(buf.get(0));      // 输出65
        System.out.println(buf.get(1));      // 输出66
        System.out.println(buf.get(2));      // 输出67

        System.out.println(buf.position());  // 输出3
        System.out.println(buf.limit());     // 输出16
        System.out.println(buf.capacity());  // 输出16
    }
}

2.2.2 翻转(flip)

当Buffer已经写结束了,我们想把Buffer内容全部读取出来,怎么做呢?在设置好limit和position后,多次调用get()方法即可实现。如:

import java.nio.ByteBuffer;

public abstract class BufferTest {

    public static void main(String[] args) {

        ByteBuffer buf = ByteBuffer.allocate(16);

        buf.put((byte) 'A');
        buf.put((byte) 'B');
        buf.put((byte) 'C');

        buf.limit(buf.position());     //将limit设置为当前位置
        buf.position(0);               //将position重置为0

        while (buf.hasRemaining()) {
            System.out.println(buf.get());   // 依次按行输出65 66 67
        }

        System.out.println(buf.position());  // 输出3
        System.out.println(buf.limit());     // 输出16
        System.out.println(buf.capacity());  // 输出16
    }
}

其中,上面代码中的下面两行

// 下面两行代码可以用buf.flip();代替。
buf.limit(buf.position());
buf.position(0);

就是函数flip()实现的功能(除此外flip()还会将mark清除)。flip()的源码为:

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

flip()函数将一个能够继续添加数据元素的“填充状态”的缓冲区翻转成一个准备读出元素的“释放状态”。 也就是说ByteBuffer无法同时支持读写,函数flip()可以把它从“写模式”切换为“读模式”。

假设图 7 是对Buffer进行写操作结束后的状态,那么此时调用flip()方法后,Buffer的状态会如图 8 所示。

java_nio_buffer_after_modification.jpg

Figure 7: Buffer的当前状态

java_nio_buffer_after_flipped.jpg

Figure 8: Buffer执行完flip()后的状态

2.2.3 倒带(rewind)

rewind()函数与flip()相似,但不影响limit属性。它只是将position设回0。您可以使用rewind()后退,重读已经被翻转的缓冲区中的数据。
rewind()方法的源码:

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

2.2.4 Direct vs. non-direct buffers

一个Buffer(下面以ByteBuffer为例)要么是Non-Direct的(分配在Heap上,由jvm负责管理),要么是Non-direct的。使用方法 isDirect() 可以测试Buffer是否为Direct的。

2.2.4.1 Non-direct buffer

使用方法 ByteBuffer.allocate() 得到的ByteBuffer是Non-direct的。

2.2.4.2 Direct buffer

使用方法 ByteBuffer.allocateDirect() 得到的ByteBuffer是Direct的。jvm可以保证在gc时不会移动它,不过jvm没有提供释放它的方法,大胆猜测当它没有引用时jvm可能释放它。 ByteBuffer.allocateDirect() 的官方建议使用场景:“It is recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations.”

使用方法 FileChannel.map() 可得到MappedByteBuffer(底层使用系统调用 mmap 来实现),它也是Direct的。

3 Java NIO核心组件之二:通道

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.

Channel用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。 多数情况下,通道与操作系统的文件描述符有着一对一的关系。虽然通道比文件描述符更广义,但您将经常使用到的多数通道都是连接到打开的文件描述符的。The channel classes provide the abstraction needed to maintain platform independence but still model the native I/O capabilities of modern operating systems.

Channels always read to, or write from, a Buffer. 如图 9 所示。

java_nio_overview_channels_buffers.png

Figure 9: Channels read data into Buffers, and Buffers write data into Channels

NIO中的Channel的主要实现有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

3.1 FileChannel

一个FileChannel对象可以在一个打开的RandomAccessFile、FileInputStream或FileOutputStream对象上调用getChannel()方法来获取;或者通过FileChannel的open()方法来获取。

下面是分别使用FileInputStream读取文件和使用NIO读取文件的实例:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;

public abstract class ChannelTest {

    public static void main(String[] args) {
        String filePath = "/Users/cig01/1.txt";
        System.out.println("Using FileInputStream");
        method1(filePath);
        System.out.println("Using NIO");
        method2(filePath);
    }

    /* 使用FileInputStream读取文件 */
    public static void method1(String filePath) {
        InputStream in = null;
        try {
            in = new BufferedInputStream(new FileInputStream(filePath));

            byte[] buf = new byte[8];    // 这里仅为演示,一般会使用更大的数组。

            int bytesRead = in.read(buf);
            while (bytesRead != -1) {
                for (int i = 0; i < bytesRead; i++) {
                    System.out.print((char) buf[i]);
                }
                bytesRead = in.read(buf);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /* 使用NIO读取文件 */
    public static void method2(String filePath) {
        FileChannel fileChannel = null;
        try {
            Path path = Paths.get(filePath);
            fileChannel =  FileChannel.open(path);

            ByteBuffer buf = ByteBuffer.allocate(8);  // 这里仅为演示,一般会使用更大的buffer。

            int bytesRead = fileChannel.read(buf);    // 从channel中读数据

            while (bytesRead != -1) {
                System.out.println("Number of bytes read: " + bytesRead);

                buf.flip();

                System.out.print("Buffer contents: ");

                while (buf.hasRemaining()) {
                    System.out.print((char) buf.get());
                }

                System.out.println();

                buf.clear();
                bytesRead = fileChannel.read(buf);    // 从channel中读数据
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (fileChannel != null) {
                    fileChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

假设文件"/Users/cig01/1.txt"的内容为:1234567890(加一个换行符),则上面代码的输出为:

Using FileInputStream
1234567890
Using NIO
Number of bytes read: 8
Buffer contents: 12345678
Number of bytes read: 3
Buffer contents: 90

3.2 零拷贝

考虑从文件中读出数据并将数据传输到网络上的场景(很多应用都有这样的场景,比如静态web服务器、ftp服务器等)。

使用传统方法实现这个功能,你需要使用下面函数:

File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

数据从文件移动到套接字的过程如图 10 所示。

java_nio_traditional_data_copying.gif

Figure 10: Traditional data copying approach

从图 10 中可知,使用传统方法时一共有4次数据拷贝过程:2次DMA拷贝(无需CPU参与,速度很快),两次需要CPU参与的拷贝(比DMA拷贝要慢)。

3.2.1 transferTo()

在上面例子中,我们发现两次CPU拷贝(即图中第2次和第3次拷贝)是多余的。应用程序只是起到缓存数据并将其传回到套接字的作用而以,别无他用。数据可以直接从读取缓冲区传输到套接字缓冲区。

FileChannel类中的方法transferTo()可以实现这个操作,它将数据从文件通道传输到了另一可写通道(这个例子中可以为SocketChannel)。

public void transferTo(long position, long count, WritableByteChannel target);

使用transferTo()时,数据的移动过程如图 11 所示。

java_nio_data_copy_with_transferTo.gif

Figure 11: 一次CPU拷贝

11 中有一次CPU拷贝和两次DMA拷贝,和图 10 相比,减少了一次CPU拷贝。

注1:在Unix/Linux中,transferTo()是使用系统调用 sendfile 实现的。
注2:使用transferTo(),除了减少了CPU拷贝次数,还减少了context switching的次数。
注3:最后一次CPU拷贝可以省略吗?答案是肯定的(不过它需要硬件及驱动程序的支持)。 我们把缓冲区描述符(而不是数据本身)传到Socket缓冲区,再把数据长度传过去,这样DMA控制器就可以直接将缓存中的数据打包发送到网络中,而无需先拷贝到Socket缓冲区。这称为收集拷贝功能(Gather Operation)。 如图 12 所示。
注4:如果硬件及驱动程序支持Gather Operation,方法transferTo()会自动使用这个特性,对Java应用程序来说是透明的。

java_nio_data_copy_with_transferTo_with_gather_op.gif

Figure 12: 消除了所有CPU拷贝(需要硬件及驱动程序的支持),所以称为“零拷贝”

参考:
Efficient data transfer through zero copy
浅析Linux中的零拷贝技术

4 Java NIO核心组件之三:选择器

A selector (java.nio.channels.Selector and subclasses) provides a mechanism for waiting on channels and recognizing when one or more become available for data transfer.

Java的selector表达的概念和Unix中的select系统调用类似。在实现层面上,Java使用epoll(Linux 2.6以前会使用poll)或者kqueues(FreeBsd, MacOS)等来实现selector。

5 异步I/O

Java 1.7中增加了 AsynchronousChannel ,这种类型的Channel支持异步I/O操作。再利用java.util.concurrent.Future或者java.nio.channels.CompletionHandler可以实现异步I/O。下面将分别介绍它们。

参考:Java IO Tutorial - Java Asynchronous I/O

5.1 异步I/O实例:使用AsynchronousChannel和Future

下面使用AsynchronousChannel和Future读取文件内容的实例:

import static java.nio.file.StandardOpenOption.READ;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Future;

public class TestAsyncChannelAndFuture {
    public static void main(String[] args) throws Exception {
        Path path = Paths.get("/Users/cig01/test/file1");

        try (AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, READ)) {
            int fileSize = (int) afc.size();
            ByteBuffer dataBuffer = ByteBuffer.allocate(fileSize);

            Future<Integer> result = afc.read(dataBuffer, 0);

            int readBytes = result.get();
            // 当读取结束后result.get()才返回,调用get()前使用result.isDone()可测试是否结束

            System.out.format("%s bytes read from %s%n", readBytes, path);

            System.out.format("Read data is:%n");
            byte[] byteData = dataBuffer.array();
            Charset cs = Charset.forName("UTF-8");
            String data = new String(byteData, cs);
            System.out.println(data);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

假设文件“/Users/cig01/test/file1”的内容为“abcxyz”(最后有换行符),则上面程序的输出如下:

7 bytes read from /Users/cig01/test/file1
Read data is:
abcxyz

5.2 异步I/O实例:使用AsynchronousChannel和CompletionHandler

下面使用AsynchronousChannel和CompletionHandler读取文件内容的实例:

import static java.nio.file.StandardOpenOption.READ;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TestAsyncChannelAndCompletionHandler {
    public static void main(String[] args) throws Exception {
        Path path = Paths.get("/Users/cig01/test/file1");
        AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, READ);
        ReadHandler handler = new ReadHandler();
        int fileSize = (int) afc.size();
        ByteBuffer dataBuffer = ByteBuffer.allocate(fileSize);

        Attachment attach = new Attachment();
        attach.asyncChannel = afc;
        attach.buffer = dataBuffer;
        attach.path = path;

        afc.read(dataBuffer, 0, attach, handler);

        System.out.println("Sleeping for 5 seconds...");
        Thread.sleep(5000);  // 等待读取完成,否则进程会提前退出
    }
}

class Attachment {
    public Path path;
    public ByteBuffer buffer;
    public AsynchronousFileChannel asyncChannel;
}

class ReadHandler implements CompletionHandler<Integer, Attachment> {
    @Override
    public void completed(Integer result, Attachment attach) {
        System.out.format("%s bytes read from %s%n", result, attach.path);
        System.out.format("Read data is:%n");
        byte[] byteData = attach.buffer.array();
        Charset cs = Charset.forName("UTF-8");
        String data = new String(byteData, cs);
        System.out.println(data);
        try {
            // Close the channel
            attach.asyncChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable e, Attachment attach) {
        System.out.format("Read operation on %s file failed." + "The error is: %s%n", attach.path, e.getMessage());
        try {
            // Close the channel
            attach.asyncChannel.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }
}

假设文件“/Users/cig01/test/file1”的内容为“abcxyz”(最后有换行符),则上面程序的输出如下:

Sleeping for 5 seconds...
7 bytes read from /Users/cig01/test/file1
Read data is:
abcxyz

Author: cig01

Created: <2013-12-29 Sun 00:00>

Last updated: <2018-04-27 Fri 18:13>

Creator: Emacs 25.3.1 (Org mode 9.1.4)