• IDEA06:Java和Python的进程间通信和心跳包机制


    写在前面

    这里主要是记录一下如何实现在Java和Python进程之间的通信,并实现一种心跳包通信机制。

    一、进程间通信方式

    常见的进程间通信方式有:

    1. 匿名管道 (pipe)
    • 只能用于有亲缘关系的进程间通信;
    • 底层原理是通过文件系统实现信息交互,但是在内存中读写;
    • 半全双工;
    1. 命名管道 (named pipe)
    • 允许无亲缘关系的进程间通信;
    • 但在Windows平台上使用可能不会太方便;
    • 底层原理是通过文件系统实现信息交互,但是在内存中读写;
    • 半全双工;
    1. 信号 (signal)
    2. 消息队列 (message queue)
    • 如Kafka消息系统等;
    1. 共享内存 (shared memory)
    2. 内存映射 (mapped memory)
    3. 信号量 (semaphore)
    4. 套接口 (socket)
    • 更为一般的进程间通信机制;
    • 可以跨语言和跨平台;
    • 可以远程通信;
    • 全双工;
    • 由于大多数进程间通信的方式都需要系统调用来实现,所以在C语言上才可能容易实现;
    • 系统调用过程通常会因为操作系统不同而需要不同的操作;
    • 在Java和Python语言下,考虑到平台通用性和连接稳健性,推荐使用socket来实现进程间的通信。

    二、Java的ServerSocket

    • 在这里使用Java进程作为心跳连接的服务器端;
    • 代码如下:
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    @Slf4j
    public class HeartbeatSocket implements Runnable{
    
        public static final int PORT = 10000;
        private ServerSocket serverSocket;
        private Socket socket;
    
        public HeartbeatSocket(ServerSocket serverSocket) {
    
            this.serverSocket = serverSocket;
    
            try {
                System.out.println("Heartbeat socket is waiting accept.");
                socket = serverSocket.accept();
                System.out.println("Heartbeat socket accept.");
    
                // 启动心跳线程
                Thread thread = new Thread(this);
                thread.start();
            }
            catch (InterruptedIOException e) {
                log.error("Heartbeat socket accept time out error.", e);
            }
            catch (IOException e) {
                log.error("Heartbeat socket accept error.", e);
            }
        }
    
        @Override
        public void run() {
            // 等待建立连接
            try{
                // 处理连接数据流
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
                while(true)
                {
                    String str = in.readLine();
                    if(str.equals("END")) {
                        break;
                    }
                    System.out.println("Echoing:" + str);
                    // 注意设置了自动刷新之后,一定要用println函数才能生效,print是不行的
                    out.println(str + "-server");
                }
            }
            catch (IOException e) {
                log.error("Heartbeat socket connect error.", e);
            }
            finally {
                System.out.println("closing socket");
                try {
                    socket.close();
                }
                catch (IOException e) {
                    log.error("Heartbeat socket close error.", e);
                }
            }
        }
        
        public static void main(String[] args) {
            // 调用心跳包监听等待建立心跳连接
            HeartbeatSocket heartbeatSocket = new HeartbeatSocket(this.serverSocket);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    三、Python的ClientSocket

    • 这里使用Python进程作为心跳连接的客户端;
    • 建立连接后,两个进程可以相互通信;
    • 一旦心跳连接的某个进程结束,socket会抛出对应的错误提示对方进程;
    • Socket部分可以参考博客:Python socket模块
    • 代码如下:
    import socket
    import time
    import threading
    import json
    
    
    class HeartbeatSocket:
    
        def __init__(self):
            self.host = 'localhost'
            self.port = 10000
            self.clientSocket = None
            self.isHeartbeat = True
            self.waitTime = 2  # 心跳包发送间隔时间
            # 心跳包信息
            self.heartbeatMessage = {
                'message': 'still alive',
            }
            # 通知服务器终止信息
            self.end_message = 'END'
    
        def create_client_socket(self):
            # socket.AF_INET: 服务器之间网络通信
            # socket.SOCK_STREAM: 流式,TCP协议
            self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.clientSocket.connect((self.host, self.port))
            print('Heartbeat client is on.')
    
            # 线程执行心跳
            heartbeat_thread = threading.Thread(target=self.heartbeat, args=())
            heartbeat_thread.start()
            print('Heartbeat is on.')
    
        def heartbeat(self):
            while self.isHeartbeat:
                # 将对象转换为String
                message = json.dumps(self.heartbeatMessage)
                # 一定要加'\n'才能让java成功读到,否则不会输出
                self.clientSocket.sendall((message+'\n').encode())  # 发送数据
                # 接收数据,decode之后是string类型
                received_data = self.clientSocket.recv(1024).decode(encoding='utf-8')  # 接收数据
                print('Received: ' + received_data)
                # 等待一段时间
                time.sleep(self.waitTime)
            # 一定要加'\n'才能让java成功读到,否则不会输出
            self.clientSocket.sendall((self.end_message + '\n').encode())  # 发送数据
            print('Heartbeat is end.')
    
        def close_client_socket(self):
            # 通过内存进行线程通信,停止发送心跳
            self.isHeartbeat = False
            # 等待发送完毕
            time.sleep(2 * self.waitTime + 1)
            # 关闭socket
            self.clientSocket.close()
            print('Heartbeat client is close.')
    
    
    if __name__ == '__main__':
        # 创建心跳线程
        heartbeat_socket = HeartbeatSocket.HeartbeatSocket()
        heartbeat_socket.create_client_socket()
    
        try:
            # 具体代码区域
            pass
            # 结束心跳线程
            heartbeat_socket.close_client_socket()
        except BaseException as e:
            # 结束心跳线程
            heartbeat_socket.close_client_socket()
    
            # 打印错误,repr将对象转换成字符串
            print('error: ' + repr(e))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
  • 相关阅读:
    TypeScript后端http请求
    LeetCode Cookbook 树(3)
    神经网络学说的主要观点,神经网络宏观解释包括
    牛客网语法篇练习复合类型(一)
    ChatGPT从⼊⻔到精通
    【Linux】yum及vim
    Nginx配置虚拟主机
    Oracle 服务器日常巡检
    vue(十三)——vue3新特性之组合式api(一)
    linux 夺命100问
  • 原文地址:https://blog.csdn.net/weixin_43992162/article/details/126234701