博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
从IO-BIO-NIO-AIO-到Netty
阅读量:4224 次
发布时间:2019-05-26

本文共 13430 字,大约阅读时间需要 44 分钟。

文章目录

IO

操作系统层面

一个应用程序进行IO时,需要系统内核的参与,发送syscall指令产生中断。

发生中断意味着需要操作系统介入,开展管理工作。由于操作系统的管理工作(比如切换线程、分配I/O设备等),需要使用特权指令,因此CPU要从用户态转为核心态。中断可以使CPU从用户态转化为核心态,使操作系统获得计算机的控制权,有了中断才能实现多道程序并发执行。

应用程序通过系统调用请求操作系统的服务。系统中的各种共享资源都由操作系统统一掌管,因此在用户程序中,凡是与资源有关的操作(如存储分配、I/O操作、文件管理等),都必须通过系统调用的方式向操作系统提出服务请求,由操作系统代为完成(进入核心态)。这样可以保证系统的稳定性和安全性,防止用户进行非法操作。

strace -ff -o ./tmp java TestSocket:抓取Linux程序对内核有没有发生系统调用

一个BIO Server服务端的默认IO包括:

0表示输入流、1表示输出流、2表示异常流、3表示引入jar包的IO流、4表示IPV4的网络IO流、5表示IPV6的网络IO流。

每建立一个连接就生成了一个fd文件描述符。

IO的多路复用

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写。

select 时间复杂度O(n)

它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长,最大连接数限制为1024。

poll 时间复杂度O(n)

poll本质上和select没有区别,它将用户传入的fd数组拷贝到内核空间,然后查询每个fd(文件描述符)对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的。

epoll 时间复杂度O(1)

epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll会把哪个流发生了怎样的I/O事件通知用户。所以epoll实际上是事件驱动(每个事件关联上fd)的,此时对这些流的操作都是有意义的。由硬件(如网卡)发出中断通知内核,充分发挥硬件,尽量不浪费CPU。

epoll

当有100万个客户端连接时,select与poll会有非常明显的问题,即在某一时刻,进程收集有事件的连接时,其实这100万连接中的大部分都是没有事件发生的。因此如果每次收集事件时,都把100万连接的套接字传给操作系统(这首先是用户态内存到内核态内存的大量复制),而由操作系统内核寻找这些连接上有没有未处理的事件,将会是巨大的资源浪费,然后select和poll就是这样做的,因此它们最多只能处理几千个并发连接。而epoll不这样做,它在Linux内核中申请了一个简易的文件系统,把原先的一个select或poll调用分成了3部分:

int epoll_create(int size);  int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
  1. 调用epoll_create建立一个epoll对象(在epoll文件系统中给这个句柄分配资源);
  2. 调用epoll_ctl向epoll对象中添加这100万个连接的套接字;
  3. 调用epoll_wait收集发生事件的连接。

这样只需要在进程启动时建立1个epoll对象,并在需要的时候向它添加或删除连接就可以了,因此,在实际收集事件时,epoll_wait的效率就会非常高,因为调用epoll_wait时并没有向它传递这100万个连接,内核也不需要去遍历全部的连接。

我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个rdllist双向链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个rdllist双向链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。

所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表中。

struct eventpoll {  ...  /*红黑树的根节点,这棵树中存储着所有添加到epoll中的事件,  也就是这个epoll监控的事件*/  struct rb_root rbr;  /*双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件*/  struct list_head rdllist;  ...};

在epoll中对于每一个事件都会建立一个epitem结构体

struct epitem {  ...  //红黑树节点  struct rb_node rbn;  //双向链表节点  struct list_head rdllink;  //事件句柄等信息  struct epoll_filefd ffd;  //指向其所属的eventepoll对象  struct eventpoll *ep;  //期待的事件类型  struct epoll_event event;  ...}; // 这里包含每一个事件对应着的信息。

当调用epoll_wait检查是否有发生事件的连接时,只是检查eventpoll对象中的rdllist双向链表是否有epitem元素而已,如果rdllist链表不为空,则这里的事件复制到用户态内存(使用共享内存提高效率)中,同时将事件数量返回给用户。因此epoll_waitx效率非常高。epoll_ctl在向epoll对象中添加、修改、删除事件时,从rbr红黑树中查找事件也非常快,也就是说epoll是非常高效的,它可以轻易地处理百万级别的并发连接。

NIO、Nginx、Redis均采用epoll。

Redis为单线程,这个线程不仅负责IO,还需要处理LRU、AOF等操作,需要轮询;

Nginx为多线程,IO线程只需要阻塞等待事件驱动epoll_wait即可。

epoll有两种模式:

