网络编程(四)AIO

AIO

实现

3.gif

服务器端实现

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.ShutdownChannelGroupException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import sun.misc.Unsafe;
import sun.nio.ch.Iocp.OverlappedChannel;
import sun.nio.ch.Iocp.ResultHandler;

class WindowsAsynchronousSocketChannelImpl extends AsynchronousSocketChannelImpl implements OverlappedChannel {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static int addressSize;
    private static final int SIZEOF_WSABUF;
    private static final int OFFSETOF_LEN = 0;
    private static final int OFFSETOF_BUF;
    private static final int MAX_WSABUF = 16;
    private static final int SIZEOF_WSABUFARRAY;
    final long handle;
    private final Iocp iocp;
    private final int completionKey;
    private final PendingIoCache ioCache;
    private final long readBufferArray;
    private final long writeBufferArray;

    private static int dependsArch(int var0, int var1) {
        return addressSize == 4 ? var0 : var1;
    }

    WindowsAsynchronousSocketChannelImpl(Iocp var1, boolean var2) throws IOException {
        super(var1);
        long var3 = (long)IOUtil.fdVal(this.fd);
        int var5 = 0;

        try {
            var5 = var1.associate(this, var3);
        } catch (ShutdownChannelGroupException var7) {
            if (var2) {
                closesocket0(var3);
                throw var7;
            }
        } catch (IOException var8) {
            closesocket0(var3);
            throw var8;
        }

        this.handle = var3;
        this.iocp = var1;
        this.completionKey = var5;
        this.ioCache = new PendingIoCache();
        this.readBufferArray = unsafe.allocateMemory((long)SIZEOF_WSABUFARRAY);
        this.writeBufferArray = unsafe.allocateMemory((long)SIZEOF_WSABUFARRAY);
    }

    WindowsAsynchronousSocketChannelImpl(Iocp var1) throws IOException {
        this(var1, true);
    }

    public AsynchronousChannelGroupImpl group() {
        return this.iocp;
    }

    public <V, A> PendingFuture<V, A> getByOverlapped(long var1) {
        return this.ioCache.remove(var1);
    }

    long handle() {
        return this.handle;
    }

    void setConnected(InetSocketAddress var1, InetSocketAddress var2) {
        Object var3 = this.stateLock;
        synchronized(this.stateLock) {
            this.state = 2;
            this.localAddress = var1;
            this.remoteAddress = var2;
        }
    }

    void implClose() throws IOException {
        closesocket0(this.handle);
        this.ioCache.close();
        unsafe.freeMemory(this.readBufferArray);
        unsafe.freeMemory(this.writeBufferArray);
        if (this.completionKey != 0) {
            this.iocp.disassociate(this.completionKey);
        }

    }

    public void onCancel(PendingFuture<?, ?> var1) {
        if (var1.getContext() instanceof WindowsAsynchronousSocketChannelImpl.ConnectTask) {
            this.killConnect();
        }

        if (var1.getContext() instanceof WindowsAsynchronousSocketChannelImpl.ReadTask) {
            this.killReading();
        }

        if (var1.getContext() instanceof WindowsAsynchronousSocketChannelImpl.WriteTask) {
            this.killWriting();
        }

    }

