在一些特定的场景下面,需要使用线上的数据库。但是正常使用连接肯定是连接不上的,所以这里就需要使用ssh通道来连接线上的数据库。
pip install paramiko pymysql
下面是示例代码:
- import paramiko
- import pymysql
-
- # SSH隧道配置
- ssh_host = 'your_ssh_server_ip'
- ssh_port = 22
- ssh_user = 'your_ssh_username'
- ssh_password = 'your_ssh_password'
-
- # MySQL数据库配置
- mysql_host = 'your_mysql_host_or_ip' # 这里填写MySQL的实际地址,如果通过SSH隧道访问localhost即可
- mysql_port = 3306
- mysql_user = 'your_mysql_username'
- mysql_password = 'your_mysql_password'
- mysql_db = 'your_database_name'
-
- def create_ssh_tunnel(ssh_host, ssh_port, ssh_user, ssh_password, remote_bind_address, local_bind_port):
- """创建SSH隧道"""
- transport = paramiko.Transport((ssh_host, ssh_port))
- transport.connect(username=ssh_user, password=ssh_password)
-
- # 设置转发规则,将本地端口转发到远程MySQL服务器
- channel = transport.open_channel("direct-tcpip", (remote_bind_address, mysql_port), ('', local_bind_port))
- return channel
-
- def query_mysql_via_tunnel(mysql_user, mysql_password, mysql_db, local_bind_port):
- """通过SSH隧道连接MySQL并执行查询"""
- conn = pymysql.connect(host='127.0.0.1', port=local_bind_port, user=mysql_user, passwd=mysql_password, db=mysql_db)
- try:
- with conn.cursor() as cursor:
- cursor.execute('SELECT VERSION()')
- result = cursor.fetchone()
- print("MySQL version:", result)
- finally:
- conn.close()
-
- if __name__ == "__main__":
- # 创建SSH隧道
- local_bind_port = 3307 # 选择一个未被占用的本地端口
- tunnel = create_ssh_tunnel(ssh_host, ssh_port, ssh_user, ssh_password, (mysql_host, mysql_port), local_bind_port)
-
- try:
- # 通过隧道连接MySQL并执行查询
- query_mysql_via_tunnel(mysql_user, mysql_password, mysql_db, local_bind_port)
- finally:
- # 关闭SSH连接
- tunnel.close()
请注意,需要将上述代码中的your_ssh_server_ip
、your_ssh_username
、your_ssh_password
、your_mysql_host_or_ip
、your_mysql_username
、your_mysql_password
以及your_database_name
替换为实际的值。此外,确保你有权限通过SSH访问目标服务器,并且MySQL服务器允许来自SSH服务器的连接。