  • ET模式(边缘触发)只有数据到来才触发不管缓存区中是否还有数据,缓冲区剩余未读尽的数据不会导致epoll_wait返回;
  • LT 模式(水平触发,默认)只要有数据都会触发,缓冲区剩余未读尽的数据会导致epoll_wait返回。

参考文章:

  • https://blog.csdn.net/daaikuaichuan/article/details/83862311(推荐)
  • https://www.jianshu.com/p/fe54ca4affe8
  • https://blog.csdn.net/wteruiycbqqvwt/article/details/90299610

BIO

Blocking IO(Input - OutPut)

阻塞式IO,对每一个连接都创建一个线程,在服务器接收客户端连接、写数据、读数据时都会阻塞,导致并发数少,线程频换切换导致效率低。真正的网络编程中,BIO很少使用。

BIO实现一个聊天室

服务端:

public class MyServer {
// 保存所有的Socket连接 public static List
socketList = Collections.synchronizedList(new ArrayList<>()); public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(30000); System.out.println("服务器启动..."); while (true){
Socket s = ss.accept();//阻塞 socketList.add(s); // 启动客户端服务 new Thread(new ServerThread(s)).start(); } }}

服务端线程(为每一个连接分配一个线程):

public class ServerThread implements Runnable {
Socket s = null; BufferedReader br = null; public ServerThread(Socket s) throws IOException {
this.s = s; br = new BufferedReader(new InputStreamReader(s.getInputStream())); } @Override public void run() {
try{
String content = null; while((content = readFromClient()) != null){
//遍历每一个Socket for(Socket s : MyServer.socketList){
PrintStream ps = new PrintStream(s.getOutputStream()); ps.println(content); } } }catch (IOException e){
e.printStackTrace(); } } private String readFromClient(){
try {
return br.readLine(); } catch (IOException e) {
MyServer.socketList.remove(s); } return null; }}

客户端(读线程):

public class MyClient {
public static void main(String[] args) throws Exception{
Socket s = new Socket("127.0.0.1", 30000); //客户端启动线程读取服务端的数据 new Thread(new ClientThread(s)).start(); PrintStream ps = new PrintStream(s.getOutputStream()); String line = null; BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); while((line = br.readLine())!= null){
ps.println(line); //System.out.println("已传输到服务器"); } }}

客户端(写线程)

public class ClientThread implements Runnable {
Socket s = null; BufferedReader br = null; public ClientThread(Socket s) throws IOException {
this.s = s; br = new BufferedReader(new InputStreamReader(s.getInputStream())); } @Override public void run() {
try {
String content = null; while ((content = br.readLine()) != null){
System.out.println(content); } }catch (Exception e){
e.printStackTrace(); } }}

NIO

New IO/Non-Blocking IO 非阻塞IO (Tomcat使用NIO)

NIO单线程模型

每一个连接称为一个channel,selector(选择器)轮询每一个channel,当获取连接,读,写事件出现时,selector会将这些事件获取并用单线程处理。

import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class Server {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8888)); ssc.configureBlocking(false); // 设定为非阻塞模型 System.out.println("server started, listening on :" + ssc.getLocalAddress()); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); // 注册连接事件 while(true) {
selector.select(); Set
keys = selector.selectedKeys(); Iterator
it = keys.iterator(); while(it.hasNext()) {
SelectionKey key = it.next(); it.remove(); handle(key); } } } private static void handle(SelectionKey key) {
if(key.isAcceptable()) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(key.selector(), SelectionKey.OP_READ); // 监控读事件 } catch (IOException e) {
e.printStackTrace(); } finally {
} } else if (key.isReadable()) {
//flip复位操作 SocketChannel sc = null; try {
sc = (SocketChannel)key.channel(); // NIO利用Buffer提高读写效率 ByteBuffer buffer = ByteBuffer.allocate(512); buffer.clear(); int len = sc.read(buffer); if(len != -1) {
System.out.println(new String(buffer.array(), 0, len)); } ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes()); sc.write(bufferToWrite); } catch (IOException e) {
e.printStackTrace(); } finally {
if(sc != null) {
try {
sc.close(); } catch (IOException e) {
e.printStackTrace(); } } } } }}

注:服务器对各个客户端的事件没有顺序。

NIO-reactor模式

Boss + Worker

由一个单独的线程处理事件轮询,将事件交给线程池来处理(Netty雏形)。

AIO

Asynchronous IO(异步IO) 不再需要轮询

AIO 通过调用accept方法,一个会话接入之后再次调用(递归)accept方法,监听下一次会话,读取也不再阻塞,回调complete方法异步进行。不再需要selector使用channel线程组来接收。

由操作系统处理事件,处理结束后通知server的回调/钩子函数(观察者模式),写法与Netty类似。

