这里主要是记录一下如何实现在Java和Python进程之间的通信,并实现一种心跳包通信机制。
常见的进程间通信方式有:
- 只能用于有亲缘关系的进程间通信;
- 底层原理是通过文件系统实现信息交互,但是在内存中读写;
- 半全双工;
- 允许无亲缘关系的进程间通信;
- 但在Windows平台上使用可能不会太方便;
- 底层原理是通过文件系统实现信息交互,但是在内存中读写;
- 半全双工;
- 如Kafka消息系统等;
- 更为一般的进程间通信机制;
- 可以跨语言和跨平台;
- 可以远程通信;
- 全双工;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
@Slf4j
public class HeartbeatSocket implements Runnable{
public static final int PORT = 10000;
private ServerSocket serverSocket;
private Socket socket;
public HeartbeatSocket(ServerSocket serverSocket) {
this.serverSocket = serverSocket;
try {
System.out.println("Heartbeat socket is waiting accept.");
socket = serverSocket.accept();
System.out.println("Heartbeat socket accept.");
// 启动心跳线程
Thread thread = new Thread(this);
thread.start();
}
catch (InterruptedIOException e) {
log.error("Heartbeat socket accept time out error.", e);
}
catch (IOException e) {
log.error("Heartbeat socket accept error.", e);
}
}
@Override
public void run() {
// 等待建立连接
try{
// 处理连接数据流
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
while(true)
{
String str = in.readLine();
if(str.equals("END")) {
break;
}
System.out.println("Echoing:" + str);
// 注意设置了自动刷新之后,一定要用println函数才能生效,print是不行的
out.println(str + "-server");
}
}
catch (IOException e) {
log.error("Heartbeat socket connect error.", e);
}
finally {
System.out.println("closing socket");
try {
socket.close();
}
catch (IOException e) {
log.error("Heartbeat socket close error.", e);
}
}
}
public static void main(String[] args) {
// 调用心跳包监听等待建立心跳连接
HeartbeatSocket heartbeatSocket = new HeartbeatSocket(this.serverSocket);
}
}
import socket
import time
import threading
import json
class HeartbeatSocket:
def __init__(self):
self.host = 'localhost'
self.port = 10000
self.clientSocket = None
self.isHeartbeat = True
self.waitTime = 2 # 心跳包发送间隔时间
# 心跳包信息
self.heartbeatMessage = {
'message': 'still alive',
}
# 通知服务器终止信息
self.end_message = 'END'
def create_client_socket(self):
# socket.AF_INET: 服务器之间网络通信
# socket.SOCK_STREAM: 流式,TCP协议
self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.clientSocket.connect((self.host, self.port))
print('Heartbeat client is on.')
# 线程执行心跳
heartbeat_thread = threading.Thread(target=self.heartbeat, args=())
heartbeat_thread.start()
print('Heartbeat is on.')
def heartbeat(self):
while self.isHeartbeat:
# 将对象转换为String
message = json.dumps(self.heartbeatMessage)
# 一定要加'\n'才能让java成功读到,否则不会输出
self.clientSocket.sendall((message+'\n').encode()) # 发送数据
# 接收数据,decode之后是string类型
received_data = self.clientSocket.recv(1024).decode(encoding='utf-8') # 接收数据
print('Received: ' + received_data)
# 等待一段时间
time.sleep(self.waitTime)
# 一定要加'\n'才能让java成功读到,否则不会输出
self.clientSocket.sendall((self.end_message + '\n').encode()) # 发送数据
print('Heartbeat is end.')
def close_client_socket(self):
# 通过内存进行线程通信,停止发送心跳
self.isHeartbeat = False
# 等待发送完毕
time.sleep(2 * self.waitTime + 1)
# 关闭socket
self.clientSocket.close()
print('Heartbeat client is close.')
if __name__ == '__main__':
# 创建心跳线程
heartbeat_socket = HeartbeatSocket.HeartbeatSocket()
heartbeat_socket.create_client_socket()
try:
# 具体代码区域
pass
# 结束心跳线程
heartbeat_socket.close_client_socket()
except BaseException as e:
# 结束心跳线程
heartbeat_socket.close_client_socket()
# 打印错误,repr将对象转换成字符串
print('error: ' + repr(e))