08 NIO and AIO


Topics: what NIO is, Buffer, Channel, network programming, AIO

The demo projects are: tmp2023040103 (BIO), tmp2023040901 (NIO), tmp2023041401 (AIO)

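Why learn about NIO and AIO?

The focus here is NIO. It does not introduce new ideas about thread control or coordination, but at the application level it changes how threads are used, solves some practical problems, and reduces system cost.

NIO stands for New I/O. AIO is asynchronous I/O (the work is done by threads in the background, so the calling thread can return quickly).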

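An overview of the thread-related parts of NIO

What is NIO

NIO is short for New I/O, as opposed to the older stream-based I/O. It is a newer Java I/O standard that was added to the JDK in Java 1.4.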

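NIO features

  • NIO is block-based, whereas streams are byte-based
  • NIO provides Buffer support for all primitive types (except boolean)
  • NIO adds the Channel object as a new abstraction for raw I/O
  • NIO supports file locks and memory-mapped file access
  • NIO provides Selector-based non-blocking (multiplexed) network I/O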

Buffer

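Buffer is the core of NIO; every operation goes through a Buffer.

Buffer implementations in Java (one for each primitive type except boolean): ByteBuffer, CharBuffer…

Basic usage flow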

FileInputStream fin = new FileInputStream(new)
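The snippet above is cut off in the source. As a rough sketch of the usual flow (get a Channel from the stream, read into a ByteBuffer, flip, consume, clear), the following reads a file into a String. The class name BufferFlow, the helpers readAll and roundTrip, the temporary file, and the UTF-8 encoding are all assumptions made for illustration, not the author's original code.

```java
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class BufferFlow {
    // Reads a whole file through a FileChannel and decodes it as UTF-8 (assumed encoding).
    static String readAll(String path) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (FileInputStream fin = new FileInputStream(path);
             FileChannel fc = fin.getChannel()) {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            while (fc.read(buf) != -1) {   // channel -> buffer (the buffer is being filled)
                buf.flip();                // switch the buffer from filling to draining
                out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
                buf.clear();               // reset position/limit for the next read
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Writes content to a temporary file and reads it back with readAll.
    static String roundTrip(String content) {
        try {
            Path tmp = Files.createTempFile("nio-demo", ".txt");
            try {
                Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
                return readAll(tmp.toString());
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello nio"));
    }
}
```

The bytes are collected first and decoded once at the end, so a multi-byte character that straddles two reads is not corrupted.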

A Buffer has three important parameters: position, capacity, and limit.


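There are also a few commonly used methods:

public final Buffer rewind();  // sets position to zero and discards the mark
public final Buffer clear();    // sets position to zero, sets limit to capacity, and discards the mark
public final Buffer flip(); // sets limit to the current position, then sets position to zero, and discards the mark; typically used when switching from writing to reading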

Usage demo

import java.nio.ByteBuffer;

public class Main {
    public static void main(String[] args) {
        ByteBuffer bf = ByteBuffer.allocate(15);
        System.out.println("start");
        System.out.println("capacity:" + bf.capacity() + "\tposition:" + bf.position() + "\tlimit:" + bf.limit());
        // write 10 bytes
        for(int i = 0 ; i < 10; i ++ ) {
            bf.put((byte) i);
        }
        System.out.println("wrote 10 bytes");
        System.out.println("capacity:" + bf.capacity() + "\tposition:" + bf.position() + "\tlimit:" + bf.limit());
        // flip: limit becomes 10, position becomes 0
        bf.flip();
        System.out.println("flip");
        System.out.println("capacity:" + bf.capacity() + "\tposition:" + bf.position() + "\tlimit:" + bf.limit());
        // read 5 bytes
        for(int i = 0; i <  5; i++) {
            bf.get();
        }
        System.out.println("read 5 bytes");
        System.out.println("capacity:" + bf.capacity() + "\tposition:" + bf.position() + "\tlimit:" + bf.limit());
        // flip again: limit becomes 5, position becomes 0
        bf.flip();
        System.out.println("flip");
        System.out.println("capacity:" + bf.capacity() + "\tposition:" + bf.position() + "\tlimit:" + bf.limit());
    }

}
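The demo above only exercises flip. A small sketch along the same lines, covering rewind and clear as well, might look like this; the class name BufferOps and the helper state are made up for illustration.

```java
import java.nio.ByteBuffer;

class BufferOps {
    // Formats the three state values of a buffer.
    static String state(ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit() + " capacity=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer b = ByteBuffer.allocate(8);
        b.put((byte) 1).put((byte) 2).put((byte) 3);
        System.out.println("after 3 puts: " + state(b)); // position=3 limit=8 capacity=8

        b.flip();   // limit = old position, position = 0
        System.out.println("after flip:   " + state(b)); // position=0 limit=3 capacity=8

        b.get();
        b.get();
        System.out.println("after 2 gets: " + state(b)); // position=2 limit=3 capacity=8

        b.rewind(); // position = 0, limit unchanged: the same data can be read again
        System.out.println("after rewind: " + state(b)); // position=0 limit=3 capacity=8

        b.clear();  // position = 0, limit = capacity: ready to be filled again
        System.out.println("after clear:  " + state(b)); // position=0 limit=8 capacity=8
    }
}
```

Note that clear only resets the indices; the bytes already in the backing array are not erased.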

Channel

An abstraction of I/O; the other end may be a file or a socket.

The description in the Channel source code:

/*
 * Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package java.nio.channels;

import java.io.IOException;
import java.io.Closeable;


/**
 * A nexus for I/O operations.
 *
 * <p> A channel represents an open connection to an entity such as a hardware
 * device, a file, a network socket, or a program component that is capable of
 * performing one or more distinct I/O operations, for example reading or
 * writing.
 *
 * <p> A channel is either open or closed.  A channel is open upon creation,
 * and once closed it remains closed.  Once a channel is closed, any attempt to
 * invoke an I/O operation upon it will cause a {@link ClosedChannelException}
 * to be thrown.  Whether or not a channel is open may be tested by invoking
 * its {@link #isOpen isOpen} method.
 *
 * <p> Channels are, in general, intended to be safe for multithreaded access
 * as described in the specifications of the interfaces and classes that extend
 * and implement this interface.
 */

public interface Channel extends Closeable {

    /**
     * Tells whether or not this channel is open.
     *
     * @return {@code true} if, and only if, this channel is open
     */
    public boolean isOpen();

    /**
     * Closes this channel.
     *
     * <p> After a channel is closed, any further attempt to invoke I/O
     * operations upon it will cause a {@link ClosedChannelException} to be
     * thrown.
     *
     * <p> If this channel is already closed then invoking this method has no
     * effect.
     *
     * <p> This method may be invoked at any time.  If some other thread has
     * already invoked it, however, then another invocation will block until
     * the first invocation is complete, after which it will return without
     * effect. </p>
     *
     * @throws  IOException  If an I/O error occurs
     */
    public void close() throws IOException;

}
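The contract described in that Javadoc (open on creation, closed for good once closed, ClosedChannelException on later I/O, a second close having no effect) can be checked with a small sketch. FileChannel is used here only because it is easy to open; the class name ChannelLifecycle and the temporary file are illustrative assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ChannelLifecycle {
    // Returns a short trace of the channel's state before and after close().
    static String trace() {
        try {
            Path tmp = Files.createTempFile("channel-demo", ".bin");
            try {
                FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ);
                StringBuilder sb = new StringBuilder();
                sb.append("open=").append(ch.isOpen());

                ch.close();
                ch.close(); // closing an already closed channel has no effect
                sb.append(" afterClose=").append(ch.isOpen());

                try {
                    ch.read(ByteBuffer.allocate(1));
                    sb.append(" read=ok");
                } catch (ClosedChannelException e) {
                    sb.append(" read=ClosedChannelException");
                }
                return sb.toString();
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```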

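Network programming

Demo: an echo server and a client implemented with the synchronous APIs in java.io

An EchoServer that uses one thread per client

This is effectively BIO (blocking I/O).

(The client here uses several threads to handle command-line input/output and socket input/output so that both stay responsive, and passes data between the threads through a BlockingQueue.)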

echoServer:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EchoServer {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try (ServerSocket serverSocket = new ServerSocket(8000)) {
            while (true) {
                Socket clientSocket = serverSocket.accept();
                executorService.submit(new HandleMessage(clientSocket));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static class HandleMessage implements Runnable {
        Socket socket;
        public HandleMessage(Socket socket) {
            this.socket = socket;
        }
        @Override
        public void run() {
            try (
                    BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
                    PrintWriter pw = new PrintWriter(socket.getOutputStream(), true, StandardCharsets.UTF_8);
            ) {
                String str = null;
                while ((str = br.readLine()) != null) {
                    pw.println(str);
//                    System.out.println(str);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Client:

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class Client {
    public static void main(String[] args) throws IOException {

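        // Open a socket and connect to the server
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress("localhost", 8000));

//        // Holds the content received from the socket
//        LinkedBlockingDeque<String> msgFromRemoteList = new LinkedBlockingDeque<>();
//        // Everything read from the socket is stored in msgFromRemoteList
//        Thread readFromSocketHandler = new ReadFromSocketHandler(socket, msgFromRemoteList);
        // Everything read from the socket is forwarded to System.out right away
        Thread readFromSocketHandler = new ReadFromSocketHandler(socket, System.out);
        readFromSocketHandler.start();

        // Holds the content waiting to be written to the socket
        LinkedBlockingDeque<String> msgToRemoteList = new LinkedBlockingDeque<>();
        // Whatever is added to msgToRemoteList is then sent to the socket
        Thread writeToSocketHandler = new WriteToSocketHandler(socket, msgToRemoteList);
        writeToSocketHandler.start();

        // Handle an interrupt from the command line (usually Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new ExitHandler(Arrays.asList(writeToSocketHandler)));

        // Read commands from the local command line
        try (
                Scanner scanner = new Scanner(System.in);
        ) {
            // nextLine() never returns null (it throws at end of input), so test hasNextLine() instead
            while (scanner.hasNextLine()) {
                String cmd = scanner.nextLine();
                // An explicit quit command was entered
                if ("quit".equals(cmd)) {
                    break;
                }
                else {
                    // Queue the command for the remote side. add() appends at the tail, so take()
                    // in the writer thread sees commands in FIFO order (push() would make it LIFO)
                    msgToRemoteList.add(cmd);
                }
            }
            // The reader thread is a daemon thread, but blocking socket I/O does not respond to interrupts, so it will not exit on its own
            // Option 1 (used here): tell the peer first that the socket is about to close
            //     Do not call socket.close() directly here, otherwise BufferedReader.readLine in readFromSocketHandler throws;
            //     instead interrupt writeToSocketHandler so that it closes the socket's output stream. The intent is that once the peer confirms the close, BufferedReader.readLine in readFromSocketHandler returns null and that thread exits normally
            writeToSocketHandler.interrupt();
//        readFromSocketHandler.interrupt();
            // Option 2 (not used): close the socket here and handle the resulting SocketException in readFromSocketHandler, putting the shutdown logic there
            // Option 3 (not used): override interrupt() in the thread so that it calls socket.close() and ignores the error: https://blog.csdn.net/c1776167012/article/details/109989517
//        socket.close();
        } // the Scanner is closed here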
    }

    static class ExitHandler extends Thread {
        List<Thread> threads;

        public ExitHandler(List<Thread> threads) {
            this.threads = threads;
        }

        @Override
        public void run() {
            for (Thread thread : threads) {
                thread.interrupt();
            }
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    static class ReadFromSocketHandler extends Thread {
        Socket socket;
        // Content received from the socket goes either into the blocking queue or to the given stream
        BlockingDeque<String> msgList;
        OutputStream outputStream;

        public ReadFromSocketHandler(Socket socket, BlockingDeque<String> msgInList) {
            super();
            // Daemon thread: it does not keep the JVM alive once all non-daemon threads have exited
            this.setDaemon(true);
            assert socket != null;
            assert msgInList != null;
            this.socket = socket;
            this.msgList = msgInList;
        }

        public ReadFromSocketHandler(Socket socket, OutputStream os) {
            super();
            // Daemon thread: it does not keep the JVM alive once all non-daemon threads have exited
            this.setDaemon(true);
            assert socket != null;
            assert os != null;
            this.socket = socket;
            this.outputStream = os;
        }

        @Override
        public void run() {
            assert socket.isConnected();
            // Store the data received from the socket in the BlockingQueue, or forward it to the given outputStream
            try (
                    BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
                    BufferedWriter bw = msgList == null ? new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) : null;
            ) {
                String str;
                while (!socket.isClosed() && (str = br.readLine()) != null) {
                    if (msgList != null) {
                        // add() appends at the tail so messages keep their arrival order
                        msgList.add(str);
                    }
                    else {
                        bw.write(str);
                        bw.write('\n');
                        bw.flush();
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    static class WriteToSocketHandler extends Thread {
        Socket socket;
        BlockingQueue<String> msgList;

        public WriteToSocketHandler(Socket socket, BlockingQueue<String> msgOutList) {
            this.socket = socket;
            this.msgList = msgOutList;
        }

        @Override
        public void run() {
            assert socket.isConnected();
            // Send the data from the BlockingQueue to the socket
            try (PrintWriter pw = new PrintWriter(socket.getOutputStream(), true, StandardCharsets.UTF_8);) {
                while (true) {
                    try {
                        String str = msgList.take();
                        pw.println(str);
                        pw.flush();
                    } catch (InterruptedException e) {
                        // Interrupted: exit normally
                        break;
                    }
                }

            } catch (IOException e) {
                throw new RuntimeException(e);
            }

        }
    }

}

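Here the EchoServer creates a fixed thread pool with two threads. Each client connection (socket) is handled by one thread, and as long as the client socket stays connected, that same thread keeps handling that client's messages.

You will find that only two clients can effectively use this EchoServer at the same time. When a third client connects, serverSocket still accepts the connection, but once the task is submitted to the pool there is no free thread to handle the socket. The task waits in the pool's queue, so all of the third client's I/O stalls until one of the first two clients disconnects.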
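The starvation described above can be reproduced without any sockets. In this sketch two tasks stand in for the two connected clients and hold both pool threads, while a third task is submitted and never gets to run. The class name PoolStarvation, the latches, and the 300 ms wait are assumptions chosen for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolStarvation {
    // Returns true if the third task managed to start while both pool threads were still busy.
    static boolean thirdTaskStartsWhileBusy() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);      // "the clients disconnect"
        CountDownLatch thirdStarted = new CountDownLatch(1); // set once the third task runs
        try {
            // Two long-lived "client handlers" occupy both threads of the pool
            Runnable longLivedClient = () -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            pool.submit(longLivedClient);
            pool.submit(longLivedClient);

            // The third "client" is accepted (submitted) but only sits in the pool's queue
            Runnable thirdClient = thirdStarted::countDown;
            pool.submit(thirdClient);

            return thirdStarted.await(300, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown(); // let the first two tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task started while pool was busy: " + thirdTaskStartsWhileBusy());
    }
}
```

Making the pool larger only moves the limit; the number of simultaneously served clients is still bounded by the number of threads.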

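Problems:

  • With one thread per client, if clients keep long-lived connections or there is latency, a thread can be tied up for a long time, because both waiting for the data and reading it happen on that thread. (Waiting means the data travelling from the client to the server; reading means copying the data from the client's I/O into variables of the process.)
  • If there are many clients, this can consume a large amount of system resources.

Solution:

  • Non-blocking NIO (in my view this is what is elsewhere called I/O multiplexing, or the selector technique)
  • Work only once the data is ready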
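To make "work only once the data is ready" concrete without opening a network port, the sketch below uses a java.nio.channels.Pipe: a non-blocking read returns 0 while nothing has arrived, and the Selector reports the channel as ready once data has been written. The class name ReadyThenRead and the "ping" payload are illustrative assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class ReadyThenRead {
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            Pipe.SinkChannel sink = pipe.sink();
            try {
                // A channel must be non-blocking before it can be registered with a Selector
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                ByteBuffer buf = ByteBuffer.allocate(64);
                // Nothing has been written yet: the read returns 0 instead of blocking the thread
                int before = source.read(buf);

                sink.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

                // Wait (at most one second) until a registered channel is ready
                int ready = selector.select(1000);
                int after = source.read(buf);

                buf.flip();
                String text = StandardCharsets.UTF_8.decode(buf).toString();
                return "before=" + before + " ready=" + ready + " after=" + after + " text=" + text;
            } finally {
                source.close();
                sink.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One thread can register many channels with the same Selector this way, which is the basis of the NIO EchoServer in the next section.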

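EchoServer using NIO

It is not necessarily compatible with the earlier client (this server was tested with JMeter as the client).

Environment: JMeter and the EchoServer (in debug mode) ran on the same machine: 8 cores, 16 GB.

JMeter Thread Group settings: Number of Threads (users): 20000; Ramp-up period (seconds): 10; Loop Count: 1

That is, 20000 requests issued within 10 s. Error%: < 1%.

With 25000 requests within 10 s, Error% was > 60%, which is practically unusable.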

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.*;

public class EchoServer {
    Selector selector;

    /**
     * Initialize the server
     */
    public void init(Integer port) throws IOException {
        assert port != null;

        selector = Selector.open();

        // Create the channel that accepts connections and bind it to the port
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress("localhost", port));
        ssc.configureBlocking(false);

        // Register it with the selector
        ssc.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Start the event loop
     */
    public void work() throws IOException {
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                // Take the key and remove it from the selected set
                SelectionKey sk = iterator.next();
                iterator.remove();

                // Check for and handle the events we are interested in
                if (sk.isValid() && sk.isAcceptable()) {
                    doAccept(sk);
                }

                if (sk.isValid() && sk.isReadable()) {
                    doRead(sk);
                }

                if (sk.isValid() && sk.isWritable()) {
                    doWrite(sk);
                }

            }
        }
    }

    /**
     * Shut down the server
     */
    public void close() {
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
        }
    }



    /**
     * Invoked when the selected key is readable
     *
     * @param sk selectionkey
     * @throws IOException
     */
    private void doRead(SelectionKey sk) throws IOException {
        SocketChannel sc = (SocketChannel) sk.channel();
        ConcurrentLinkedQueue<ByteBuffer> attachment = (ConcurrentLinkedQueue<ByteBuffer>) sk.attachment();
        if (attachment == null) {
            // In this demo only the selector thread touches the attachment,
            // so this lock is purely defensive
            synchronized (sk) {
                attachment = (ConcurrentLinkedQueue<ByteBuffer>) sk.attachment();
                if (attachment == null) {
                    attachment = new ConcurrentLinkedQueue<>();
                    sk.attach(attachment);
                }
            }
        }

        // From here on the attachment is guaranteed to be initialized
        assert attachment != null;

        ByteBuffer bb = ByteBuffer.allocate(1024);
        int count = sc.read(bb);
        if (count > 0) {
            // Switch the buffer to read mode exactly once, then queue it for doWrite
            bb.flip();
            attachment.add(bb);
            // Keep reading, and also watch for writability so the response can be sent
            sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
        else if (count == 0) {
            // Nothing to do
        }
        else {
            // End of stream
            sk.cancel();
            sc.close();
        }
    }


    /**
     * Invoked when the selection key is writable
     *
     * @param sk selectionKey
     */
    private void doWrite(SelectionKey sk) throws IOException {
        SocketChannel sc = (SocketChannel) sk.channel();
        ConcurrentLinkedQueue<ByteBuffer> attachment = (ConcurrentLinkedQueue<ByteBuffer>) sk.attachment();

        if (attachment == null) return;

        ByteBuffer bb;
        while ((bb = attachment.peek()) != null) {
            // Buffers were flipped in doRead; flipping again here would resend or drop data after a partial write
//            // Convert to a String and print it to standard output
//            String str = new String(bb.array(), bb.position(), bb.remaining(), StandardCharsets.UTF_8);
//            System.out.print(str);
            // Echo the data back
            sc.write(bb);
            if (bb.hasRemaining()) {
                // The socket send buffer is full: keep OP_WRITE set and retry on the next select
                return;
            }
            // Fully written, discard this ByteBuffer
            attachment.poll();
        }

        // Everything queued has been sent; stop watching for writability
        sk.interestOps(SelectionKey.OP_READ);
    }


    private void doAccept(SelectionKey sk) throws IOException {
        // Only the server-side socket channel receives accept events
        ServerSocketChannel ssc = (ServerSocketChannel) sk.channel();

        // Accept the new connection
        SocketChannel sc = ssc.accept();
        if(sc == null) return;

        // Make it non-blocking and register it with the selector for read events; write events can be added when needed
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_READ);
    }


    public static void main(String[] args) {
        EchoServer server = new EchoServer();
        try {
            server.init(8000);
            server.work();
        } catch (IOException ioe) {
            ioe.printStackTrace();
            server.close();
        }
    }

}

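Introduction to AIO and how to use it

BIO (Blocking I/O)

  • The thread blocks at the I/O read or write call until that read or write finishes.

NIO (New I/O; the underlying mechanism is I/O multiplexing)

  • Waiting for I/O to become ready is handed to the operating system,
  • the ready channels are then selected in batches,
  • and the actual reading and writing of those ready channels is driven by user code.

AIO (Asynchronous I/O)

  • Both waiting for readiness and the read/write itself are handed to the operating system and the Java standard library,
  • and user code is called back when the read or write completes.

AIO

  • "Notify me once the read is done."
  • It does not make I/O faster; it only notifies after the read has completed.
  • Business code is notified through callbacks.

The methods mainly used are the ones declared on this interface: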

/*
 * Copyright (c) 2007, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package java.nio.channels;

import java.nio.ByteBuffer;
import java.util.concurrent.Future;

/**
 * An asynchronous channel that can read and write bytes.
 *
 * <p> Some channels may not allow more than one read or write to be outstanding
 * at any given time. If a thread invokes a read method before a previous read
 * operation has completed then a {@link ReadPendingException} will be thrown.
 * Similarly, if a write method is invoked before a previous write has completed
 * then {@link WritePendingException} is thrown. Whether or not other kinds of
 * I/O operations may proceed concurrently with a read operation depends upon
 * the type of the channel.
 *
 * <p> Note that {@link java.nio.ByteBuffer ByteBuffers} are not safe for use by
 * multiple concurrent threads. When a read or write operation is initiated then
 * care must be taken to ensure that the buffer is not accessed until the
 * operation completes.
 *
 * @see Channels#newInputStream(AsynchronousByteChannel)
 * @see Channels#newOutputStream(AsynchronousByteChannel)
 *
 * @since 1.7
 */

public interface AsynchronousByteChannel
    extends AsynchronousChannel
{
    /**
     * Reads a sequence of bytes from this channel into the given buffer.
     *
     * <p> This method initiates an asynchronous read operation to read a
     * sequence of bytes from this channel into the given buffer. The {@code
     * handler} parameter is a completion handler that is invoked when the read
     * operation completes (or fails). The result passed to the completion
     * handler is the number of bytes read or {@code -1} if no bytes could be
     * read because the channel has reached end-of-stream.
     *
     * <p> The read operation may read up to <i>r</i> bytes from the channel,
     * where <i>r</i> is the number of bytes remaining in the buffer, that is,
     * {@code dst.remaining()} at the time that the read is attempted. Where
     * <i>r</i> is 0, the read operation completes immediately with a result of
     * {@code 0} without initiating an I/O operation.
     *
     * <p> Suppose that a byte sequence of length <i>n</i> is read, where
     * {@code 0}&nbsp;{@code <}&nbsp;<i>n</i>&nbsp;{@code <=}&nbsp;<i>r</i>.
     * This byte sequence will be transferred into the buffer so that the first
     * byte in the sequence is at index <i>p</i> and the last byte is at index
     * <i>p</i>&nbsp;{@code +}&nbsp;<i>n</i>&nbsp;{@code -}&nbsp;{@code 1},
     * where <i>p</i> is the buffer's position at the moment the read is
     * performed. Upon completion the buffer's position will be equal to
     * <i>p</i>&nbsp;{@code +}&nbsp;<i>n</i>; its limit will not have changed.
     *
     * <p> Buffers are not safe for use by multiple concurrent threads so care
     * should be taken to not access the buffer until the operation has
     * completed.
     *
     * <p> This method may be invoked at any time. Some channel types may not
     * allow more than one read to be outstanding at any given time. If a thread
     * initiates a read operation before a previous read operation has
     * completed then a {@link ReadPendingException} will be thrown.
     *
     * @param   <A>
     *          The type of the attachment
     * @param   dst
     *          The buffer into which bytes are to be transferred
     * @param   attachment
     *          The object to attach to the I/O operation; can be {@code null}
     * @param   handler
     *          The completion handler
     *
     * @throws  IllegalArgumentException
     *          If the buffer is read-only
     * @throws  ReadPendingException
     *          If the channel does not allow more than one read to be outstanding
     *          and a previous read has not completed
     * @throws  ShutdownChannelGroupException
     *          If the channel is associated with a {@link AsynchronousChannelGroup
     *          group} that has terminated
     */
    <A> void read(ByteBuffer dst,
                  A attachment,
                  CompletionHandler<Integer,? super A> handler);

    /**
     * Reads a sequence of bytes from this channel into the given buffer.
     *
     * <p> This method initiates an asynchronous read operation to read a
     * sequence of bytes from this channel into the given buffer. The method
     * behaves in exactly the same manner as the {@link
     * #read(ByteBuffer,Object,CompletionHandler)
     * read(ByteBuffer,Object,CompletionHandler)} method except that instead
     * of specifying a completion handler, this method returns a {@code Future}
     * representing the pending result. The {@code Future}'s {@link Future#get()
     * get} method returns the number of bytes read or {@code -1} if no bytes
     * could be read because the channel has reached end-of-stream.
     *
     * @param   dst
     *          The buffer into which bytes are to be transferred
     *
     * @return  A Future representing the result of the operation
     *
     * @throws  IllegalArgumentException
     *          If the buffer is read-only
     * @throws  ReadPendingException
     *          If the channel does not allow more than one read to be outstanding
     *          and a previous read has not completed
     */
    Future<Integer> read(ByteBuffer dst);

    /**
     * Writes a sequence of bytes to this channel from the given buffer.
     *
     * <p> This method initiates an asynchronous write operation to write a
     * sequence of bytes to this channel from the given buffer. The {@code
     * handler} parameter is a completion handler that is invoked when the write
     * operation completes (or fails). The result passed to the completion
     * handler is the number of bytes written.
     *
     * <p> The write operation may write up to <i>r</i> bytes to the channel,
     * where <i>r</i> is the number of bytes remaining in the buffer, that is,
     * {@code src.remaining()} at the time that the write is attempted. Where
     * <i>r</i> is 0, the write operation completes immediately with a result of
     * {@code 0} without initiating an I/O operation.
     *
     * <p> Suppose that a byte sequence of length <i>n</i> is written, where
     * {@code 0}&nbsp;{@code <}&nbsp;<i>n</i>&nbsp;{@code <=}&nbsp;<i>r</i>.
     * This byte sequence will be transferred from the buffer starting at index
     * <i>p</i>, where <i>p</i> is the buffer's position at the moment the
     * write is performed; the index of the last byte written will be
     * <i>p</i>&nbsp;{@code +}&nbsp;<i>n</i>&nbsp;{@code -}&nbsp;{@code 1}.
     * Upon completion the buffer's position will be equal to
     * <i>p</i>&nbsp;{@code +}&nbsp;<i>n</i>; its limit will not have changed.
     *
     * <p> Buffers are not safe for use by multiple concurrent threads so care
     * should be taken to not access the buffer until the operation has
     * completed.
     *
     * <p> This method may be invoked at any time. Some channel types may not
     * allow more than one write to be outstanding at any given time. If a thread
     * initiates a write operation before a previous write operation has
     * completed then a {@link WritePendingException} will be thrown.
     *
     * @param   <A>
     *          The type of the attachment
     * @param   src
     *          The buffer from which bytes are to be retrieved
     * @param   attachment
     *          The object to attach to the I/O operation; can be {@code null}
     * @param   handler
     *          The completion handler object
     *
     * @throws  WritePendingException
     *          If the channel does not allow more than one write to be outstanding
     *          and a previous write has not completed
     * @throws  ShutdownChannelGroupException
     *          If the channel is associated with a {@link AsynchronousChannelGroup
     *          group} that has terminated
     */
    <A> void write(ByteBuffer src,
                   A attachment,
                   CompletionHandler<Integer,? super A> handler);

    /**
     * Writes a sequence of bytes to this channel from the given buffer.
     *
     * <p> This method initiates an asynchronous write operation to write a
     * sequence of bytes to this channel from the given buffer. The method
     * behaves in exactly the same manner as the {@link
     * #write(ByteBuffer,Object,CompletionHandler)
     * write(ByteBuffer,Object,CompletionHandler)} method except that instead
     * of specifying a completion handler, this method returns a {@code Future}
     * representing the pending result. The {@code Future}'s {@link Future#get()
     * get} method returns the number of bytes written.
     *
     * @param   src
     *          The buffer from which bytes are to be retrieved
     *
     * @return A Future representing the result of the operation
     *
     * @throws  WritePendingException
     *          If the channel does not allow more than one write to be outstanding
     *          and a previous write has not completed
     */
    Future<Integer> write(ByteBuffer src);
}
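The two styles that the interface offers (a CompletionHandler callback, or a Future) can be put side by side in a small echo example. This is only a sketch under stated assumptions, not the code of the tmp2023041401 demo: the class name AioEcho, the helper names, the ephemeral localhost port, and the 5-second timeouts are all made up for illustration. The server side uses callbacks; the client side uses the Future-returning overloads.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

class AioEcho {
    // Starts an echo server on an ephemeral localhost port (callback style).
    static AsynchronousServerSocketChannel startServer() throws IOException {
        AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("localhost", 0));
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                server.accept(null, this); // keep accepting further clients
                readNext(client);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                // accept failed or the server was closed; nothing to clean up in this sketch
            }
        });
        return server;
    }

    // Issues one asynchronous read; the callback runs once the data has already been read.
    static void readNext(AsynchronousSocketChannel client) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        client.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer n, ByteBuffer b) {
                if (n < 0) { closeQuietly(client); return; } // end of stream
                b.flip();
                writeBack(client, b);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer b) { closeQuietly(client); }
        });
    }

    // Writes the buffer back, re-issuing the write until nothing remains.
    static void writeBack(AsynchronousSocketChannel client, ByteBuffer data) {
        client.write(data, data, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer n, ByteBuffer b) {
                if (b.hasRemaining()) client.write(b, b, this); // partial write: continue
                else readNext(client);                          // done: wait for the next message
            }

            @Override
            public void failed(Throwable exc, ByteBuffer b) { closeQuietly(client); }
        });
    }

    static void closeQuietly(AsynchronousSocketChannel c) {
        try { c.close(); } catch (IOException ignored) { }
    }

    // Future style: each call returns immediately and get() waits for the result.
    static String echoOnce(String msg) {
        try (AsynchronousServerSocketChannel server = startServer();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            ch.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
            byte[] out = msg.getBytes(StandardCharsets.UTF_8);
            ByteBuffer src = ByteBuffer.wrap(out);
            while (src.hasRemaining()) ch.write(src).get(5, TimeUnit.SECONDS);
            ByteBuffer dst = ByteBuffer.allocate(out.length);
            while (dst.hasRemaining()) {
                if (ch.read(dst).get(5, TimeUnit.SECONDS) < 0) break; // peer closed early
            }
            dst.flip();
            return StandardCharsets.UTF_8.decode(dst).toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(echoOnce("hello aio"));
    }
}
```

In the callback style only one read and one write are outstanding per channel at a time, which respects the ReadPendingException and WritePendingException rules quoted in the Javadoc above.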