public class ServerWithThreadGroup {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool(); AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); //中文测试 final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(threadGroup) .bind(new InetSocketAddress(8888)); serverChannel.accept(null, new CompletionHandler
() {
@Override //回调函数 public void completed(AsynchronousSocketChannel client, Object attachment) {
serverChannel.accept(null, this); try {
System.out.println(client.getRemoteAddress()); ByteBuffer buffer = ByteBuffer.allocate(1024); client.read(buffer, buffer, new CompletionHandler
() {
//读取成功时回调函数 @Override public void completed(Integer result, ByteBuffer attachment) {
attachment.flip(); System.out.println(new String(attachment.array(), 0, result)); client.write(ByteBuffer.wrap("HelloClient".getBytes())); } 读取失败时回调函数 @Override public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace(); } }); } catch (IOException e) {
e.printStackTrace(); } } @Override public void failed(Throwable exc, Object attachment) {
exc.printStackTrace(); } }); while (true) {
Thread.sleep(1000); } }}

从NIO上面我们能看到,对于IO的两个阶段的阻塞,只是对于第一个阶段有所改善,对于第二个阶段在NIO里面仍然是阻塞的。而真正的理想的异步非阻塞IO(AAIO)要做的就是,将IO操作的两个阶段都全部交给内核系统完成,用户线程只需要告诉内核,我要读取一块数据,请你帮我读取,读取完了放在我给你的地址里面,然后告诉我一声就可以了。

AIO可以做到真正的异步的操作,但实现起来比较复杂,支持纯异步IO的操作系统非常少,目前只有windows的IOCP技术实现了,而在Linux上,目前有很多开源的异步IO库,例如libevent、libev、libuv,但基本都不是纯的异步IO操作,底层还是是使用的epoll实现的。

参考:https://www.cnblogs.com/yuxiang1/p/10003866.html

Netty

Netty出现的主要原因,如下:

  • Java NIO类库和API繁杂众多,使用麻烦;

  • Java NIO封装程度并不高,常常需要配合Java多线程编程来使用,这是因为NIO编程涉及到Reactor模式;

  • Java NIO异常体系不完善,如客户端面临断连,重连,网络闪断,半包读写,网络阻塞,异常码流等问题,虽然开发相对容易,但是可靠性和稳定性并不高;

  • Java NIO本身的bug,修复较慢。

Netty底层没有用AIO,而是采用NIO,因为在Linux上AIO与NIO都是用epoll(轮询)实现,AIO反而多了一层封装。但是其API设计更像AIO。

p.s. 在windows上AIO是用事件模型,效率较高,比在Linux上使用AIO快。

public class HelloNetty {
public static void main(String[] args) {
new NettyServer(8888).serverStart(); }}class NettyServer {
int port = 8888; public NettyServer(int port) {
this.port = port; } public void serverStart() {
// 用于接收连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于处理事件 EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer
() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Handler()); } }); try {
ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) {
e.printStackTrace(); } finally {
workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }}class Handler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//super.channelRead(ctx, msg); System.out.println("server: channel read"); ByteBuf buf = (ByteBuf)msg; System.out.println(buf.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(msg); ctx.close(); //buf.release(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); }}

同步-异步-阻塞-非阻塞

同步-异步关注的是消息通信的机制

阻塞-非阻塞关注的是等待消息时的状态

以烧水为例(程序就是人,内核就是水壶):

1.同步阻塞

点火(发消息)——> 傻等(同步),不等到水开不干别的事(阻塞)

2.同步非阻塞

点火(发消息)——> 去看电视,时不时查看结果(非阻塞),水开后自己处理(同步)

3.异步阻塞

点火(发消息)——> 傻等水壶响(阻塞),水开后自动处理

异步阻塞很少发生

4.异步非阻塞

点火(发消息)——> 该干嘛干嘛(非阻塞),水开后自动处理

你可能感兴趣的文章
为什么不能建立引用数组?
查看>>
Union的一个知识点
查看>>
基类析构函数为虚函数的研究
查看>>
函数入栈出栈以及栈帧
查看>>
从RTTI谈C++的向下转型
查看>>
面试心得(BAT)
查看>>
Windows Server 2012 Web方式修改域用户密码-通过Remote Desktop Web实现
查看>>
华为USG5300 采用IKE安全策略方式建立IPSec隧道
查看>>
Centos7 nginx访问日志文件割接
查看>>
我是一条内存!
查看>>
OIM API Usage
查看>>
OIM实现OIM用户修改、OIM用户Disable流程审批
查看>>
UEFI与MBR区别
查看>>
Ubuntu每次开机后提示:检测到系统程序出现问题的解决方法
查看>>
CC2640 看门狗配置
查看>>
linnux外网不通,提示Destination Host Unreachable解决
查看>>
为strawberry perl安装PadWalker
查看>>
Notepad++的字体设置加Consolas和微软雅黑混合字体
查看>>
gdb 调试常用命令
查看>>
POJ 3125 (Queue)
查看>>