本文共 20285 字,大约阅读时间需要 67 分钟。
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | public class EchoServer { private final ServerSocket mServerSocket; public EchoServer( int port) throws IOException { // 1. 创建一个 ServerSocket 并监听端口 port mServerSocket = new ServerSocket(port); } public void run() throws IOException { // 2. 开始接受客户连接 Socket client = mServerSocket.accept(); handleClient(client); } private void handleClient(Socket socket) { // 3. 使用 socket 进行通信 ... } public static void main(String[] argv) { try { EchoServer server = new EchoServer( 9877 ); server.run(); } catch (IOException e) { e.printStackTrace(); } } } |
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public class EchoClient { private final Socket mSocket; public EchoClient(String host, int port) throws IOException { // 创建 socket 并连接服务器 mSocket = new Socket(host, port); } public void run() { // 和服务端进行通信 } public static void main(String[] argv) { try { // 由于服务端运行在同一主机,这里我们使用 localhost EchoClient client = new EchoClient( "localhost" , 9877 ); client.run(); } catch (IOException e) { e.printStackTrace(); } } } |
01 02 03 04 05 06 07 08 09 10 11 12 13 | public class EchoServer { // ... private void handleClient(Socket socket) throws IOException { InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); byte [] buffer = new byte [ 1024 ]; int n; while ((n = in.read(buffer)) > 0 ) { out.write(buffer, 0 , n); } } } |
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public class EchoClient { // ... public void run() throws IOException { Thread readerThread = new Thread( this ::readResponse); readerThread.start(); OutputStream out = mSocket.getOutputStream(); byte [] buffer = new byte [ 1024 ]; int n; while ((n = System.in.read(buffer)) > 0 ) { out.write(buffer, 0 , n); } } private void readResponse() { try { InputStream in = mSocket.getInputStream(); byte [] buffer = new byte [ 1024 ]; int n; while ((n = in.read(buffer)) > 0 ) { System.out.write(buffer, 0 , n); } } catch (IOException e) { e.printStackTrace(); } } } |
1 2 3 4 5 6 | Thread readerThread = new Thread( new Runnable() { @Override public void run() { readResponse(); } }); |
1 2 3 4 5 6 7 8 9 | $ javac EchoServer.java $ java EchoServer $ javac EchoClient.java $ java EchoClient hello Server hello Server foo foo |
在 Java 的 SDK 中,socket 的共有两个接口:用于监听客户连接的 ServerSocket 和用于通信的 Socket。
注意:我只说 ServerSocket 是用于监听客户连接,而没有说它也可以用来通信。下面我们来详细了解一下他们的区别。注:以下描述使用的是 UNIX/Linux 系统的 API。首先,我们创建 ServerSocket 后,内核会创建一个 socket。这个 socket 既可以拿来监听客户连接,也可以连接远端的服务。由于 ServerSocket 是用来监听客户连接的,紧接着它就会对内核创建的这个 socket 调用 listen 函数。这样一来,这个 socket 就成了所谓的 listening socket,它开始监听客户的连接。接下来,我们的客户端创建一个 Socket,同样的,内核也创建一个 socket 实例。内核创建的这个 socket 跟 ServerSocket 一开始创建的那个没有什么区别。不同的是,接下来 Socket 会对它执行 connect,发起对服务端的连接。前面我们说过,socket API 其实是 TCP 层的封装,所以 connect 后,内核会发送一个 SYN 给服务端。现在,我们切换角色到服务端。服务端的主机在收到这个 SYN 后,会创建一个新的 socket,这个新创建的 socket 跟客户端继续执行三次握手过程。三次握手完成后,我们执行的 serverSocket.accept() 会返回一个 Socket 实例,这个 socket 就是上一步内核自动帮我们创建的。所以说:在一个客户端连接的情况下,其实有 3 个 socket。关于内核自动创建的这个 socket,还有一个很有意思的地方。它的端口号跟 ServerSocket 是一毛一样的。咦!!不是说,一个端口只能绑定一个 socket 吗?其实这个说法并不够准确。前面我说的TCP 通过端口号来区分数据属于哪个进程的说法,在 socket 的实现里需要改一改。Socket 并不仅仅使用端口号来区别不同的 socket 实例,而是使用 <peer addr:peer port, local addr:local port> 这个四元组。在上面的例子中,我们的 ServerSocket 长这样:<*:*, *:9877>。意思是,可以接受任何的客户端,和本地任何 IP。accept 返回的 Socket 则是这样:<127.0.0.1:xxxx, 127.0.0.1:9877>。其中,xxxx 是客户端的端口号。如果数据是发送给一个已连接的 socket,内核会找到一个完全匹配的实例,所以数据准确发送给了对端。如果是客户端要发起连接,这时候只有 <*:*, *:9877> 会匹配成功,所以 SYN 也准确发送给了监听套接字。Socket/ServerSocket 的区别我们就讲到这里。如果读者觉得不过瘾,可以参考《TCP/IP 详解》、卷2。
1 | socket.setKeepAlive( true ); |
1 2 3 4 5 | { "type" : 0, // 0 表示心跳 // ... } |
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | public final class LongLiveSocket { /** * 错误回调 */ public interface ErrorCallback { /** * 如果需要重连,返回 true */ boolean onError(); } /** * 读数据回调 */ public interface DataCallback { void onData( byte [] data, int offset, int len); } /** * 写数据回调 */ public interface WritingCallback { void onSuccess(); void onFail( byte [] data, int offset, int len); } public LongLiveSocket(String host, int port, DataCallback dataCallback, ErrorCallback errorCallback) { } public void write( byte [] data, WritingCallback callback) { } public void write( byte [] data, int offset, int len, WritingCallback callback) { } public void close() { } } |
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 | public final class LongLiveSocket { private static final String TAG = "LongLiveSocket" ; private static final long RETRY_INTERVAL_MILLIS = 3 * 1000 ; private static final long HEART_BEAT_INTERVAL_MILLIS = 5 * 1000 ; private static final long HEART_BEAT_TIMEOUT_MILLIS = 2 * 1000 ; /** * 错误回调 */ public interface ErrorCallback { /** * 如果需要重连,返回 true */ boolean onError(); } /** * 读数据回调 */ public interface DataCallback { void onData( byte [] data, int offset, int len); } /** * 写数据回调 */ public interface WritingCallback { void onSuccess(); void onFail( byte [] data, int offset, int len); } private final String mHost; private final int mPort; private final DataCallback mDataCallback; private final ErrorCallback mErrorCallback; private final HandlerThread mWriterThread; private final Handler mWriterHandler; private final Handler mUIHandler = new Handler(Looper.getMainLooper()); private final Object mLock = new Object(); private Socket mSocket; // guarded by mLock private boolean mClosed; // guarded by mLock private final Runnable mHeartBeatTask = new Runnable() { private byte [] mHeartBeat = new byte [ 0 ]; @Override public void run() { // 我们使用长度为 0 的数据作为 heart beat write(mHeartBeat, new WritingCallback() { @Override public void onSuccess() { // 每隔 HEART_BEAT_INTERVAL_MILLIS 发送一次 mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS); mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS); } @Override public void onFail( byte [] data, int offset, int len) { // nop // write() 方法会处理失败 } }); } }; private final Runnable mHeartBeatTimeoutTask = () -> { Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout" ); closeSocket(); }; public LongLiveSocket(String host, int port, DataCallback dataCallback, ErrorCallback errorCallback) { mHost = host; mPort = port; mDataCallback = dataCallback; mErrorCallback = errorCallback; mWriterThread = new HandlerThread( "socket-writer" ); mWriterThread.start(); mWriterHandler = new Handler(mWriterThread.getLooper()); mWriterHandler.post( this ::initSocket); } private void initSocket() { while ( true ) { if (closed()) return ; try { Socket socket = new Socket(mHost, mPort); synchronized (mLock) { // 在我们创建 socket 的时候,客户可能就调用了 close() if (mClosed) { silentlyClose(socket); return ; } mSocket = socket; // 每次创建新的 socket,会开一个线程来读数据 Thread reader = new Thread( new ReaderTask(socket), "socket-reader" ); reader.start(); mWriterHandler.post(mHeartBeatTask); } break ; } catch (IOException e) { Log.e(TAG, "initSocket: " , e); if (closed() || !mErrorCallback.onError()) { break ; } try { TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS); } catch (InterruptedException e1) { // interrupt writer-thread to quit break ; } } } } public void write( byte [] data, WritingCallback callback) { write(data, 0 , data.length, callback); } public void write( byte [] data, int offset, int len, WritingCallback callback) { mWriterHandler.post(() -> { Socket socket = getSocket(); if (socket == null ) { // initSocket 失败而客户说不需要重连,但客户又叫我们给他发送数据 throw new IllegalStateException( "Socket not initialized" ); } try { OutputStream outputStream = socket.getOutputStream(); DataOutputStream out = new DataOutputStream(outputStream); out.writeInt(len); out.write(data, offset, len); callback.onSuccess(); } catch (IOException e) { Log.e(TAG, "write: " , e); closeSocket(); callback.onFail(data, offset, len); if (!closed() && mErrorCallback.onError()) { initSocket(); } } }); } private boolean closed() { synchronized (mLock) { return mClosed; } } private Socket getSocket() { synchronized (mLock) { return mSocket; } } private void closeSocket() { synchronized (mLock) { closeSocketLocked(); } } private void closeSocketLocked() { if (mSocket == null ) return ; silentlyClose(mSocket); mSocket = null ; mWriterHandler.removeCallbacks(mHeartBeatTask); } public void close() { if (Looper.getMainLooper() == Looper.myLooper()) { new Thread() { @Override public void run() { doClose(); } }.start(); } else { doClose(); } } private void doClose() { synchronized (mLock) { mClosed = true ; // 关闭 socket,从而使得阻塞在 socket 上的线程返回 closeSocketLocked(); } mWriterThread.quit(); // 在重连的时候,有个 sleep mWriterThread.interrupt(); } private static void silentlyClose(Closeable closeable) { if (closeable != null ) { try { closeable.close(); } catch (IOException e) { Log.e(TAG, "silentlyClose: " , e); // error ignored } } } private class ReaderTask implements Runnable { private final Socket mSocket; public ReaderTask(Socket socket) { mSocket = socket; } @Override public void run() { try { readResponse(); } catch (IOException e) { Log.e(TAG, "ReaderTask#run: " , e); } } private void readResponse() throws IOException { // For simplicity, assume that a msg will not exceed 1024-byte byte [] buffer = new byte [ 1024 ]; InputStream inputStream = mSocket.getInputStream(); DataInputStream in = new DataInputStream(inputStream); while ( true ) { int nbyte = in.readInt(); if (nbyte == 0 ) { Log.i(TAG, "readResponse: heart beat received" ); mUIHandler.removeCallbacks(mHeartBeatTimeoutTask); continue ; } if (nbyte > buffer.length) { throw new IllegalStateException( "Receive message with len " + nbyte + " which exceeds limit " + buffer.length); } if (readn(in, buffer, nbyte) != 0 ) { // Socket might be closed twice but it does no harm silentlyClose(mSocket); // Socket will be re-connected by writer-thread if you want break ; } mDataCallback.onData(buffer, 0 , nbyte); } } private int readn(InputStream in, byte [] buffer, int n) throws IOException { int offset = 0 ; while (n > 0 ) { int readBytes = in.read(buffer, offset, n); if (readBytes < 0 ) { // EoF break ; } n -= readBytes; offset += readBytes; } return n; } } } |
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | public class EchoClient { private static final String TAG = "EchoClient" ; private final LongLiveSocket mLongLiveSocket; public EchoClient(String host, int port) { mLongLiveSocket = new LongLiveSocket( host, port, (data, offset, len) -> Log.i(TAG, "EchoClient: received: " + new String(data, offset, len)), // 返回 true,所以只要出错,就会一直重连 () -> true ); } public void send(String msg) { mLongLiveSocket.write(msg.getBytes(), new LongLiveSocket.WritingCallback() { @Override public void onSuccess() { Log.d(TAG, "onSuccess: " ); } @Override public void onFail( byte [] data, int offset, int len) { Log.w(TAG, "onFail: fail to write: " + new String(data, offset, len)); // 连接成功后,还会发送这个消息 mLongLiveSocket.write(data, offset, len, this ); } }); } } |
01 02 03 04 05 06 07 08 09 10 11 | 03:54:55.583 12691-12713 /com .example. echo I /LongLiveSocket : readResponse: heart beat received 03:55:00.588 12691-12713 /com .example. echo I /LongLiveSocket : readResponse: heart beat received 03:55:05.594 12691-12713 /com .example. echo I /LongLiveSocket : readResponse: heart beat received 03:55:09.638 12691-12710 /com .example. echo D /EchoClient : onSuccess: 03:55:09.639 12691-12713 /com .example. echo I /EchoClient : EchoClient: received: hello 03:55:10.595 12691-12713 /com .example. echo I /LongLiveSocket : readResponse: heart beat received 03:55:14.652 12691-12710 /com .example. echo D /EchoClient : onSuccess: 03:55:14.654 12691-12713 /com .example. echo I /EchoClient : EchoClient: received: echo 03:55:15.596 12691-12713 /com .example. echo I /LongLiveSocket : readResponse: heart beat received 03:55:20.597 12691-12713 /com .example. echo I /LongLiveSocket : readResponse: heart beat received 03:55:25.602 12691-12713 /com .example. echo I /LongLiveSocket : readResponse: heart beat received |
如果数据比较多,可以使用一个最小堆。
转载地址:http://oyemi.baihongyu.com/