多线程收发并行
TCP多线程收发协作
TCP 服务端收发并行重构
原有的main逻辑如下:
重构后如下:
public class Server {
public static void main(String[] args) throws IOException {
TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);
boolean isSucceed = tcpServer.start();
if(!isSucceed){
System.out.println("Start TCP server failed.");
}
UDPProvider.start(TCPConstants.PORT_SERVER);
// 键盘输入:
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
String str;
do {
str = bufferedReader.readLine();
tcpServer.broadcast(str);
} while (!"00bye00".equalsIgnoreCase(str));
UDPProvider.stop();
tcpServer.stop();
}
}
重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。
创建 ClientHandler.java 重构收发消息操作:
public class ClientHandler {
private final Socket socket;
private final ClientReadHandler readHandler;
private final ClientWriteHandler writeHandler;
private final CloseNotiry closeNotiry;
public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {
this.socket = socket;
this.readHandler = new ClientReadHandler(socket.getInputStream());
this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
this.closeNotiry = closeNotiry;
System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort());
}
}
/**
* 接收数据
*/
class ClientReadHandler extends Thread {
private boolean done = false;
private final InputStream inputStream;
ClientReadHandler(InputStream inputStream){
this.inputStream = inputStream;
}
@Override
public void run(){
super.run();
try {
// 得到输入流,用于接收数据
BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
do {
// 客户端拿到一条数据
String str = socketInput.readLine();
if(str == null){
System.out.println("客户端已无法读取数据!");
// 退出当前客户端
ClientHandler.this.exitBySelf();
break;
}
// 打印到屏幕
System.out.println(str);
}while (!done);
socketInput.close();
}catch (IOException e){
if(!done){
System.out.println("连接异常断开");
ClientHandler.this.exitBySelf();
}
}finally {
// 连接关闭
CloseUtils.close(inputStream);
}
}
void exit(){
done = true;
CloseUtils.close(inputStream);
}
}
创建一个单独的线程进行接收消息,该线程不需要关闭。
/**
* 发送数据
*/
class ClientWriteHandler {
private boolean done = false;
private final PrintStream printStream;
private final ExecutorService executorService;
ClientWriteHandler(OutputStream outputStream) {
this.printStream = new PrintStream(outputStream);
// 发送消息使用线程池来实现
this.executorService = Executors.newSingleThreadExecutor();
}
void exit(){
done = true;
CloseUtils.close(printStream);
executorService.shutdown();
}
void send(String str) {
executorService.execute(new WriteRunnable(str));
}
class WriteRunnable implements Runnable{
private final String msg;
WriteRunnable(String msg){
this.msg = msg;
}
@Override
public void run(){
if(ClientWriteHandler.this.done){
return;
}
try {
ClientWriteHandler.this.printStream.println(msg);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
public void broadcast(String str) {
for (ClientHandler client : clientHandlerList){
// 发送消息
client.send(str);
}
}
private List<ClientHandler> clientHandlerList = new ArrayList<>();
/**
* 监听客户端链接
*/
private class ClientListener extends Thread {
private ServerSocket server;
private boolean done = false;
private ClientListener(int port) throws IOException {
server = new ServerSocket(port);
System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());
}
@Override
public void run(){
super.run();
System.out.println("服务器准备就绪~");
// 等待客户端连接
do{
// 得到客户端
Socket client;
try {
client = server.accept();
}catch (Exception e){
continue;
}
try {
// 客户端构建异步线程
ClientHandler clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));
// 启动线程
clientHandler.readToPrint();
clientHandlerList.add(clientHandler);
} catch (IOException e) {
e.printStackTrace();
System.out.println("客户端连接异常: " + e.getMessage());
}
}while (!done);
System.out.println("服务器已关闭!");
}
void exit(){
done = true;
try {
server.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
clientHandlerList作为已经建立了连接的客户端的集合,用于管理当前用户的信息。接收与发送都使用该集合。
/**
* 退出、关闭流
*/
public void exit(){
readHandler.exit();
writeHandler.exit();
CloseUtils.close(socket);
System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());
}
/**
* 发送消息
* @param str
*/
public void send(String str){
writeHandler.send(str);
}
/**
* 接收消息
*/
public void readToPrint() {
readHandler.exit();
}
/**
* 接收、发送消息异常,自动关闭
*/
private void exitBySelf() {
exit();
closeNotiry.onSelfClosed(this);
}
/**
* 关闭流
*/
public interface CloseNotiry{
void onSelfClosed(ClientHandler handler);
}
public static void main(String[] args) {
// 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机
ServerInfo info = UDPSearcher.searchServer(10000);
System.out.println("Server:" + info);
if( info != null){
try {
TCPClient.linkWith(info);
}catch (IOException e){
e.printStackTrace();
}
}
}
static class ReadHandler extends Thread{
private boolean done = false;
private final InputStream inputStream;
ReadHandler(InputStream inputStream){
this.inputStream = inputStream;
}
@Override
public void run(){
try {
// 得到输入流,用于接收数据
BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
do {
// 客户端拿到一条数据
String str = null;
try {
str = socketInput.readLine();
}catch (SocketTimeoutException e){
}
if(str == null){
System.out.println("连接已关闭,无法读取数据!");
break;
}
// 打印到屏幕
System.out.println(str);
}while (!done);
socketInput.close();
}catch (IOException e){
if(!done){
System.out.println("连接异常断开:" + e.getMessage());
}
}finally {
// 连接关闭
CloseUtils.close(inputStream);
}
}
void exit(){
done = true;
CloseUtils.close(inputStream);
}
}
创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。
private static void write(Socket client) throws IOException {
// 构建键盘输入流
InputStream in = System.in;
BufferedReader input = new BufferedReader(new InputStreamReader(in));
// 得到Socket输出流,并转换为打印流
OutputStream outputStream = client.getOutputStream();
PrintStream socketPrintStream = new PrintStream(outputStream);
boolean flag = true;
do {
// 键盘读取一行
String str = input.readLine();
// 发送到服务器
socketPrintStream.println(str);
// 从服务器读取一行
if("00bye00".equalsIgnoreCase(str)){
break;
}
}while(flag);
// 资源释放
socketPrintStream.close();
}
在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。
public static void linkWith(ServerInfo info) throws IOException {
Socket socket = new Socket();
// 超时时间
socket.setSoTimeout(3000);
// 端口2000;超时时间300ms
socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//
System.out.println("已发起服务器连接,并进入后续流程~");
System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());
System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());
try {
ReadHandler readHandler = new ReadHandler(socket.getInputStream());
readHandler.start();
// 发送接收数据
write(socket);
}catch (Exception e){
System.out.println("异常关闭");
}
// 释放资源
socket.close();
System.out.println("客户端已退出~");
}
原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。