socket 相关内容前面现已讲解了,还写到了一些 Service 内容,上一篇博客还将央求类和回复类规划好了,接下来我们需求规划的就是通讯模块的 socket 收发了,这儿写在 Service 里面。之所以写在 Service 里面,是因为我这和设备的联接是一个长期的使命,接收和发送线程存在时刻特别久,应该写在 Service 里面。

待处理问题

前面现已将 Service 内联接 socket 和接收数据的部分大致写了,剩下的问题如下:

  • Service 的联接

  • 参数的设置

  • 央求类的发送

  • 回复类、失常的传递

下面我们一个一个的处理上面的问题。

Service 的联接

Service 的联接很简单,四大组件嘛,关键是我们 Binder 里面该规划哪些函数,代码如下:

private ConnectBinder mBinder = new ConnectBinder();
class ConnectBinder extends Binder {
    //设置参数
    void setUpSocketConfigure(Bundle data) {
		...
    }
    //发送央求
    void sendMessage(BaseRequest request) {
		...
    }
    //回来 Service 方针,用于双向交互
    ConnectService getService() {
        return ConnectService.this;
    }
}
@Override
public IBinder onBind(Intent intent) {
    return mBinder;
}

设置参数和发送央求是我们必须的操作,很好了解,而回来 Service 方针是用于主线程和 Service 的双向通讯,需求拿到 Service 设置我们的回调接口。(ps. 原本我用的广播将数据传递出去,但是觉得不可典雅,运用广播的话可以不用获取 Service 方针)

参数的设置

参数的设置也简单,上面 setUpSocketConfigure 通过 Bundle 现已把数据传递进来了,我们设置到全局变量就行了:

private void getConfigure(Bundle data) {
    ip      = data.getString("IP", "192.168.1.110");
    port    = data.getInt("PORT", 2050);
    wait    = data.getInt("WAIT", 5000);
}

央求类的发送

接下来是央求类的发送,也就是我们这个 socket 通讯模块的发送部分,这儿需求注意下我们的发送央求要在异步线程处理,而且央求需求串行发送,socket 只有一个嘛。

    private Socket mSocket;
	//接收线程
    private Thread rxThread;
    //单线程线程池
    private ExecutorService mExecutor = Executors.newSingleThreadExecutor();
    class ConnectBinder extends Binder {
		...
        void sendMessage(BaseRequest request) {
			//将央求封装到 Runnable 中
            Runnable runnable = wrapperRunnable(request);
            //运用单线程线程池发送
            mExecutor.execute(runnable);
        }
        ...
    }
	//封装到 Runnable 中
    private Runnable wrapperRunnable(final BaseRequest request) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    doSent(request.data);
                } catch (IOException e) {
                    e.printStackTrace();
                    //将 Error 传递出去
                    onPostMessageListener.onPostError(request.requestMsgType);
                }
            }
        };
    }
    //发送信息
    public void doSent(byte[] data) throws IOException {
        if (mSocket == null) {
            setUpConnection();
        }
        //写入内容
        writeDataToStream(data);
    }
	//写入内容
    private void writeDataToStream(byte[] data) throws IOException {
        try {
            OutputStream os = mSocket.getOutputStream();
            os.write(data);
            os.flush();
        } catch(IOException e) {
            setUpConnection();
            //再发一次
            reSendData(data);
        }
    }
	//再发一次
    private void reSendData(byte[] data) throws IOException {
        if (mSocket != null) {
            try {
                OutputStream os = mSocket.getOutputStream();
                os.write(data);
                os.flush();
            } catch(IOException e) {
                e.printStackTrace();
                throw e;
            }
        }
    }

这儿代码多了点,但是不难,大致就是把央求封装成 Runnable,通过单线程线程池发送出去,保证了串行发送,而且发送部分加了许多查验代码,终究发送失败还会再发一次。关于发送失败传递问题,下面讲解。

回复类、失常的传递

因为我们的代码写在 Service 里面,如何将消息(回复及失常)传递出去是一个问题,下面介绍几种办法。

本地广播

广播是安卓供应的一种通讯办法,而本地广播可以保证广播的消息在本应用中传递,传递的过程中运用 Bundle 携带数据,通过 Reciever 在需求运用的当地,比对消息类型,将数据拿出来便可以,而且广播现已完成了从异步线程到 UI 线程的转化,用起来仍是挺便利的。

我原本就是运用本地广播传递数据的,后边不用了。一个问题是广播功用并不是很好,另一个问题是拿数据不是很便利,我希望的是通过回调去拿到数据,下面看我的改进办法。

回调传递消息

先看代码,这儿触及了一个回调接口