    private void doPrivilegedBind(final SocketAddress var1) throws IOException {
        try {
            AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
                public Void run() throws IOException {
                    WindowsAsynchronousSocketChannelImpl.this.bind(var1);
                    return null;
                }
            });
        } catch (PrivilegedActionException var3) {
            throw (IOException)var3.getException();
        }
    }

    <A> Future<Void> implConnect(SocketAddress var1, A var2, CompletionHandler<Void, ? super A> var3) {
        if (!this.isOpen()) {
            ClosedChannelException var13 = new ClosedChannelException();
            if (var3 == null) {
                return CompletedFuture.withFailure(var13);
            } else {
                Invoker.invoke(this, var3, var2, (Object)null, var13);
                return null;
            }
        } else {
            InetSocketAddress var4 = Net.checkAddress(var1);
            SecurityManager var5 = System.getSecurityManager();
            if (var5 != null) {
                var5.checkConnect(var4.getAddress().getHostAddress(), var4.getPort());
            }

            IOException var6 = null;
            Object var7 = this.stateLock;
            synchronized(this.stateLock) {
                if (this.state == 2) {
                    throw new AlreadyConnectedException();
                }

                if (this.state == 1) {
                    throw new ConnectionPendingException();
                }

                if (this.localAddress == null) {
                    try {
                        InetSocketAddress var8 = new InetSocketAddress(0);
                        if (var5 == null) {
                            this.bind(var8);
                        } else {
                            this.doPrivilegedBind(var8);
                        }
                    } catch (IOException var11) {
                        var6 = var11;
                    }
                }

                if (var6 == null) {
                    this.state = 1;
                }
            }

            if (var6 != null) {
                try {
                    this.close();
                } catch (IOException var10) {
                    ;
                }

                if (var3 == null) {
                    return CompletedFuture.withFailure(var6);
                } else {
                    Invoker.invoke(this, var3, var2, (Object)null, var6);
                    return null;
                }
            } else {
                PendingFuture var14 = new PendingFuture(this, var3, var2);
                WindowsAsynchronousSocketChannelImpl.ConnectTask var15 = new WindowsAsynchronousSocketChannelImpl.ConnectTask(var4, var14);
                var14.setContext(var15);
                if (Iocp.supportsThreadAgnosticIo()) {
                    var15.run();
                } else {
                    Invoker.invokeOnThreadInThreadPool(this, var15);
                }

                return var14;
            }
        }
    }

    <V extends Number, A> Future<V> implRead(boolean var1, ByteBuffer var2, ByteBuffer[] var3, long var4, TimeUnit var6, A var7, CompletionHandler<V, ? super A> var8) {
        PendingFuture var9 = new PendingFuture(this, var8, var7);
        ByteBuffer[] var10;
        if (var1) {
            var10 = var3;
        } else {
            var10 = new ByteBuffer[]{var2};
        }

        final WindowsAsynchronousSocketChannelImpl.ReadTask var11 = new WindowsAsynchronousSocketChannelImpl.ReadTask(var10, var1, var9);
        var9.setContext(var11);
        if (var4 > 0L) {
            Future var12 = this.iocp.schedule(new Runnable() {
                public void run() {
                    var11.timeout();
                }
            }, var4, var6);
            var9.setTimeoutTask(var12);
        }

        if (Iocp.supportsThreadAgnosticIo()) {
            var11.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(this, var11);
        }

        return var9;
    }

    <V extends Number, A> Future<V> implWrite(boolean var1, ByteBuffer var2, ByteBuffer[] var3, long var4, TimeUnit var6, A var7, CompletionHandler<V, ? super A> var8) {
        PendingFuture var9 = new PendingFuture(this, var8, var7);
        ByteBuffer[] var10;
        if (var1) {
            var10 = var3;
        } else {
            var10 = new ByteBuffer[]{var2};
        }

        final WindowsAsynchronousSocketChannelImpl.WriteTask var11 = new WindowsAsynchronousSocketChannelImpl.WriteTask(var10, var1, var9);
        var9.setContext(var11);
        if (var4 > 0L) {
            Future var12 = this.iocp.schedule(new Runnable() {
                public void run() {
                    var11.timeout();
                }
            }, var4, var6);
            var9.setTimeoutTask(var12);
        }

        if (Iocp.supportsThreadAgnosticIo()) {
            var11.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(this, var11);
        }

        return var9;
    }

    private static native void initIDs();

    private static native int connect0(long var0, boolean var2, InetAddress var3, int var4, long var5) throws IOException;

    private static native void updateConnectContext(long var0) throws IOException;

    private static native int read0(long var0, int var2, long var3, long var5) throws IOException;

    private static native int write0(long var0, int var2, long var3, long var5) throws IOException;

    private static native void shutdown0(long var0, int var2) throws IOException;

    private static native void closesocket0(long var0) throws IOException;

    static {
        addressSize = unsafe.addressSize();
        SIZEOF_WSABUF = dependsArch(8, 16);
        OFFSETOF_BUF = dependsArch(4, 8);
        SIZEOF_WSABUFARRAY = 16 * SIZEOF_WSABUF;
        IOUtil.load();
        initIDs();
    }

    private class ConnectTask<A> implements Runnable, ResultHandler {
        private final InetSocketAddress remote;
        private final PendingFuture<Void, A> result;

        ConnectTask(InetSocketAddress var1, PendingFuture<Void, A> var2) {
            this.remote = var2;
            this.result = var3;
        }

        private void closeChannel() {
            try {
                WindowsAsynchronousSocketChannelImpl.this.close();
            } catch (IOException var2) {
                ;
            }

        }

        private IOException toIOException(Throwable var1) {
            if (var1 instanceof IOException) {
                if (var1 instanceof ClosedChannelException) {
                    var1 = new AsynchronousCloseException();
                }

                return (IOException)var1;
            } else {
                return new IOException((Throwable)var1);
            }
        }

        private void afterConnect() throws IOException {
            WindowsAsynchronousSocketChannelImpl.updateConnectContext(WindowsAsynchronousSocketChannelImpl.this.handle);
            Object var1 = WindowsAsynchronousSocketChannelImpl.this.stateLock;
            synchronized(WindowsAsynchronousSocketChannelImpl.this.stateLock) {
                WindowsAsynchronousSocketChannelImpl.this.state = 2;
                WindowsAsynchronousSocketChannelImpl.this.remoteAddress = this.remote;
            }
        }

        public void run() {
            long var1 = 0L;
            Throwable var3 = null;

            label87: {
                try {
                    WindowsAsynchronousSocketChannelImpl.this.begin();
                    PendingFuture var4 = this.result;
                    synchronized(this.result) {
                        var1 = WindowsAsynchronousSocketChannelImpl.this.ioCache.add(this.result);
                        int var5 = WindowsAsynchronousSocketChannelImpl.connect0(WindowsAsynchronousSocketChannelImpl.this.handle, Net.isIPv6Available(), this.remote.getAddress(), this.remote.getPort(), var1);
                        if (var5 != -2) {
                            this.afterConnect();
                            this.result.setResult((Object)null);
                            break label87;
                        }
                    }
                } catch (Throwable var12) {
                    if (var1 != 0L) {
                        WindowsAsynchronousSocketChannelImpl.this.ioCache.remove(var1);
                    }

                    var3 = var12;
                    break label87;
                } finally {
                    WindowsAsynchronousSocketChannelImpl.this.end();
                }

                return;
            }

            if (var3 != null) {
                this.closeChannel();
                this.result.setFailure(this.toIOException(var3));
            }

            Invoker.invoke(this.result);
        }

        public void completed(int var1, boolean var2) {
            Throwable var3 = null;

            try {
                WindowsAsynchronousSocketChannelImpl.this.begin();
                this.afterConnect();
                this.result.setResult((Object)null);
            } catch (Throwable var8) {
                var3 = var8;
            } finally {
                WindowsAsynchronousSocketChannelImpl.this.end();
            }

            if (var3 != null) {
                this.closeChannel();
                this.result.setFailure(this.toIOException(var3));
            }

            if (var2) {
                Invoker.invokeUnchecked(this.result);
            } else {
                Invoker.invoke(this.result);
            }

        }

        public void failed(int var1, IOException var2) {
            if (WindowsAsynchronousSocketChannelImpl.this.isOpen()) {
                this.closeChannel();
                this.result.setFailure(var2);
            } else {
                this.result.setFailure(new AsynchronousCloseException());
            }

            Invoker.invoke(this.result);
        }
    }

    private class ReadTask<V, A> implements Runnable, ResultHandler {
        private final ByteBuffer[] bufs;
        private final int numBufs;
        private final boolean scatteringRead;
        private final PendingFuture<V, A> result;
        private ByteBuffer[] shadow;

        ReadTask(ByteBuffer[] var1, boolean var2, PendingFuture<V, A> var3) {
            this.bufs = var2;
            this.numBufs = var2.length > 16 ? 16 : var2.length;
            this.scatteringRead = var3;
            this.result = var4;
        }

        void prepareBuffers() {
            this.shadow = new ByteBuffer[this.numBufs];
            long var1 = WindowsAsynchronousSocketChannelImpl.this.readBufferArray;

            for(int var3 = 0; var3 < this.numBufs; ++var3) {
                ByteBuffer var4 = this.bufs[var3];
                int var5 = var4.position();
                int var6 = var4.limit();

                assert var5 <= var6;

                int var7 = var5 <= var6 ? var6 - var5 : 0;
                long var8;
                if (!(var4 instanceof DirectBuffer)) {
                    ByteBuffer var10 = Util.getTemporaryDirectBuffer(var7);
                    this.shadow[var3] = var10;
                    var8 = ((DirectBuffer)var10).address();
                } else {
                    this.shadow[var3] = var4;
                    var8 = ((DirectBuffer)var4).address() + (long)var5;
                }

                WindowsAsynchronousSocketChannelImpl.unsafe.putAddress(var1 + (long)WindowsAsynchronousSocketChannelImpl.OFFSETOF_BUF, var8);
                WindowsAsynchronousSocketChannelImpl.unsafe.putInt(var1 + 0L, var7);
                var1 += (long)WindowsAsynchronousSocketChannelImpl.SIZEOF_WSABUF;
            }

        }

        void updateBuffers(int var1) {
            int var2;
            for(var2 = 0; var2 < this.numBufs; ++var2) {
                ByteBuffer var3 = this.shadow[var2];
                int var4 = var3.position();
                int var5 = var3.remaining();
                int var6;
                if (var1 < var5) {
                    if (var1 > 0) {
                        assert (long)(var4 + var1) < 2147483647L;

                        var6 = var4 + var1;

                        try {
                            var3.position(var6);
                        } catch (IllegalArgumentException var9) {
                            ;
                        }
                    }
                    break;
                }

                var1 -= var5;
                var6 = var4 + var5;

                try {
                    var3.position(var6);
                } catch (IllegalArgumentException var10) {
                    ;
                }
            }

            for(var2 = 0; var2 < this.numBufs; ++var2) {
                if (!(this.bufs[var2] instanceof DirectBuffer)) {
                    this.shadow[var2].flip();

                    try {
                        this.bufs[var2].put(this.shadow[var2]);
                    } catch (BufferOverflowException var8) {
                        ;
                    }
                }
            }

        }

        void releaseBuffers() {
            for(int var1 = 0; var1 < this.numBufs; ++var1) {
                if (!(this.bufs[var1] instanceof DirectBuffer)) {
                    Util.releaseTemporaryDirectBuffer(this.shadow[var1]);
                }
            }

        }

        public void run() {
            long var1 = 0L;
            boolean var3 = false;
            boolean var4 = false;

            label151: {
                try {
                    WindowsAsynchronousSocketChannelImpl.this.begin();
                    this.prepareBuffers();
                    var3 = true;
                    var1 = WindowsAsynchronousSocketChannelImpl.this.ioCache.add(this.result);
                    int var11 = WindowsAsynchronousSocketChannelImpl.read0(WindowsAsynchronousSocketChannelImpl.this.handle, this.numBufs, WindowsAsynchronousSocketChannelImpl.this.readBufferArray, var1);
                    if (var11 != -2) {
                        if (var11 != -1) {
                            throw new InternalError("Read completed immediately");
                        }

                        WindowsAsynchronousSocketChannelImpl.this.enableReading();
                        if (this.scatteringRead) {
                            this.result.setResult(-1L);
                        } else {
                            this.result.setResult(-1);
                        }
                        break label151;
                    }

                    var4 = true;
                } catch (Throwable var9) {
                    Object var5 = var9;
                    WindowsAsynchronousSocketChannelImpl.this.enableReading();
                    if (var9 instanceof ClosedChannelException) {
                        var5 = new AsynchronousCloseException();
                    }

                    if (!(var5 instanceof IOException)) {
                        var5 = new IOException((Throwable)var5);
                    }

                    this.result.setFailure((Throwable)var5);
                    break label151;
                } finally {
                    if (!var4) {
                        if (var1 != 0L) {
                            WindowsAsynchronousSocketChannelImpl.this.ioCache.remove(var1);
                        }

                        if (var3) {
                            this.releaseBuffers();
                        }
                    }

                    WindowsAsynchronousSocketChannelImpl.this.end();
                }

                return;
            }

            Invoker.invoke(this.result);
        }

        public void completed(int var1, boolean var2) {
            if (var1 == 0) {
                var1 = -1;
            } else {
                this.updateBuffers(var1);
            }

            this.releaseBuffers();
            PendingFuture var3 = this.result;
            synchronized(this.result) {
                if (this.result.isDone()) {
                    return;
                }

                WindowsAsynchronousSocketChannelImpl.this.enableReading();
                if (this.scatteringRead) {
                    this.result.setResult((long)var1);
                } else {
                    this.result.setResult(var1);
                }
            }

            if (var2) {
                Invoker.invokeUnchecked(this.result);
            } else {
                Invoker.invoke(this.result);
            }

        }

        public void failed(int var1, IOException var2) {
            this.releaseBuffers();
            if (!WindowsAsynchronousSocketChannelImpl.this.isOpen()) {
                var2 = new AsynchronousCloseException();
            }

            PendingFuture var3 = this.result;
            synchronized(this.result) {
                if (this.result.isDone()) {
                    return;
                }

                WindowsAsynchronousSocketChannelImpl.this.enableReading();
                this.result.setFailure((Throwable)var2);
            }

            Invoker.invoke(this.result);
        }

        void timeout() {
            PendingFuture var1 = this.result;
            synchronized(this.result) {
                if (this.result.isDone()) {
                    return;
                }

                WindowsAsynchronousSocketChannelImpl.this.enableReading(true);
                this.result.setFailure(new InterruptedByTimeoutException());
            }

            Invoker.invoke(this.result);
        }
    }

    private class WriteTask<V, A> implements Runnable, ResultHandler {
        private final ByteBuffer[] bufs;
        private final int numBufs;
        private final boolean gatheringWrite;
        private final PendingFuture<V, A> result;
        private ByteBuffer[] shadow;

        WriteTask(ByteBuffer[] var1, boolean var2, PendingFuture<V, A> var3) {
            this.bufs = var2;
            this.numBufs = var2.length > 16 ? 16 : var2.length;
            this.gatheringWrite = var3;
            this.result = var4;
        }

        void prepareBuffers() {
            this.shadow = new ByteBuffer[this.numBufs];
            long var1 = WindowsAsynchronousSocketChannelImpl.this.writeBufferArray;

            for(int var3 = 0; var3 < this.numBufs; ++var3) {
                ByteBuffer var4 = this.bufs[var3];
                int var5 = var4.position();
                int var6 = var4.limit();

                assert var5 <= var6;

                int var7 = var5 <= var6 ? var6 - var5 : 0;
                long var8;
                if (!(var4 instanceof DirectBuffer)) {
                    ByteBuffer var10 = Util.getTemporaryDirectBuffer(var7);
                    var10.put(var4);
                    var10.flip();
                    var4.position(var5);
                    this.shadow[var3] = var10;
                    var8 = ((DirectBuffer)var10).address();
                } else {
                    this.shadow[var3] = var4;
                    var8 = ((DirectBuffer)var4).address() + (long)var5;
                }

                WindowsAsynchronousSocketChannelImpl.unsafe.putAddress(var1 + (long)WindowsAsynchronousSocketChannelImpl.OFFSETOF_BUF, var8);
                WindowsAsynchronousSocketChannelImpl.unsafe.putInt(var1 + 0L, var7);
                var1 += (long)WindowsAsynchronousSocketChannelImpl.SIZEOF_WSABUF;
            }

        }

        void updateBuffers(int var1) {
            for(int var2 = 0; var2 < this.numBufs; ++var2) {
                ByteBuffer var3 = this.bufs[var2];
                int var4 = var3.position();
                int var5 = var3.limit();
                int var6 = var4 <= var5 ? var5 - var4 : var5;
                int var7;
                if (var1 < var6) {
                    if (var1 > 0) {
                        assert (long)(var4 + var1) < 2147483647L;

                        var7 = var4 + var1;

                        try {
                            var3.position(var7);
                        } catch (IllegalArgumentException var9) {
                            ;
                        }
                    }
                    break;
                }

                var1 -= var6;
                var7 = var4 + var6;

                try {
                    var3.position(var7);
                } catch (IllegalArgumentException var10) {
                    ;
                }
            }

        }

        void releaseBuffers() {
            for(int var1 = 0; var1 < this.numBufs; ++var1) {
                if (!(this.bufs[var1] instanceof DirectBuffer)) {
                    Util.releaseTemporaryDirectBuffer(this.shadow[var1]);
                }
            }

        }

        public void run() {
            long var1 = 0L;
            boolean var3 = false;
            boolean var4 = false;
            boolean var5 = false;

            try {
                Object var6;
                try {
                    WindowsAsynchronousSocketChannelImpl.this.begin();
                    this.prepareBuffers();
                    var3 = true;
                    var1 = WindowsAsynchronousSocketChannelImpl.this.ioCache.add(this.result);
                    int var12 = WindowsAsynchronousSocketChannelImpl.write0(WindowsAsynchronousSocketChannelImpl.this.handle, this.numBufs, WindowsAsynchronousSocketChannelImpl.this.writeBufferArray, var1);
                    if (var12 != -2) {
                        if (var12 == -1) {
                            var5 = true;
                            throw new ClosedChannelException();
                        }

                        throw new InternalError("Write completed immediately");
                    }

                    var4 = true;
                    return;
                } catch (Throwable var10) {
                    var6 = var10;
                    WindowsAsynchronousSocketChannelImpl.this.enableWriting();
                    if (!var5 && var10 instanceof ClosedChannelException) {
                        var6 = new AsynchronousCloseException();
                    }
                }

                if (!(var6 instanceof IOException)) {
                    var6 = new IOException((Throwable)var6);
                }

                this.result.setFailure((Throwable)var6);
            } finally {
                if (!var4) {
                    if (var1 != 0L) {
                        WindowsAsynchronousSocketChannelImpl.this.ioCache.remove(var1);
                    }

                    if (var3) {
                        this.releaseBuffers();
                    }
                }

                WindowsAsynchronousSocketChannelImpl.this.end();
            }

            Invoker.invoke(this.result);
        }

        public void completed(int var1, boolean var2) {
            this.updateBuffers(var1);
            this.releaseBuffers();
            PendingFuture var3 = this.result;
            synchronized(this.result) {
                if (this.result.isDone()) {
                    return;
                }

                WindowsAsynchronousSocketChannelImpl.this.enableWriting();
                if (this.gatheringWrite) {
                    this.result.setResult((long)var1);
                } else {
                    this.result.setResult(var1);
                }
            }

            if (var2) {
                Invoker.invokeUnchecked(this.result);
            } else {
                Invoker.invoke(this.result);
            }

        }

        public void failed(int var1, IOException var2) {
            this.releaseBuffers();
            if (!WindowsAsynchronousSocketChannelImpl.this.isOpen()) {
                var2 = new AsynchronousCloseException();
            }

            PendingFuture var3 = this.result;
            synchronized(this.result) {
                if (this.result.isDone()) {
                    return;
                }

                WindowsAsynchronousSocketChannelImpl.this.enableWriting();
                this.result.setFailure((Throwable)var2);
            }

            Invoker.invoke(this.result);
        }

        void timeout() {
            PendingFuture var1 = this.result;
            synchronized(this.result) {
                if (this.result.isDone()) {
                    return;
                }

                WindowsAsynchronousSocketChannelImpl.this.enableWriting(true);
                this.result.setFailure(new InterruptedByTimeoutException());
            }

            Invoker.invoke(this.result);
        }
    }
}

