Skip to content

IO 流

BIO

同步并阻塞的 IO 模型。客户端有连接请求时,服务器端就需要启动一个线程进行处理。

BIO.png

文件基本操作

java
//创建文件方式1
File file = new File("news1.txt");
file.createNewFile();

//创建文件方式2
File file3 = new File("d:\\", "news3.txt");
file3.createNewFile();

//删除文件
if (file.exists()) {
    file.delete();
}

//创建/删除 目录
//在 Java 中目录是当成特殊的文件处理的
File dir = new File("D:\\demo2");
dir.mkdir();
if (dir.exists()) {
    dir.delete();
}

//创建多级目录,多级目录需要递归删除
File dirs = new File("D:\\demo3\\demo33");
dirs.mkdirs();

字节流

  • InputStream:所有字节输入流的超类(如FileInputStreamByteArrayInputStream
  • OutputStream:所有字节输出流的超类(如FileOutputStreamByteArrayOutputStream

文件读写示例

java
// 复制文件(字节流)
try (InputStream in = new FileInputStream("source.txt");
OutputStream out = new FileOutputStream("target.txt")) {
    byte[] buffer = new byte[1024];
    int bytesRead;
    while ((bytesRead = in.read(buffer)) != -1) {
        out.write(buffer, 0, bytesRead);
    }
}

字符流

  • Reader:所有字符输入流的超类(如FileReaderStringReader
  • Writer:所有字符输出流的超类(如FileWriterStringWriter

缓冲优化:通常结合 BufferedReaderBufferedWriter 提升性能

java
// 按行读取文本文件
try (BufferedReader reader = new BufferedReader(new FileReader("input.txt"))) {
    String line;
    while ((line = reader.readLine()) != null) {
        System.out.println(line);
    }
}

关键细节与最佳实践

  1. 资源释放
  • 使用 try-with-resources 自动关闭流
java
try (InputStream in = new FileInputStream("file.txt")) {
        // 使用 in...
} // 自动调用 in.close()
  1. 缓冲技术
  • 始终使用缓冲流 (如 BufferedInputStreamBufferedWriter ) 提升性能
java
// 从文件读取并写入到另一个文件(缓冲优化)
try (BufferedInputStream bin = new BufferedInputStream(new FileInputStream("in.txt"));
     BufferedOutputStream bout = new BufferedOutputStream(new FileOutputStream("out.txt"))) {
    byte[] buffer = new byte[4096];
    int bytesRead;
    while ((bytesRead = bin.read(buffer)) != -1) {
        bout.write(buffer, 0, bytesRead);
    }
}
  1. 流类型选择表
数据类型推荐流类型典型类组合
文本文件(无编码要求)FileReader + BufferedReadernew BufferedReader(new FileReader(...))
文本文件(指定编码)InputStreamReader + FileInputStreamnew BufferReader(new InputStreamReader(new FileInputStream(...),"UTF-8"))
二进制文件(图片、视频)BufferedInputStream/BufferedOutputStreamnew BufferedInputStream(new FileInputStream(...))
文本网络流 (HTTP响应)InputStreamReader + Socket.getInputStream处理 Http Body 文本
基本数据类型读写DataInputStream + DataOutputStream配合 FileOutputStream 持久化数据

NIO

NIO 同步非阻塞 IO 模型,NIO 有三大核心部分 ChannelBufferSelector

NIO.png

  • 多个 Channel 注册到同一个 Selector
  • Selector 根据不同的事件在不同的 Channel 上进行切换 (事件驱动)
  • Buffer 就是一个内存块,底层有一个数组
  • 数据的读取写入通过 Buffer ,是双向的,通过 flip() 进行切换

Buffer

java
//创建一个 Buffer,大小为 5 即可以存放 5 个 int
IntBuffer intBuffer = IntBuffer.allocate(5);

for (int i = 0; i < intBuffer.capacity(); i++) {
    intBuffer.put(i * 2);
}

//读写切换
intBuffer.flip();

while (intBuffer.hasRemaining()){
    System.out.println(intBuffer.get());
}

Buffer 的核心源码属性

java
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;  //标记
private int position = 0;  //位置,下一个要被读或写的元素的索引
private int limit;  //表示缓冲区的当前终点,即缓冲区中可以操作的元素个数
private final int capacity; //缓冲区最大容量,在创建时确定并且不能改变
  • Buffer 常用方法
java
public final int capacity() //返回缓冲区的容量

public final int position() //返回缓冲区当前位置
public final Buffer position(int newPosition) //设置缓冲区当前位置

public final int limit()  //返回此缓冲区限制
public final Buffer limit(int newLimit) //设置此缓冲区的限制

public final Buffer clear() //清除缓冲区,恢复各标记到初始状态,但数据没有真正擦除 
public final Buffer flip() //反转缓冲区,读写转换,limit=position,position=0

//jdk 1.6 引入
public final boolean hasArray() //缓冲区是否具有可访问的底层实现数组
public final Object array() //返回此缓冲区的底层实现数组
  • ByteBuffer 常用方法
java
public static ByteBuffer allocateDirect(int capacity) //创建一个直接缓冲区
public static ByteBuffer allocate(int capacity) //创建一个非直接缓冲区
  • MappedByteBuffer
java
RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt","rw");
FileChannel fileChannel = randomAccessFile.getChannel();
//MappedByteBuffer 让文件直接在堆外内存修改,不用拷贝一次,速度更快
MappedByteBuffer map = fileChannel.map(FileChannel.MapMode.READ_WRITE,
        0, //可以直接修改的起始位置
        5);// 映射到内存的大小,即可以修改的范围

map.put(0,(byte)'H');
map.put(3,(byte)'9');
  • 利用 Buffer 数组进行读写
java
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);

// 绑定端口到 socket , 并启动
serverSocketChannel.socket()
        .bind(inetSocketAddress);

//创建 buffer 数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);

//等客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
int messageSize = 8;
while (true) {
    int  byteRead = 0;
    while (byteRead < messageSize) {
        long read = socketChannel.read(byteBuffers);
        byteRead += read; // 累计读取的字节数

        //使用流打印,查看当前 buffer 的 position 和 limit
        for (ByteBuffer byteBuffer : byteBuffers) {
            System.out.println("position=" + byteBuffer.position() 
                        + ",limit=" + byteBuffer.limit());
        }
    }

    //将所有的 buffer 进行读写反转
    for (ByteBuffer byteBuffer : byteBuffers) {
        byteBuffer.flip();
    }

    //将数据写出显示到客户端
    long byteWritten = 0;
    while (byteWritten < messageSize) {
        long write = socketChannel.write(byteBuffers);
        byteWritten += write;
    }
    //所有的 buffer 进行 clear
    for (ByteBuffer byteBuffer : byteBuffers) {
        byteBuffer.clear();
    }
}

Channel

  • Channel 是一种双向的,可以读写数据的通道Stream 要么只能读,要么只能写

NIO 方式实现文件拷贝

java
try (
        FileInputStream fileInputStream = new FileInputStream("source.txt");
        FileOutputStream fileOutputStream = new FileOutputStream("target.txt");
        ){
    FileChannel inChannel = fileInputStream.getChannel();
    FileChannel outChannel = fileOutputStream.getChannel();
    //创建 Buffer
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    //读入到 Buffer 中
    while (true) {
        int read = inChannel.read(byteBuffer);
        if (read == -1) { // 读完退出
            break;
        }
    }

    //反转,从 Buffer 中写出
    byteBuffer.flip();

    outChannel.write(byteBuffer);
}

Channel 的 transferFrom 直接实现文件拷贝

java
try (
        FileInputStream fileInputStream = new FileInputStream("线程生命周期.png");
        FileOutputStream fileOutputStream = new FileOutputStream("img2.png");
        ){
    FileChannel inChannel = fileInputStream.getChannel();
    FileChannel outChannel = fileOutputStream.getChannel();

    //使用 transferFrom 完成拷贝
    outChannel.transferFrom(inChannel, 0, inChannel.size());
}

Selector

  • Selector 是一个多路复用器,用于监控多个 Channel 的 IO 事件,由事件类型驱动。
事件类型就绪条件
OP_READ接收缓冲区中有数据可读
OP_WRITE发送缓冲区有空间可写或非阻塞模式下连接未完成
OP_CONNECT客户端与服务端已完成三次握手 finishConnect()
OP_ACCEPT服务端套接字监听队列中有新连接请求

NIO 网络编程流程

服务端

  1. 初始化阶段
  • 创建 Selector
java
Selector selector = Selector.open();
  • 启动 ServerSocketChannel
java
//启动 ServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); //非阻塞模式
ssc.socket().bind(new InetSocketAddress(6666));
//注册 Accept 事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
  1. 事件轮询
  • 通过 Selector.select() 轮询就绪事件
java
while (true) {
    //阻塞直到有事件就绪
    if (selector.select() == 0)
        continue;

    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();

    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove(); //处理完必须移除

        if (key.isAcceptable()) {
            //处理新连接
            handleAccept(key);
        } else if (key.isReadable()) {
            //处理读数据
            handleRead(key);
        } else if (key.isWritable()) {
            //处理写数据
            handleWrite(key);
        }
    }
}
  1. 事件处理逻辑
  • 连接
java
private  void handleAccept(SelectionKey key) throws IOException {
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    //接受客户端连接
    SocketChannel sc = ssc.accept();
    sc.configureBlocking(false);
    //注册读事件
    sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
  • 读取 (Read)
java
private void handleRead(SelectionKey key) throws IOException {
    SocketChannel sc = (SocketChannel) key.channel();
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    if (sc.read(buffer) == -1) {
        sc.close();
        return;
    }
    buffer.flip(); //切换为读模式
    // 处理数据
    while (buffer.hasRemaining()) {
        System.out.println((char) buffer.get());
    }
    //模拟响应数据,传递给写事件
    String response = "根据读入数据生成的响应数据";
    ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
    key.attach(responseBuffer); //绑定到 key 传递上下文
    //注册写事件,准备响应
    key.interestOps(SelectionKey.OP_WRITE);
}
  • 写出 (Write)
java
private void handleWrite(SelectionKey key) throws IOException {
    SocketChannel sc = (SocketChannel) key.channel();
    //取出读操作的响应数据
    ByteBuffer buffer = (ByteBuffer) key.attachment();

    while (buffer.hasRemaining()) {
        if (sc.write(buffer) == 0)
            break;
    }

    //重置为读事件 (如果需要持续监听)
    key.interestOps(SelectionKey.OP_READ);
    key.attach(null); //清空上下文
}
  1. 客户端模拟请求
java
public static void main(String[] args) throws Exception {

    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);

    InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
    
    String str = "hello,world";
    //将 byte array 写入 ByteBuffer
    ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
    // buffer 数据写入到 socketChannel
    socketChannel.write(buffer);
    System.in.read();
}

群聊系统案例

ChatServer

  • 初始化
java
public class GroupChatServer {

    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;

    //初始化
    public GroupChatServer() {
        try {
            //创建 选择器
            selector = Selector.open();
            //创建监听 Channel
            listenChannel = ServerSocketChannel.open();
            //绑定监听端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            //非阻塞模式
            listenChannel.configureBlocking(false);
            //注册监听事件
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • 监听事件
java
public void listen() {
    try {
        //循环处理
        while (true) {
            int count = selector.select();
            if (count > 0) {
                //遍历得到 selectionKeys 集合
                Iterator<SelectionKey> iterator =
                        selector.selectedKeys().iterator();

                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    //处理后移除
                    iterator.remove();
                    if (key.isAcceptable()) {
                        SocketChannel sc = listenChannel.accept();
                        sc.configureBlocking(false);
                        sc.register(selector,SelectionKey.OP_READ);
                        System.out.println(sc.getRemoteAddress() + " 上线");
                    }
                    if (key.isReadable()) {
                        read(key);
                    }
                }
            }
        }
    }catch (IOException e) {
        e.printStackTrace();
    }
}
  • 读消息
java
private void read(SelectionKey key) {
    SocketChannel channel = null;
    try {
        channel = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int count = channel.read(buffer);
        if (count > 0) {
            //缓冲区的数据转为字符串
            String msg = new String(buffer.array());
            System.out.println("from 客户端:" + msg);
            sendToOther(msg,channel);
        }
    }catch (IOException e) {
        try {
            System.out.println(channel.getRemoteAddress() + " 离线了");
            //取消注册
            key.cancel();
            //关闭通道
            channel.close();
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }
}
  • 转发消息
java
//转发消息给其他客户通道
private void sendToOther(String msg,SocketChannel self) throws IOException {
    System.out.println("服务器转发消息中...");
    for (SelectionKey key : selector.keys()) {
        Channel targetChannel = key.channel();

        //排除自己
        if (targetChannel instanceof SocketChannel dest
                && targetChannel != self) {
            //数据放入 buffer
            ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
            //写出到 通道
            dest.write(buffer);
        }
    }
}

ChatClient

  • 初始化
java
public class GroupChatClient {
    
    private final String HOST = "127.0.0.1";
    private final int PORT = 6667;
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    public GroupChatClient() throws IOException {
        selector = Selector.open();
        socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username + " is ok...");
    }
}
  • 发送消息
java
public void sendInfo(String info) {
    info = username + "说:" + info;
    try {
        socketChannel.write(ByteBuffer.wrap(info.getBytes()));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  • 读取消息
java
public void readInfo() {
    try {
        int readChannels = selector.select();
        if (readChannels > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isReadable()) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    sc.read(buffer);
                    String msg = new String(buffer.array());
                    System.out.println(msg.trim());
                }
            }
        }else {
            System.out.println("没有可用的通道");
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

验证

  • ChatClient
java
public static void main(String[] args) throws Exception{
    GroupChatClient chatClient = new GroupChatClient();

    new Thread(() -> {
        while (true) {
            chatClient.readInfo();
        }
    }).start();

    //发送数据给服务器端
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLine()) {
        String s = scanner.nextLine();
        chatClient.sendInfo(s);
    }
}
  • ChatServer
java
public static void main(String[] args) {
    GroupChatServer chatServer = new GroupChatServer();
    chatServer.listen();
}

零拷贝 (Zero-Copy)

零拷贝是一种优化技术,旨在消除数据在内核态与用户态之间的冗余拷贝,降低 CPU 和 内存开销。

传统数据拷贝的问题

以从磁盘读取文件并通过网络发送为例,传统流程如下:

  • 磁盘 -> 内核缓冲区:DMA 拷贝 (无需 CPU)
  • 内核缓冲区 -> 用户缓冲区:CPU 拷贝 (例如 FileInputStream.read()
  • 用户缓冲区 -> Socket 缓冲区:CPU 拷贝 (例如 SocketOutputStream.write()
  • Socket 缓冲区 -> 网卡:DMA 拷贝

传统流程会有 4 次上下文切换,2 次 CPU 拷贝

零拷贝实现方式

sendfile() 系统调用 (linux 2.4+)

  • 磁盘 -> 内核缓冲区:DMA 拷贝 (无需 CPU)
  • 内核缓冲区 -> Socket 缓冲区:CPU 拷贝少量数据
  • Socket 缓冲区 -> 网卡:DMA 拷贝

2 次上下文切换,1 次 CPU 拷贝

FileChannel.transferTo() 内部调用了 sendfile() 系统调用

java
FileChannel source = FileChannel.open(Paths.get("source.txt"));
SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 8080));
source.transferTo(0, source.size(), socket);

mmap 内存映射文件

将文件映射到进程的虚拟内存空间,用户态代码可通过指针直接操作文件 (内核与用户共用同一块物理内存)

总结

零拷贝适用场景

  • 只读或直传场景:无需修改数据,如文件下载、消息队列消费
  • 高并发、大流量:需要最大限度减少 CPU 和 内存 开销
  • 操作系统支持:Linux 环境且内核版本较新

传统拷贝适用场景

  • 数据需要加工处理:如加密、压缩、协议转换等
  • 兼容性受限:操作系统内核不支持
  • 小数据量传输:零拷贝优化收益低于上下文切换成本