NingG +

Java NIO梳理

几个要点:

Java IO

Java IO 与 Java NIO:

流(Stream)

IO本质上是byte的流动,而流(Stream)是byte移动的载体,从源点向目的点移动数据。源点和目的点,既可以是内存区域、磁盘文件、也可以是一个URL,只要能代表位置就可以。根据流的方向,流可以分为:输入流和输出流,从输入流读取数据,向输出流写入数据。

IO的分类

Java对io的支持主要集中在io包下,两类:

无论网络传输还是磁盘读写,最小的存储单元都是byte。但程序中操作的数据大都是char形式的,所以Java也提供了char的Stream。上述是IO流本身的支持:流的形态、流内部装的什么,但流并不等于IO,还有重要的一点:数据从哪里来?写到哪里去?主要是一下两种:

磁盘IO工作机制

io中数据写到何处也是重要的一点,其中最主要的就是将数据持久化到磁盘。数据在磁盘上最小的描述就是文件,上层应用对磁盘的读和写都是针对文件而言的。在java中,以File类来表示文件,如:

File file = new File("D:/test.txt");

但是严格来说,File并不表示一个真实的存在于磁盘上的文件。就像上面代码的文件其实并不存在,File做的只是根据你所提供的文件描述符,返回某一路径的虚拟对象,它并不关心文件或路径是否存在,可能存在,也可能是捏造的。就好象一张名片,名片的背后代表的是人。为什么要这么设计?在我看来还是要提高访问磁盘的效率,有点延迟加载的意思。大部分情况下,我们最关心的并不是文件存不存在,而是文件要如何操作。比如你手里有很多名片,你可能更关心的是有没有某某局长的名片,而只有在需要联系时,才发现名片是假的。也就是关心名片本身要强过名片的真伪。

以FileInputStream读取文件为例,过程是这样的:当传入一个文件路径时,会根据这个路径创建File对象,作为这个文件的一个“名片”。当我们试图通过FileInputStream对象去操作文件的时候,将会真正创建一个关联真实存在的磁盘文件的文件描述符FileDescriptor,通过FileInputStream构造方法可以看出:

fd = new FileDescriptor();

如果说File是文件的名片,那么FileDescriptor就是真正指向了一个打开的文件,可以操作磁盘文件。例如FileDescriptor.sync()方法可以将缓存中的数据强制刷新到磁盘文件中。如果我们需要读取的是字符,还需要通过StreamDecoder类将字节解码成字符。至于如何从物理磁盘上读取数据,那就是操作系统做的事情了。过程如图(图摘自网上):

Socket工作机制

Socket要说起来并不那么形象,它的中文翻译是“插座”,至于“套接字”这个翻译我实在不知道从何而来。可以这样理解插座的概念,由于本身有电网的存在,如果我们买了一台新电器,我们只要插上插座连接到电网上就能够使用。Socket就像一个插座,计算机通过Socket就能和网络或者其他计算机上进行通讯;当有数据通讯的需求时,只需要建立一个Socket“插座”,通过网卡与其他计算机相连获取数据。

Socket位于传输层和应用层之间,向应用层统一提供编程接口,应用层不必知道传输层的协议细节。Java中对Socket的支持主要是以下两种:

大部分情况下我们使用的都是基于TCP/IP协议的流Socket,因为它是一种稳定的通信协议。以此为例:

一台计算机要和另一台计算机进行通讯,获取其上应用程序的数据,必须通过Socket建立连接,要知道对方的IP和端口号。建立一个Socket连接需要通过底层TCP/IP协议来建立TCP连接,而建立TCP连接必须通过底层IP协议根据给定的IP在网络中找到目标主机。目标计算机上可能跑着多个应用,所以我们必须要根据端口号来制定目标应用程序,这样就可以通过一个 Socket 实例唯一代表一个主机上的一个应用程序的通信链路了。

那么Socket是如何建立通讯链路的呢?

假设有一台计算机作为客户端,另一台作为服务端。当客户端需要向服务端通信,客户端首先要创建一个Socket实例:

Socket socket = new Socket("127.0.0.1",1234);

若没有指定端口号,操作系统将为这个Socket实例分配一个没有被使用的本地端口号。此外创建了一个包含本地和远程地址和端口号的套接字数据结构,这个数据结构将一直保存在系统中直到这个连接关闭,代码如下:

public Socket(String host, int port)
	throws UnknownHostException, IOException
{
	this(host != null ? new InetSocketAddress(host, port) :
		 new InetSocketAddress(InetAddress.getByName(null), port),
		 (SocketAddress) null, true);
}

客户端试图和服务端建立TCP连接,此时会进行三次握手。

完成三次握手后Socket的构造函数成功返回,Socket实例创建完毕。

互联网是一种尽力而为(best-effort)的网络,客户端的起始消息或服务器端的回复消息都可能在传输过程中丢失。出于这个原因,TCP 协议实现将以递增的时间间隔重复发送几次握手消息。如果TCP客户端在一段时间后还没有收到服务器的返回消息,则发生超时并放弃连接。这种情况下,构造函数将抛出IOException 异常。

而服务端也需要创建与之对应的ServerSocket,ServerSocket的创建比较简单,只需要指定端口号:

ServerSocket serverSocket = new ServerSocket(10001);

   同时操作系统也会为ServerSocket实例创建一个底层数据结构:

bind(new InetSocketAddress(bindAddr, port), backlog);  //见构造方法(backlog:最大客户端等待队列)

这个数据结构中包含指定监听的端口号和包含监听地址的通配符,通常情况下是监听所有地址,下面是比较典型的ServerSocket代码:

public void testSocket() throws Exception
{
	ServerSocket serverSocket = new ServerSocket(10002);
	Socket socket = null;
	try
	{
		while (true)
		{
			socket = serverSocket.accept();
			System.out.println("socket连接:" + socket.getRemoteSocketAddress().toString());
			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

			while(true)
			{
				String readLine = in.readLine();
				System.out.println("收到消息" + readLine);
				if("end".equals(readLine))
				{
					break;
				}
			}
			in.close();
			socket.close();
		}
	}
	catch (SocketException se)
	{
		System.out.println("客户端断开连接");
	}
	catch (IOException e)
	{
		e.printStackTrace();
	}
	finally
	{
		System.out.println("socket关闭:" + socket.getRemoteSocketAddress().toString());
		socket.close();
	}
}   

当调用accept()方法时,服务端将进入阻塞状态,等待客户端的请求。当有客户端请求到来时,将为这个链接创建一个套接字数据结构,包括请求客户端的地址和端口号。该数据结构将被关联到ServerSocket实例的一个未连接列表里。此时连接并没有成功建立,处于三次握手阶段,Socket构造函数并未成功返回。当三次握手成功后,会将Socket实例对应的数据结构从未完成列表移到完成列表中。所以 ServerSocket 所关联的列表中每个数据结构,都代表与一个客户端的建立的 TCP 连接。(client链接,两个列表:等待列表、完成列表)

当连接成功创建后,我们要做的就是传输数据,这才是主要目的。如上例代码,在客户端和服务端都有一个Socket实例,而每个Socket实例都会拥有一个InputStream和OutputStream,我们正是通过它们传输数据。当Socket对象创建时,操作系统将会为InputStream和OutputStream分别分配一定大小的缓冲区,数据的写入和读取都是通过缓存区完成的。发送端的缓冲区称之为SendQ,是一个FIFO的队列,接收端的缓冲区称之为RecvQ,同样也是FIFO队列。

数据传输时,发送端将数据写入到OutputStream对应的SendQ队列中,以字节为单位发送到接收端InputStream的RecvQ队列中。当SendQ队列填满时,发送端的write方法将会阻塞住;而当RecvQ队列中没有数据时,接收端的read方法也将被阻塞。

Socket模式的工作原理

几个疑问:

下图中,服务器端ServerSocket独占一个线程,负责监听Client发送过来的连接请求,并为每个Client请求新建一个处理线程;Socket工作模式的细节,参考:Java Socket梳理

Java NIO

为什么会产生Java NIO(JDK 1.4+)?因为传统Java IO,特别是基于网络的IO操作(socket),有几个特点:

Java IO vs Java NIO

简要对比如下表:

Java IO Java NIO 说明
面向流 面向缓冲区 面向缓冲区,数据移动方便,处理灵活,但处理复杂
阻塞IO 非阻塞IO 用户线程read()、write()操作不阻塞
- 选择器 单线程管理多通道,提升效率