客户端实现

import cn.mldn.info.HostInfo;
import cn.mldn.util.InputUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

class ClientReadHandler implements CompletionHandler<Integer,ByteBuffer> {
    private AsynchronousSocketChannel clientChannel ;
    private CountDownLatch latch ;
    public ClientReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
        this.clientChannel = clientChannel ;
        this.latch = latch ;
    }
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        buffer.flip() ;
        String readMessage = new String(buffer.array(),0,buffer.remaining()) ;
        System.out.println(readMessage); // 输出读取内容
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.latch.countDown();
    }
}

class ClientWriteHandler implements CompletionHandler<Integer,ByteBuffer> {
    private AsynchronousSocketChannel clientChannel ;
    private CountDownLatch latch ;
    public ClientWriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
        this.clientChannel = clientChannel ;
        this.latch = latch ;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if(buffer.hasRemaining()) {
            this.clientChannel.write(buffer,buffer,this);
        } else {
            ByteBuffer readBuffer = ByteBuffer.allocate(100) ; // 读取服务端回应
            this.clientChannel.read(readBuffer,readBuffer,new ClientReadHandler(this.clientChannel,this.latch)) ;
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.latch.countDown();
    }
}

class AIOClientThread implements Runnable {
    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;

    public AIOClientThread() {
        try {
            this.clientChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.clientChannel.connect(new InetSocketAddress(HostInfo.HOST_NAME, HostInfo.PORT));
        this.latch = new CountDownLatch(1);
    }

    @Override
    public void run() {
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public boolean sendMessage(String msg) {
        ByteBuffer buffer = ByteBuffer.allocate(100) ;
        buffer.put(msg.getBytes()) ;
        buffer.flip() ;
        this.clientChannel.write(buffer,buffer,new ClientWriteHandler(this.clientChannel,this.latch));
        if("byebye".equalsIgnoreCase(msg)) {
            return false ;
        }
        return true ;
    }
}

public class AIOEchoClient {
    public static void main(String[] args) {
        AIOClientThread client = new AIOClientThread() ;
        new Thread(client).start();
        while(client.sendMessage(InputUtil.getString("请输入要发送的内容:"))) {
            ;
        }
    }
}

上一篇 网络编程(三)NIO
下一篇 网络编程(五)netty