private OnPostMessageListener onPostMessageListener;
public void setOnPostMessageListener(OnPostMessageListener onPostMessageListener) {
    this.onPostMessageListener = onPostMessageListener;
}
public interface OnPostMessageListener {
    void onPostError(int typeId);
    void onPostResponse(BaseResponse response);
}

下面是设置回调接口,需求从 Service 外部设置

    class ConnectBinder extends Binder {
		...
        ConnectService getService() {
            return ConnectService.this;
        }
    }
	//在外部设置
	private ServiceConnection connection = new ServiceConnection() {
        @Override
        public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
            mBinder = (ConnectService.ConnectBinder) iBinder;
            mBinder.getService().setOnPostMessageListener(new ConnectService.OnPostMessageListener() {
                @Override
                public void onPostError(int typeId) {
                    ...
                }
                @Override
                public void onPostResponse(BaseResponse response) {
                    ...
                }
            });
        }
        @Override
        public void onServiceDisconnected(ComponentName componentName) {
        }
    };

失常的传递

private Runnable wrapperRunnable(final BaseRequest request) {
    return new Runnable() {
        @Override
        public void run() {
            try {
                doSent(request.data);
            } catch (IOException e) {
                e.printStackTrace();
                onPostMessageListener.onPostError(request.requestMsgType);
            }
        }
    };
}

回复的传递

