IO 流
BIO
同步并阻塞的 IO 模型。客户端有连接请求时,服务器端就需要启动一个线程进行处理。
文件基本操作
java
//创建文件方式1
File file = new File("news1.txt");
file.createNewFile();
//创建文件方式2
File file3 = new File("d:\\", "news3.txt");
file3.createNewFile();
//删除文件
if (file.exists()) {
file.delete();
}
//创建/删除 目录
//在 Java 中目录是当成特殊的文件处理的
File dir = new File("D:\\demo2");
dir.mkdir();
if (dir.exists()) {
dir.delete();
}
//创建多级目录,多级目录需要递归删除
File dirs = new File("D:\\demo3\\demo33");
dirs.mkdirs();
字节流
InputStream
:所有字节输入流的超类(如FileInputStream
、ByteArrayInputStream
)OutputStream
:所有字节输出流的超类(如FileOutputStream
、ByteArrayOutputStream
)
文件读写示例
java
// 复制文件(字节流)
try (InputStream in = new FileInputStream("source.txt");
OutputStream out = new FileOutputStream("target.txt")) {
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
}
}
字符流
Reader
:所有字符输入流的超类(如FileReader
、StringReader
)Writer
:所有字符输出流的超类(如FileWriter
、StringWriter
)
缓冲优化:通常结合 BufferedReader
和 BufferedWriter
提升性能
java
// 按行读取文本文件
try (BufferedReader reader = new BufferedReader(new FileReader("input.txt"))) {
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
}
关键细节与最佳实践
- 资源释放
- 使用
try-with-resources
自动关闭流
java
try (InputStream in = new FileInputStream("file.txt")) {
// 使用 in...
} // 自动调用 in.close()
- 缓冲技术
- 始终使用缓冲流 (如
BufferedInputStream
、BufferedWriter
) 提升性能
java
// 从文件读取并写入到另一个文件(缓冲优化)
try (BufferedInputStream bin = new BufferedInputStream(new FileInputStream("in.txt"));
BufferedOutputStream bout = new BufferedOutputStream(new FileOutputStream("out.txt"))) {
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = bin.read(buffer)) != -1) {
bout.write(buffer, 0, bytesRead);
}
}
- 流类型选择表
数据类型 | 推荐流类型 | 典型类组合 |
---|---|---|
文本文件(无编码要求) | FileReader + BufferedReader | new BufferedReader(new FileReader(...)) |
文本文件(指定编码) | InputStreamReader + FileInputStream | new BufferReader(new InputStreamReader(new FileInputStream(...),"UTF-8")) |
二进制文件(图片、视频) | BufferedInputStream /BufferedOutputStream | new BufferedInputStream(new FileInputStream(...)) |
文本网络流 (HTTP响应) | InputStreamReader + Socket.getInputStream | 处理 Http Body 文本 |
基本数据类型读写 | DataInputStream + DataOutputStream | 配合 FileOutputStream 持久化数据 |
NIO
NIO 同步非阻塞 IO 模型,NIO 有三大核心部分 Channel
、Buffer
、Selector
- 多个
Channel
注册到同一个Selector
上 Selector
根据不同的事件在不同的Channel
上进行切换 (事件驱动)Buffer
就是一个内存块,底层有一个数组- 数据的读取写入通过
Buffer
,是双向的,通过flip()
进行切换
Buffer
java
//创建一个 Buffer,大小为 5 即可以存放 5 个 int
IntBuffer intBuffer = IntBuffer.allocate(5);
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}
//读写切换
intBuffer.flip();
while (intBuffer.hasRemaining()){
System.out.println(intBuffer.get());
}
Buffer 的核心源码属性
java
// Invariants: mark <= position <= limit <= capacity
private int mark = -1; //标记
private int position = 0; //位置,下一个要被读或写的元素的索引
private int limit; //表示缓冲区的当前终点,即缓冲区中可以操作的元素个数
private final int capacity; //缓冲区最大容量,在创建时确定并且不能改变
Buffer
常用方法
java
public final int capacity() //返回缓冲区的容量
public final int position() //返回缓冲区当前位置
public final Buffer position(int newPosition) //设置缓冲区当前位置
public final int limit() //返回此缓冲区限制
public final Buffer limit(int newLimit) //设置此缓冲区的限制
public final Buffer clear() //清除缓冲区,恢复各标记到初始状态,但数据没有真正擦除
public final Buffer flip() //反转缓冲区,读写转换,limit=position,position=0
//jdk 1.6 引入
public final boolean hasArray() //缓冲区是否具有可访问的底层实现数组
public final Object array() //返回此缓冲区的底层实现数组
ByteBuffer
常用方法
java
public static ByteBuffer allocateDirect(int capacity) //创建一个直接缓冲区
public static ByteBuffer allocate(int capacity) //创建一个非直接缓冲区
MappedByteBuffer
java
RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt","rw");
FileChannel fileChannel = randomAccessFile.getChannel();
//MappedByteBuffer 让文件直接在堆外内存修改,不用拷贝一次,速度更快
MappedByteBuffer map = fileChannel.map(FileChannel.MapMode.READ_WRITE,
0, //可以直接修改的起始位置
5);// 映射到内存的大小,即可以修改的范围
map.put(0,(byte)'H');
map.put(3,(byte)'9');
- 利用
Buffer
数组进行读写
java
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
// 绑定端口到 socket , 并启动
serverSocketChannel.socket()
.bind(inetSocketAddress);
//创建 buffer 数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
//等客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
int messageSize = 8;
while (true) {
int byteRead = 0;
while (byteRead < messageSize) {
long read = socketChannel.read(byteBuffers);
byteRead += read; // 累计读取的字节数
//使用流打印,查看当前 buffer 的 position 和 limit
for (ByteBuffer byteBuffer : byteBuffers) {
System.out.println("position=" + byteBuffer.position()
+ ",limit=" + byteBuffer.limit());
}
}
//将所有的 buffer 进行读写反转
for (ByteBuffer byteBuffer : byteBuffers) {
byteBuffer.flip();
}
//将数据写出显示到客户端
long byteWritten = 0;
while (byteWritten < messageSize) {
long write = socketChannel.write(byteBuffers);
byteWritten += write;
}
//所有的 buffer 进行 clear
for (ByteBuffer byteBuffer : byteBuffers) {
byteBuffer.clear();
}
}
Channel
Channel
是一种双向的,可以读写数据的通道,Stream
要么只能读,要么只能写
NIO 方式实现文件拷贝
java
try (
FileInputStream fileInputStream = new FileInputStream("source.txt");
FileOutputStream fileOutputStream = new FileOutputStream("target.txt");
){
FileChannel inChannel = fileInputStream.getChannel();
FileChannel outChannel = fileOutputStream.getChannel();
//创建 Buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读入到 Buffer 中
while (true) {
int read = inChannel.read(byteBuffer);
if (read == -1) { // 读完退出
break;
}
}
//反转,从 Buffer 中写出
byteBuffer.flip();
outChannel.write(byteBuffer);
}
Channel 的 transferFrom 直接实现文件拷贝
java
try (
FileInputStream fileInputStream = new FileInputStream("线程生命周期.png");
FileOutputStream fileOutputStream = new FileOutputStream("img2.png");
){
FileChannel inChannel = fileInputStream.getChannel();
FileChannel outChannel = fileOutputStream.getChannel();
//使用 transferFrom 完成拷贝
outChannel.transferFrom(inChannel, 0, inChannel.size());
}
Selector
Selector
是一个多路复用器,用于监控多个Channel
的 IO 事件,由事件类型驱动。
事件类型 | 就绪条件 |
---|---|
OP_READ | 接收缓冲区中有数据可读 |
OP_WRITE | 发送缓冲区有空间可写或非阻塞模式下连接未完成 |
OP_CONNECT | 客户端与服务端已完成三次握手 finishConnect() |
OP_ACCEPT | 服务端套接字监听队列中有新连接请求 |
NIO 网络编程流程
服务端
- 初始化阶段
- 创建
Selector
java
Selector selector = Selector.open();
- 启动
ServerSocketChannel
java
//启动 ServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); //非阻塞模式
ssc.socket().bind(new InetSocketAddress(6666));
//注册 Accept 事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
- 事件轮询
- 通过
Selector.select()
轮询就绪事件
java
while (true) {
//阻塞直到有事件就绪
if (selector.select() == 0)
continue;
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); //处理完必须移除
if (key.isAcceptable()) {
//处理新连接
handleAccept(key);
} else if (key.isReadable()) {
//处理读数据
handleRead(key);
} else if (key.isWritable()) {
//处理写数据
handleWrite(key);
}
}
}
- 事件处理逻辑
- 连接
java
private void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//接受客户端连接
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//注册读事件
sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
- 读取 (Read)
java
private void handleRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
if (sc.read(buffer) == -1) {
sc.close();
return;
}
buffer.flip(); //切换为读模式
// 处理数据
while (buffer.hasRemaining()) {
System.out.println((char) buffer.get());
}
//模拟响应数据,传递给写事件
String response = "根据读入数据生成的响应数据";
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
key.attach(responseBuffer); //绑定到 key 传递上下文
//注册写事件,准备响应
key.interestOps(SelectionKey.OP_WRITE);
}
- 写出 (Write)
java
private void handleWrite(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
//取出读操作的响应数据
ByteBuffer buffer = (ByteBuffer) key.attachment();
while (buffer.hasRemaining()) {
if (sc.write(buffer) == 0)
break;
}
//重置为读事件 (如果需要持续监听)
key.interestOps(SelectionKey.OP_READ);
key.attach(null); //清空上下文
}
- 客户端模拟请求
java
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
String str = "hello,world";
//将 byte array 写入 ByteBuffer
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
// buffer 数据写入到 socketChannel
socketChannel.write(buffer);
System.in.read();
}
群聊系统案例
ChatServer
- 初始化
java
public class GroupChatServer {
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6667;
//初始化
public GroupChatServer() {
try {
//创建 选择器
selector = Selector.open();
//创建监听 Channel
listenChannel = ServerSocketChannel.open();
//绑定监听端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
//非阻塞模式
listenChannel.configureBlocking(false);
//注册监听事件
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 监听事件
java
public void listen() {
try {
//循环处理
while (true) {
int count = selector.select();
if (count > 0) {
//遍历得到 selectionKeys 集合
Iterator<SelectionKey> iterator =
selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//处理后移除
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = listenChannel.accept();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress() + " 上线");
}
if (key.isReadable()) {
read(key);
}
}
}
}
}catch (IOException e) {
e.printStackTrace();
}
}
- 读消息
java
private void read(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
if (count > 0) {
//缓冲区的数据转为字符串
String msg = new String(buffer.array());
System.out.println("from 客户端:" + msg);
sendToOther(msg,channel);
}
}catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + " 离线了");
//取消注册
key.cancel();
//关闭通道
channel.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
- 转发消息
java
//转发消息给其他客户通道
private void sendToOther(String msg,SocketChannel self) throws IOException {
System.out.println("服务器转发消息中...");
for (SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();
//排除自己
if (targetChannel instanceof SocketChannel dest
&& targetChannel != self) {
//数据放入 buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//写出到 通道
dest.write(buffer);
}
}
}
ChatClient
- 初始化
java
public class GroupChatClient {
private final String HOST = "127.0.0.1";
private final int PORT = 6667;
private Selector selector;
private SocketChannel socketChannel;
private String username;
public GroupChatClient() throws IOException {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + " is ok...");
}
}
- 发送消息
java
public void sendInfo(String info) {
info = username + "说:" + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
- 读取消息
java
public void readInfo() {
try {
int readChannels = selector.select();
if (readChannels > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
sc.read(buffer);
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
}
}else {
System.out.println("没有可用的通道");
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
验证
- ChatClient
java
public static void main(String[] args) throws Exception{
GroupChatClient chatClient = new GroupChatClient();
new Thread(() -> {
while (true) {
chatClient.readInfo();
}
}).start();
//发送数据给服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
- ChatServer
java
public static void main(String[] args) {
GroupChatServer chatServer = new GroupChatServer();
chatServer.listen();
}
零拷贝 (Zero-Copy)
零拷贝是一种优化技术,旨在消除数据在内核态与用户态之间的冗余拷贝,降低 CPU 和 内存开销。
传统数据拷贝的问题
以从磁盘读取文件并通过网络发送为例,传统流程如下:
- 磁盘 -> 内核缓冲区:DMA 拷贝 (无需 CPU)
- 内核缓冲区 -> 用户缓冲区:CPU 拷贝 (例如
FileInputStream.read()
) - 用户缓冲区 -> Socket 缓冲区:CPU 拷贝 (例如
SocketOutputStream.write()
) - Socket 缓冲区 -> 网卡:DMA 拷贝
传统流程会有 4 次上下文切换,2 次 CPU 拷贝
零拷贝实现方式
sendfile()
系统调用 (linux 2.4+)
- 磁盘 -> 内核缓冲区:DMA 拷贝 (无需 CPU)
- 内核缓冲区 -> Socket 缓冲区:CPU 拷贝少量数据
- Socket 缓冲区 -> 网卡:DMA 拷贝
2 次上下文切换,1 次 CPU 拷贝
FileChannel.transferTo()
内部调用了sendfile()
系统调用
java
FileChannel source = FileChannel.open(Paths.get("source.txt"));
SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 8080));
source.transferTo(0, source.size(), socket);
mmap
内存映射文件
将文件映射到进程的虚拟内存空间,用户态代码可通过指针直接操作文件 (内核与用户共用同一块物理内存)
总结
零拷贝适用场景:
- 只读或直传场景:无需修改数据,如文件下载、消息队列消费
- 高并发、大流量:需要最大限度减少 CPU 和 内存 开销
- 操作系统支持:Linux 环境且内核版本较新
传统拷贝适用场景:
- 数据需要加工处理:如加密、压缩、协议转换等
- 兼容性受限:操作系统内核不支持
- 小数据量传输:零拷贝优化收益低于上下文切换成本