简单来说:

NIO使人们只用一个或几个单线程,就可以管理多个通道(网络连接或文件),但代价是解析数据可能比从一个阻塞流中读取数据更为复杂。

适用场景:Java IO和Java NIO的适用场景如下:

对于网络I/O,传统的阻塞式I/O,一个线程对应一个连接,采用线程池的模式在大部分场景下简单高效。当连接数茫茫多时,并且数据的移动非常频繁,NIO无疑是更好的选择。

NIO标榜的是高速、可伸缩的I/O,因为它更亲近操作系统。当需求很平凡,没有太高的效率要求的时候,你看不出它的好,反而觉得NIO代码实现复杂,不易理解。选择与否全看使用的场景,这点就看使用者的权衡了。

面向流与面向缓冲

Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

思考:下面几个理解,Java NIO 缓冲区的利弊:

阻塞与非阻塞IO

Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

思考:下面几个理解

选择器(Selectors)

Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

基本原理

Java NIO是在jdk1.4开始使用的,它既可以说成“新I/O”,也可以说成非阻塞式I/O。下面是java NIO的工作原理:

  1. 由一个专门的线程来处理所有的 IO 事件,并负责分发。
  2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
  3. 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。

具体NIO原理,参考下图:

java NIO采用了双向通道(channel)进行数据传输,而不是单向的流(stream),在通道上可以注册我们感兴趣的事件。一共有以下四种事件:

事件名 对应值
服务端接收客户端事件 SelectionKey.OP_ACCEPT(16)
客户端连接服务端事件 SelectionKey.OP_CONNECT(8)
读事件 SelectionKey.OP_READ(1)
写事件 SelectionKey.OP_WRITE(4)

服务端和客户端各自维护一个管理通道的对象,我们称之为selector,该对象能检测一个或多个通道 (channel) 上的事件。我们以服务端为例,如果服务端的selector上注册了读事件,某时刻客户端给服务端发送了一些数据,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO的服务端会在selector中添加一个读事件。服务端的处理线程会轮询地访问selector,如果访问selector时发现有感兴趣的事件到达,则处理这些事件,如果没有感兴趣的事件到达,则处理线程会一直阻塞直到感兴趣的事件到达为止。下面是java NIO的通信模型示意图:

完整Dome

此处的Demo代码已经提交到GitHub上simple-web-demo下learn-java-basic工程中top.ningg.java.nio包下。

NIOServer

构造selector,并为selector上绑定channel:

package top.ningg.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/*
 * NIO服务端
 */
public class NIOServer {
	
	private Selector selector; // 通道管理器

	/*
	 * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
	 */
	public void initServer(int port) throws IOException {

		ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 获得一个ServerSocket通道
		serverChannel.configureBlocking(false); // 设置通道为非阻塞
		serverChannel.socket().bind(new InetSocketAddress(port)); // 将该通道对应的ServerSocket绑定到port端口

		this.selector = Selector.open(); // 获得一个通道管理器
		// 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
		// 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
	}

	/*
	 * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
	 */
	@SuppressWarnings("unchecked")
	public void listen() throws IOException {
		System.out.println("服务端启动成功!");

		while (true) { // 轮询访问selector

			selector.select(); // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
			Iterator ite = this.selector.selectedKeys().iterator(); // 获得selector中选中的项的迭代器,选中的项为注册的事件
			
			while (ite.hasNext()) {
				
				SelectionKey key = (SelectionKey) ite.next();
				ite.remove(); 

				if (key.isAcceptable()) { // 客户端请求连接事件
					ServerSocketChannel server = (ServerSocketChannel) key.channel();
					SocketChannel channel = server.accept(); // 获得和客户端连接的通道
					channel.configureBlocking(false); 

					channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));
					// 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
					channel.register(this.selector, SelectionKey.OP_READ);

				} else if (key.isReadable()) { // 获得了可读的事件
					read(key);
				}

			}

		}
	}

	/*
	 * 处理读取客户端发来的信息 的事件
	 */
	public void read(SelectionKey key) throws IOException {
		SocketChannel channel = (SocketChannel) key.channel(); // 服务器可读取消息:得到事件发生的Socket通道
		ByteBuffer buffer = ByteBuffer.allocate(10); // 创建读取的缓冲区
		channel.read(buffer);
		byte[] data = buffer.array();
		String msg = new String(data).trim();
		System.out.println("服务端收到信息:" + msg);
	}

	public static void main(String[] args) throws IOException {
		NIOServer server = new NIOServer();
		server.initServer(8000);
		server.listen();
	}

}

