Java BIO就是传统的java io 编程,其相关的类和接口在java.io
BlO(blocking I/O):同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)
服务端:
客户端:
以下同步阻塞实例说明服务端与客户端之间互相等待的机制,当客户端发送给服务端消息时,服务端就会接收,接收完毕后服务端进入等待状态(阻塞),等待客户端继续发送,直到发现客户端断开,服务端也会断开
package example.client;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.Socket;
/**
* 客户端,负责发送消息
*/
public class Client1 {
public static void main(String[] args) throws IOException {
//客户端创建Socket,对应IP地址以及端口号
Socket socket = new Socket("127.0.0.1", 9000);
//设置字节输出流,写数据
OutputStream outputStream = socket.getOutputStream();
//通过使用打印流的方式进行写数据
PrintStream printStream = new PrintStream(outputStream);
printStream.println("client is asking for connect ...");
printStream.flush();
}
}
package example.server;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 服务端,负责接受消息
*/
public class Server1 {
public static void main(String[] args) throws IOException {
//定义ServerSocket
ServerSocket serverSocket = new ServerSocket(9000);
//监听客户端Socket请求
Socket accept = serverSocket.accept();
//获取输入流
InputStream inputStream = accept.getInputStream();
//读取输入流数据,使用缓冲字符输入流,当然这里要通过InputStreamReader进行转换
//InputStreamReader可以将字节转换为字符流
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
System.out.println("=============Server read :=============");
while ((line = reader.readLine())!=null){
System.out.println(line);
}
}
}
通过程序阻塞等待用户输入发方式实现多发多收
package example.client;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
/**
* keepalive
*/
public class Client2 {
public static void main(String[] args) throws IOException {
System.out.println("client is asking for connect ...");
Socket socket = new Socket("127.0.0.1", 8999);
OutputStream outputStream = socket.getOutputStream();
PrintStream printStream = new PrintStream(outputStream);
//通过程序阻塞等待用户输入发方式实现keepalive
Scanner scanner = new Scanner(System.in);
while (true){
System.out.println("client:");
String str = scanner.nextLine();
printStream.println(str);
printStream.flush();
if ("bye".equals(str)){
System.out.println("connect end============>");
break;
}
}
}
}
package example.server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
/**
* keepalive
*/
public class Server2 {
public static void main(String[] args) {
System.out.println("=============Server read :=============");
try {
ServerSocket serverSocket = new ServerSocket(8999);
Socket accept = serverSocket.accept();
InputStream inputStream = accept.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
} catch (Exception e) {
System.out.println("connect reset!");
}
}
}
目的是让服务端接收多个客户端的消息
打开编辑配置
package example.client;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
public class Client3 {
public static void main(String[] args) throws IOException {
System.out.println("client is asking for connect ...");
Socket socket = new Socket("127.0.0.1", 8888);
OutputStream outputStream = socket.getOutputStream();
PrintStream printStream = new PrintStream(outputStream);
Scanner scanner = new Scanner(System.in);
while (true){
System.out.print("client:");
String str = scanner.nextLine();
printStream.println(str);
printStream.flush();
}
}
}
客户端每发起一个请求,服务端就创建一个新线程来处理这个客户端
package example.server;
import example.ThreadServerReader;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class Server3 {
public static void main(String[] args) {
try {
ServerSocket serverSocket = new ServerSocket(8888);
//利用循环,接收客户端的Socket链接请求
while (true){
Socket accept = serverSocket.accept();
new ThreadServerReader(accept).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
package example;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
public class ThreadServerReader extends Thread {
private Socket socket;
public ThreadServerReader(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
//获取字节输入流
try {
InputStream inputStream = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
while ((line = reader.readLine())!=null){
System.out.println(line);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
采用一个伪异步I/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的Socket封装成一个Task(该任务实现java.lang.Runnable线程任务接口)交给后端的线程池中进行处理。JDK的线程池维护一个消息队列和N个活跃的线程,对消息队列中Socket任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机
package example.client;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
public class Client3 {
public static void main(String[] args) throws IOException {
System.out.println("client is asking for connect ...");
Socket socket = new Socket("127.0.0.1", 8888);
OutputStream outputStream = socket.getOutputStream();
PrintStream printStream = new PrintStream(outputStream);
Scanner scanner = new Scanner(System.in);
while (true){
System.out.print("client:");
String str = scanner.nextLine();
printStream.println(str);
printStream.flush();
}
}
}
package example;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class SocketPool {
//创建线程池成员变量
private ExecutorService executorService;
// 创建时初始化线程池
public SocketPool(int maxThread, int queueSize) {
//参数1:核心线程数量
//参数2:最大线程数量
//参数3:存活时间
//参数4:存活时间的单位吗,使用TimeUnit直接声明
//参数5:队列,ArrayBlockingQueue阻塞式数组队列
this.executorService = new ThreadPoolExecutor(2,maxThread,10, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(queueSize));
}
//提交任务至线程池任务队列进行暂存,等待线程池进行处理
public void execute(Runnable mission){
this.executorService.execute(mission);
}
}
package example;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
public class ServerRunableMission implements Runnable{
private Socket socket;
public ServerRunableMission(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
while ((line = reader.readLine())!=null){
System.out.println(line);
}
} catch (IOException e) {
System.out.println("connect end ....");
}
}
}
package example.server;
import example.ServerRunableMission;
import example.SocketPool;
import example.ThreadServerReader;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class Server4 {
public static void main(String[] args) {
try {
ServerSocket serverSocket = new ServerSocket(8888);
//初始化线程池对象
SocketPool socketPool = new SocketPool(3,5);
//利用循环,接收客户端的Socket链接请求
while (true){
Socket accept = serverSocket.accept();
//将socket交给线程池进行处理
Runnable serverRunableMission = new ServerRunableMission(accept);
socketPool.execute(serverRunableMission);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
我们通过线程池对线程队列中的线程进行处理以及限制,将每个线程任务交由线程队列进行存储,线程池会处理定义的核心线程数的线程,例如我设置了核心线程数量为3,那么在同一时刻可以同时并发的处理三个线程的任务,最大线程数要大于等于核心线程数,当同一时刻的线程数量大于我定义 的核心线程数的时候,线程池就不会处理,等待一定的时间后,线程池空闲了才会去处理
package example.file;
import java.io.*;
import java.net.Socket;
public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8888);
//使用数据输出流DataOutputStream
DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
//先发送文件的后缀
dataOutputStream.writeUTF(".png");
//发送文件数据
String path = "D:\\Pictures\\logo.png";
FileInputStream fileInputStream = new FileInputStream(path);
byte[] bytes = new byte[1024];
int len;
while ((len = fileInputStream.read(bytes))!=-1){
dataOutputStream.write(bytes,0,len);
}
dataOutputStream.flush();
socket.shutdownOutput();
// dataOutputStream.close();
}
}
package example.file;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
public static void main(String[] args) {
try {
ServerSocket serverSocket = new ServerSocket(8888);
Socket accept = serverSocket.accept();
new ThreadServerReader(accept).start();
} catch (IOException e) {
System.out.println("connect end ....");
}
}
}
package example.file;
import java.io.*;
import java.net.Socket;
import java.time.LocalDateTime;
public class ThreadServerReader extends Thread {
private Socket socket;
public ThreadServerReader(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
DataInputStream dataInputStream = new DataInputStream(inputStream);
//读取类型
String suffix = dataInputStream.readUTF();
if (suffix.equals(null)) {
return;
}
System.out.println("========服务端接收成功=======");
System.out.println(suffix);
//读取数据
LocalDateTime now = LocalDateTime.now();
int minute = now.getMinute();
int second = now.getSecond();
int hour = now.getHour();
String phName = hour + "-" + minute + "-" + second;
FileOutputStream fileOutputStream = new FileOutputStream("D:\\Pictures\\test\\" + phName + suffix);
//写到服务器中
byte[] bytes = new byte[1024];
int len;
while ((len = dataInputStream.read(bytes)) != -1) {
fileOutputStream.write(bytes, 0, len);
}
fileOutputStream.close();
System.out.println("接收完成");
} catch (IOException e) {
e.printStackTrace();
}
}
}
群聊的思想,广播式转发,一个客户端消息可以发送和给所有客户端接收
服务端:
package cn.example;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class Server {
//List存储socket使用static保证只加载一份
public static ArrayList<Socket> socketList = new ArrayList<>();
public static void main(String[] args) {
try {
ServerSocket serverSocket = new ServerSocket(9000);
while (true){
Socket accept = serverSocket.accept();
//存入List中
socketList.add(accept);
//分配线程处理
new ThreadServerReader(accept).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
package cn.example;
import java.io.*;
import java.net.Socket;
public class ThreadServerReader extends Thread{
private Socket socket;
public ThreadServerReader(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
while ((line= reader.readLine())!=null){
//推送给所有的客户端
this.sendMsgToAll(line);
}
} catch (IOException e) {
System.out.println("当前有连接断开");
Server.socketList.remove(socket);
}
}
//遍历所有客户端使用PrintStream写信息
private void sendMsgToAll(String line) throws IOException {
for (Socket socket : Server.socketList){
PrintStream printStream = new PrintStream(socket.getOutputStream());
printStream.println(line);
printStream.flush();
}
}
}