//接受线程
private Thread initReceiveThread() {
    return new Thread(new Runnable() {
        @SuppressWarnings("ResultOfMethodCallIgnored")
        @Override
        public void run() {
            try {
                InputStream is;
                while (mSocket != null) {
                    is = mSocket.getInputStream();
                    byte[] ackbuf    = new byte[4];  //消息号
                    byte[] serialbuf = new byte[4];  //流水号
                    byte[] lenbuf    = new byte[4];  //长度
                    byte[] datasbuf;                 //数据
                    int ackrs       = is.read(ackbuf, 0, 4);
                    if (ackrs < 0) {
                        Thread.sleep(1000);
                        continue;
                    }
                    is.read(serialbuf, 0, 4);
                    is.read(lenbuf, 0, 4);
                    int len = ByteBuffer.wrap(lenbuf, 0, 4).getInt();        //长度
                    datasbuf = new byte[len];
                    is.read(datasbuf, 0, len);
                    BaseResponse response = new BaseResponse();
                    response.responseMsgType = ByteBuffer.wrap(ackbuf, 0, 4).getInt();
                    response.serialNumber = ByteBuffer.wrap(serialbuf, 0, 4).getInt();
                    response.data = datasbuf;
                    onPostMessageListener.onPostResponse(response);
                }
            }catch(IOException e) {
                e.printStackTrace();
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    });
}

这儿的回复类消息处理时按字节读取的,和业务有关,读者可以按需求批改。

掉线重连问题

这儿还有一个掉线重连问题,就是 socket 断了,但是我发送消息的时分,希望测验从头联接,联接上了再见消息发送出去。

实际上,我们上面的代码在发送消息之前现已检查 socket 了,掉线了会从头联接。但是有的设备联接需求我们发送登录央求,这个就有点耦合性滋味了,

思来想去,联接央求不就是一串字节数据嘛,在设置参数里面传进来,后边直接用不就可以嘛,具体操作看下面代码:

	private byte[]  connect = new byte[0];
    private void getConfigure(Bundle data) {
        ip      = data.getString("IP", "192.168.1.110");
        port    = data.getInt("PORT", 2050);
        wait    = data.getInt("WAIT", 5000);
        connect = data.getByteArray("ConnectData");
    }
    //发送信息
    public void doSent(byte[] data) throws IOException {
        if (mSocket == null) {
            setUpConnection();
            writeDataToStream(connect);
        }
        writeDataToStream(data);
    }
    private void writeDataToStream(byte[] data) throws IOException {
        try {
            OutputStream os = mSocket.getOutputStream();
            os.write(data);
            os.flush();
        } catch(IOException e) {
            setUpConnection();
            writeDataToStream(connect);
            reSendData(data);
        }
    }

上面代码就是将登录消息以字节数组的方式传递进来,在发送和重发消息的初始化 socket 之后发送一次。

这儿虽然还有一点耦合性滋味,但是假如你没有登录央求的需求,字节数组长度为零,底子不会对整个流程形成什么影响。

完好代码

public class ConnectService extends Service {
    /** Socket相关,IP及端口号 **/
    private String  ip      = "192.168.1.110";
    private int     port    = 2050;
    private int     wait    = 5000;
    private byte[]  connect = new byte[0];
    private Socket mSocket;
    private Thread rxThread;
    private ExecutorService mExecutor = Executors.newSingleThreadExecutor();
    private ConnectBinder mBinder = new ConnectBinder();
    class ConnectBinder extends Binder {
        void setUpSocketConfigure(Bundle data) {
            getConfigure(data);
        }
        void sendMessage(BaseRequest request) {
            Runnable runnable = wrapperRunnable(request);
            mExecutor.execute(runnable);
        }
        ConnectService getService() {
            return ConnectService.this;
        }
    }
    @Override
    public IBinder onBind(Intent intent) {
        return mBinder;
    }
    private void getConfigure(Bundle data) {
        ip      = data.getString("IP", "192.168.1.110");
        port    = data.getInt("PORT", 2050);
        wait    = data.getInt("WAIT", 5000);
        connect = data.getByteArray("ConnectData");
    }
    public void setUpConnection() throws IOException {
        stopExistSocket();
        createSocket();
        startRxThread();
    }
    private void stopExistSocket() throws IOException {
        if (mSocket != null) {
            try {
                mSocket.close();
                mSocket = null;
            } catch (IOException e) {
                e.printStackTrace();
                throw e;
            }
        }
    }
    private void createSocket() throws IOException {
        SocketAddress sa = new InetSocketAddress(ip, port);
        mSocket = new Socket();
        try{
            //设置信息及超时时刻
            mSocket.connect(sa, wait);
            boolean isConnect = mSocket.isConnected();
            mSocket.setKeepAlive(isConnect);
        } catch(IOException e) {
            mSocket = null;
            throw e;
        }
    }
    private void startRxThread() {
        if (rxThread == null || !rxThread.isAlive()) {
            rxThread = initReceiveThread();
            rxThread.start();
        }
    }
    //发送信息
    public void doSent(byte[] data) throws IOException {
        if (mSocket == null) {
            setUpConnection();
            writeDataToStream(connect);
        }
        writeDataToStream(data);
    }
    private void writeDataToStream(byte[] data) throws IOException {
        try {
            OutputStream os = mSocket.getOutputStream();
            os.write(data);
            os.flush();
        } catch(IOException e) {
            setUpConnection();
            writeDataToStream(connect);
            reSendData(data);
        }
    }
    private void reSendData(byte[] data) throws IOException {
        if (mSocket != null) {
            try {
                OutputStream os = mSocket.getOutputStream();
                os.write(data);
                os.flush();
            } catch(IOException e) {
                e.printStackTrace();
                throw e;
            }
        }
    }
    private Runnable wrapperRunnable(final BaseRequest request) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    doSent(request.data);
                } catch (IOException e) {
                    e.printStackTrace();
                    onPostMessageListener.onPostError(request.requestMsgType);
                }
            }
        };
    }
    //接受线程
    private Thread initReceiveThread() {
        return new Thread(new Runnable() {
            @SuppressWarnings("ResultOfMethodCallIgnored")
            @Override
            public void run() {
                try {
                    InputStream is;
                    while (mSocket != null) {
                        is = mSocket.getInputStream();
                        byte[] ackbuf    = new byte[4];  //消息号
                        byte[] serialbuf = new byte[4];  //流水号
                        byte[] lenbuf    = new byte[4];  //长度
                        byte[] datasbuf;                 //数据
                        int ackrs       = is.read(ackbuf, 0, 4);
                        if (ackrs < 0) {
                            Thread.sleep(1000);
                            continue;
                        }
                        is.read(serialbuf, 0, 4);
                        is.read(lenbuf, 0, 4);
                        int len = ByteBuffer.wrap(lenbuf, 0, 4).getInt();        //长度
                        datasbuf = new byte[len];
                        is.read(datasbuf, 0, len);
                        BaseResponse response = new BaseResponse();
                        response.responseMsgType = ByteBuffer.wrap(ackbuf, 0, 4).getInt();
                        response.serialNumber = ByteBuffer.wrap(serialbuf, 0, 4).getInt();
                        response.data = datasbuf;
                        onPostMessageListener.onPostResponse(response);
                    }
                }catch(IOException e) {
                    e.printStackTrace();
                } catch(Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    private OnPostMessageListener onPostMessageListener;
    public void setOnPostMessageListener(OnPostMessageListener onPostMessageListener) {
        this.onPostMessageListener = onPostMessageListener;
    }
    public interface OnPostMessageListener {
        void onPostError(int typeId);
        void onPostResponse(BaseResponse response);
    }
}

结语

好了,到这儿 socket 接收和发送的 Service 就规划完了,我觉得这篇博客写的还挺清楚的,希望对读者有帮忙。

end