NIOClient

构造selector,并为其绑定channel以及事件:

package top.ningg.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/*
 * NIO客户端
 */
public class NIOClient {

	private Selector selector; 

	public void initClient(String ip, int port) throws IOException {

		SocketChannel channel = SocketChannel.open(); 
		channel.configureBlocking(false); 
		this.selector = Selector.open();

		channel.connect(new InetSocketAddress(ip, port));
		channel.register(selector, SelectionKey.OP_CONNECT);
	}

	@SuppressWarnings("unchecked")
	public void listen() throws IOException {

		while (true) { 
			
			selector.select();
			Iterator ite = this.selector.selectedKeys().iterator(); 
			
			while (ite.hasNext()) {
				SelectionKey key = (SelectionKey) ite.next();
				ite.remove(); 

				if (key.isConnectable()) {
					SocketChannel channel = (SocketChannel) key.channel();

					if (channel.isConnectionPending()) {
						channel.finishConnect();
					}

					channel.configureBlocking(false); 

					channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));
					channel.register(this.selector, SelectionKey.OP_READ);

				} else if (key.isReadable()) { // 获得了可读的事件
					read(key);
				}

			}

		}
	}

	/*
	 * 处理读取服务端发来的信息 的事件
	 */
	public void read(SelectionKey key) throws IOException {

		SocketChannel channel = (SocketChannel) key.channel(); // 客户端可读取消息:得到事件发生的Socket通道

		ByteBuffer buffer = ByteBuffer.allocate(10); // 创建读取的缓冲区
		channel.read(buffer);
		byte[] data = buffer.array();
		String msg = new String(data).trim();
		System.out.println("客户端收到信息:" + msg);
	}

	/*
	 * 启动客户端测试
	 */
	public static void main(String[] args) throws IOException {
		NIOClient client = new NIOClient();
		client.initClient("localhost", 8000);
		client.listen();
	}

}

高性能IO设计模式

在传统的网络服务设计模式中,有两种比较经典的模式:

对于多线程模式,也就说来了client,服务器就会新建一个线程来处理该client的读写事件,如下图所示:

这种模式虽然处理起来简单方便,但是由于服务器为每个client的连接都采用一个线程去处理,使得资源占用非常大。因此,当连接数量达到上限时,再有用户请求连接,直接会导致资源瓶颈,严重的可能会直接导致服务器崩溃。

因此,为了解决这种一个线程对应一个客户端模式带来的问题,提出了采用线程池的方式,也就说创建一个固定大小的线程池,来一个客户端,就从线程池取一个空闲线程来处理,当客户端处理完读写操作之后,就交出对线程的占用。因此这样就避免为每一个客户端都要创建线程带来的资源浪费,使得线程可以重用。

但是线程池也有它的弊端,如果连接大多是长连接,因此可能会导致在一段时间内,线程池中的线程都被占用,那么当再有用户请求连接时,由于没有可用的空闲线程来处理,就会导致客户端连接失败,从而影响用户体验。因此,线程池比较适合大量的短连接应用。

因此便出现了下面的两种高性能IO设计模式:ReactorProactor

在Reactor模式中,会先对每个client注册感兴趣的事件,然后有一个线程专门去轮询每个client是否有事件发生,当有事件发生时,便顺序处理每个事件,当所有事件处理完之后,便再转去继续轮询,如下图所示:

从这里可以看出,上面的Java NIO就是采用Reactor模式。注意,上面的图中展示的 是顺序处理每个事件,当然为了提高事件处理速度,可以通过多线程或者线程池的方式来处理事件。

在Proactor模式中,当检测到有事件发生时,会新起一个异步操作,然后交由内核线程去处理,当内核线程完成IO操作之后,发送一个通知告知操作已完成,可以得知,异步IO模型采用的就是Proactor模式。

参考来源

Top