本文共 13430 字,大约阅读时间需要 44 分钟。
一个应用程序进行IO时,需要系统内核的参与,发送syscall指令产生中断。
发生中断意味着需要操作系统介入,开展管理工作。由于操作系统的管理工作(比如切换线程、分配I/O设备等),需要使用特权指令,因此CPU要从用户态转为核心态。中断可以使CPU从用户态转化为核心态,使操作系统获得计算机的控制权,有了中断才能实现多道程序并发执行。
应用程序通过系统调用请求操作系统的服务。系统中的各种共享资源都由操作系统统一掌管,因此在用户程序中,凡是与资源有关的操作(如存储分配、I/O操作、文件管理等),都必须通过系统调用的方式向操作系统提出服务请求,由操作系统代为完成(进入核心态)。这样可以保证系统的稳定性和安全性,防止用户进行非法操作。
strace -ff -o ./tmp java TestSocket
:抓取Linux程序对内核有没有发生系统调用
一个BIO Server服务端的默认IO包括:
0表示输入流、1表示输出流、2表示异常流、3表示引入jar包的IO流、4表示IPV4的网络IO流、5表示IPV6的网络IO流。
每建立一个连接就生成了一个fd文件描述符。
select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写。
select 时间复杂度O(n)
它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长,最大连接数限制为1024。
poll 时间复杂度O(n)
poll本质上和select没有区别,它将用户传入的fd数组拷贝到内核空间,然后查询每个fd(文件描述符)对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的。
epoll 时间复杂度O(1)
epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll会把哪个流发生了怎样的I/O事件通知用户。所以epoll实际上是事件驱动(每个事件关联上fd)的,此时对这些流的操作都是有意义的。由硬件(如网卡)发出中断通知内核,充分发挥硬件,尽量不浪费CPU。
当有100万个客户端连接时,select与poll会有非常明显的问题,即在某一时刻,进程收集有事件的连接时,其实这100万连接中的大部分都是没有事件发生的。因此如果每次收集事件时,都把100万连接的套接字传给操作系统(这首先是用户态内存到内核态内存的大量复制),而由操作系统内核寻找这些连接上有没有未处理的事件,将会是巨大的资源浪费,然后select和poll就是这样做的,因此它们最多只能处理几千个并发连接。而epoll不这样做,它在Linux内核中申请了一个简易的文件系统,把原先的一个select或poll调用分成了3部分:
int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
这样只需要在进程启动时建立1个epoll对象,并在需要的时候向它添加或删除连接就可以了,因此,在实际收集事件时,epoll_wait的效率就会非常高,因为调用epoll_wait时并没有向它传递这100万个连接,内核也不需要去遍历全部的连接。
我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个rdllist双向链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个rdllist双向链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。
所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表中。
struct eventpoll { ... /*红黑树的根节点,这棵树中存储着所有添加到epoll中的事件, 也就是这个epoll监控的事件*/ struct rb_root rbr; /*双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件*/ struct list_head rdllist; ...};
在epoll中对于每一个事件都会建立一个epitem结构体:
struct epitem { ... //红黑树节点 struct rb_node rbn; //双向链表节点 struct list_head rdllink; //事件句柄等信息 struct epoll_filefd ffd; //指向其所属的eventepoll对象 struct eventpoll *ep; //期待的事件类型 struct epoll_event event; ...}; // 这里包含每一个事件对应着的信息。
当调用epoll_wait检查是否有发生事件的连接时,只是检查eventpoll对象中的rdllist双向链表是否有epitem元素而已,如果rdllist链表不为空,则这里的事件复制到用户态内存(使用共享内存提高效率)中,同时将事件数量返回给用户。因此epoll_waitx效率非常高。epoll_ctl在向epoll对象中添加、修改、删除事件时,从rbr红黑树中查找事件也非常快,也就是说epoll是非常高效的,它可以轻易地处理百万级别的并发连接。
NIO、Nginx、Redis均采用epoll。
Redis为单线程,这个线程不仅负责IO,还需要处理LRU、AOF等操作,需要轮询;
Nginx为多线程,IO线程只需要阻塞等待事件驱动epoll_wait即可。
epoll有两种模式:
参考文章:
Blocking IO(Input - OutPut)
阻塞式IO,对每一个连接都创建一个线程,在服务器接收客户端连接、写数据、读数据时都会阻塞,导致并发数少,线程频换切换导致效率低。真正的网络编程中,BIO很少使用。
BIO实现一个聊天室
服务端:
public class MyServer { // 保存所有的Socket连接 public static ListsocketList = Collections.synchronizedList(new ArrayList<>()); public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(30000); System.out.println("服务器启动..."); while (true){ Socket s = ss.accept();//阻塞 socketList.add(s); // 启动客户端服务 new Thread(new ServerThread(s)).start(); } }}
服务端线程(为每一个连接分配一个线程):
public class ServerThread implements Runnable { Socket s = null; BufferedReader br = null; public ServerThread(Socket s) throws IOException { this.s = s; br = new BufferedReader(new InputStreamReader(s.getInputStream())); } @Override public void run() { try{ String content = null; while((content = readFromClient()) != null){ //遍历每一个Socket for(Socket s : MyServer.socketList){ PrintStream ps = new PrintStream(s.getOutputStream()); ps.println(content); } } }catch (IOException e){ e.printStackTrace(); } } private String readFromClient(){ try { return br.readLine(); } catch (IOException e) { MyServer.socketList.remove(s); } return null; }}
客户端(读线程):
public class MyClient { public static void main(String[] args) throws Exception{ Socket s = new Socket("127.0.0.1", 30000); //客户端启动线程读取服务端的数据 new Thread(new ClientThread(s)).start(); PrintStream ps = new PrintStream(s.getOutputStream()); String line = null; BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); while((line = br.readLine())!= null){ ps.println(line); //System.out.println("已传输到服务器"); } }}
客户端(写线程)
public class ClientThread implements Runnable { Socket s = null; BufferedReader br = null; public ClientThread(Socket s) throws IOException { this.s = s; br = new BufferedReader(new InputStreamReader(s.getInputStream())); } @Override public void run() { try { String content = null; while ((content = br.readLine()) != null){ System.out.println(content); } }catch (Exception e){ e.printStackTrace(); } }}
New IO/Non-Blocking IO 非阻塞IO (Tomcat使用NIO)
每一个连接称为一个channel,selector(选择器)轮询每一个channel,当获取连接,读,写事件出现时,selector会将这些事件获取并用单线程处理。
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class Server { public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8888)); ssc.configureBlocking(false); // 设定为非阻塞模型 System.out.println("server started, listening on :" + ssc.getLocalAddress()); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); // 注册连接事件 while(true) { selector.select(); Setkeys = selector.selectedKeys(); Iterator it = keys.iterator(); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); handle(key); } } } private static void handle(SelectionKey key) { if(key.isAcceptable()) { try { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(key.selector(), SelectionKey.OP_READ); // 监控读事件 } catch (IOException e) { e.printStackTrace(); } finally { } } else if (key.isReadable()) { //flip复位操作 SocketChannel sc = null; try { sc = (SocketChannel)key.channel(); // NIO利用Buffer提高读写效率 ByteBuffer buffer = ByteBuffer.allocate(512); buffer.clear(); int len = sc.read(buffer); if(len != -1) { System.out.println(new String(buffer.array(), 0, len)); } ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes()); sc.write(bufferToWrite); } catch (IOException e) { e.printStackTrace(); } finally { if(sc != null) { try { sc.close(); } catch (IOException e) { e.printStackTrace(); } } } } }}
注:服务器对各个客户端的事件没有顺序。
Boss + Worker
由一个单独的线程处理事件轮询,将事件交给线程池来处理(Netty雏形)。
Asynchronous IO(异步IO) 不再需要轮询
AIO 通过调用accept方法,一个会话接入之后再次调用(递归)accept方法,监听下一次会话,读取也不再阻塞,回调complete方法异步进行。不再需要selector使用channel线程组来接收。
由操作系统处理事件,处理结束后通知server的回调/钩子函数(观察者模式),写法与Netty类似。
public class ServerWithThreadGroup { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); //中文测试 final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(threadGroup) .bind(new InetSocketAddress(8888)); serverChannel.accept(null, new CompletionHandler() { @Override //回调函数 public void completed(AsynchronousSocketChannel client, Object attachment) { serverChannel.accept(null, this); try { System.out.println(client.getRemoteAddress()); ByteBuffer buffer = ByteBuffer.allocate(1024); client.read(buffer, buffer, new CompletionHandler () { //读取成功时回调函数 @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); System.out.println(new String(attachment.array(), 0, result)); client.write(ByteBuffer.wrap("HelloClient".getBytes())); } 读取失败时回调函数 @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); while (true) { Thread.sleep(1000); } }}
从NIO上面我们能看到,对于IO的两个阶段的阻塞,只是对于第一个阶段有所改善,对于第二个阶段在NIO里面仍然是阻塞的。而真正的理想的异步非阻塞IO(AAIO)要做的就是,将IO操作的两个阶段都全部交给内核系统完成,用户线程只需要告诉内核,我要读取一块数据,请你帮我读取,读取完了放在我给你的地址里面,然后告诉我一声就可以了。
AIO可以做到真正的异步的操作,但实现起来比较复杂,支持纯异步IO的操作系统非常少,目前只有windows的IOCP技术实现了,而在Linux上,目前有很多开源的异步IO库,例如libevent、libev、libuv,但基本都不是纯的异步IO操作,底层还是是使用的epoll实现的。
参考:https://www.cnblogs.com/yuxiang1/p/10003866.html
Netty出现的主要原因,如下:
Java NIO类库和API繁杂众多,使用麻烦;
Java NIO封装程度并不高,常常需要配合Java多线程编程来使用,这是因为NIO编程涉及到Reactor模式;
Java NIO异常体系不完善,如客户端面临断连,重连,网络闪断,半包读写,网络阻塞,异常码流等问题,虽然开发相对容易,但是可靠性和稳定性并不高;
Java NIO本身的bug,修复较慢。
Netty底层没有用AIO,而是采用NIO,因为在Linux上AIO与NIO都是用epoll(轮询)实现,AIO反而多了一层封装。但是其API设计更像AIO。
p.s. 在windows上AIO是用事件模型,效率较高,比在Linux上使用AIO快。
public class HelloNetty { public static void main(String[] args) { new NettyServer(8888).serverStart(); }}class NettyServer { int port = 8888; public NettyServer(int port) { this.port = port; } public void serverStart() { // 用于接收连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于处理事件 EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new Handler()); } }); try { ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }}class Handler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //super.channelRead(ctx, msg); System.out.println("server: channel read"); ByteBuf buf = (ByteBuf)msg; System.out.println(buf.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(msg); ctx.close(); //buf.release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); }}
同步-异步关注的是消息通信的机制
阻塞-非阻塞关注的是等待消息时的状态
以烧水为例(程序就是人,内核就是水壶):
1.同步阻塞
点火(发消息)——> 傻等(同步),不等到水开不干别的事(阻塞)
2.同步非阻塞
点火(发消息)——> 去看电视,时不时查看结果(非阻塞),水开后自己处理(同步)
3.异步阻塞
点火(发消息)——> 傻等水壶响(阻塞),水开后自动处理
异步阻塞很少发生
4.异步非阻塞
点火(发消息)——> 该干嘛干嘛(非阻塞),水开后自动处理