import cx_Oracle
# 1. Connection settings
user = 'scott'  # user name
password = '123456'  # password
host = '127.0.0.1:1521'  # host:port
instance = 'orcl'  # service name
# 2. DSN in "host:port/service_name" form. The credentials are passed to
#    connect() separately instead of being spliced into a single
#    "user/password@host/service_name" string, so a password containing
#    '@' or '/' cannot be misread as part of the DSN.
dsn = f'{host}/{instance}'
# 3. Connectivity test
try:
    connect_info = cx_Oracle.connect(user=user, password=password, dsn=dsn)
except cx_Oracle.Error as e:
    # Only database/driver errors are reported here; programming errors
    # (e.g. a NameError) are no longer hidden behind this message.
    print(f'[提示]连接失败,报错:{e}')
else:
    try:
        print(f'[提示]连接成功,信息:{connect_info}')
    finally:
        # Release the session once the smoke test is done.
        connect_info.close()
执行结果:
[提示]连接成功,信息:<cx_Oracle.Connection to scott@127.0.0.1:1521/orcl>
import cx_Oracle
class Oracle:
    """Minimal cx_Oracle helper that owns one connection and one cursor.

    Instances can be used as context managers so the cursor and the
    connection are released when the ``with`` block exits.
    """

    def __init__(self, user_name, password, host, instance):
        """Open the connection and create the shared cursor.

        Args:
            user_name: Database user.
            password: Password for ``user_name``.
            host: First part of the DSN (the demo passes ``host:port``).
            instance: Service name appended to ``host`` to form the DSN.
        """
        # Credentials are passed separately rather than spliced into a
        # "user/password@dsn" string, so a password containing '@' or '/'
        # cannot be misread as part of the DSN.
        self._conn = cx_Oracle.connect(
            user=user_name, password=password, dsn=f'{host}/{instance}'
        )
        self.cursor = self._conn.cursor()
        print('[提示]: 连接成功')

    def __enter__(self):
        """Return this helper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Release the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._conn.close()

    def query_all(self, sql):
        """Execute ``sql`` and return every row of the result set.

        Args:
            sql: Query text, executed without bind parameters.

        Returns:
            Whatever ``cursor.fetchall()`` returns for the executed query.
        """
        self.cursor.execute(sql)
        return self.cursor.fetchall()
if __name__ == '__main__':
    # Demo: connect as SCOTT and print every row of scott.dept.
    db = Oracle('scott', '123456', '127.0.0.1:1521', 'orcl')
    all_rows = db.query_all('select * from scott.dept')
    print(all_rows)
执行结果:
[提示]: 连接成功
[(10, 'ACCOUNTING', 'NEW YORK'),
(20, 'RESEARCH', 'DALLAS'),
(30, 'SALES', 'CHICAGO'),
(40, 'OPERATIONS', 'BOSTON')]
import cx_Oracle
class Oracle:
    """Minimal cx_Oracle helper that owns one connection and one cursor.

    Instances can be used as context managers so the cursor and the
    connection are released when the ``with`` block exits.
    """

    def __init__(self, user_name, password, host, instance):
        """Open the connection and create the shared cursor.

        Args:
            user_name: Database user.
            password: Password for ``user_name``.
            host: First part of the DSN (the demo passes ``host:port``).
            instance: Service name appended to ``host`` to form the DSN.
        """
        # Credentials are passed separately rather than spliced into a
        # "user/password@dsn" string, so a password containing '@' or '/'
        # cannot be misread as part of the DSN.
        self._conn = cx_Oracle.connect(
            user=user_name, password=password, dsn=f'{host}/{instance}'
        )
        self.cursor = self._conn.cursor()
        print('[提示]: 连接成功')

    def __enter__(self):
        """Return this helper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Release the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._conn.close()

    def query_one(self, sql):
        """Execute ``sql`` and return only the first row.

        Args:
            sql: Query text, executed without bind parameters.

        Returns:
            Whatever ``cursor.fetchone()`` returns for the executed query.
        """
        self.cursor.execute(sql)
        return self.cursor.fetchone()
if __name__ == '__main__':
    # Demo: connect as SCOTT and print only the first row of scott.dept.
    db = Oracle('scott', '123456', '127.0.0.1:1521', 'orcl')
    first_row = db.query_one('select * from scott.dept')
    print(first_row)
执行结果:
[提示]: 连接成功
(10, 'ACCOUNTING', 'NEW YORK')
import cx_Oracle
class Oracle:
    """Minimal cx_Oracle helper that owns one connection and one cursor.

    Instances can be used as context managers so the cursor and the
    connection are released when the ``with`` block exits.
    """

    def __init__(self, user_name, password, host, instance):
        """Open the connection and create the shared cursor.

        Args:
            user_name: Database user.
            password: Password for ``user_name``.
            host: First part of the DSN (the demo passes ``host:port``).
            instance: Service name appended to ``host`` to form the DSN.
        """
        # Credentials are passed separately rather than spliced into a
        # "user/password@dsn" string, so a password containing '@' or '/'
        # cannot be misread as part of the DSN.
        self._conn = cx_Oracle.connect(
            user=user_name, password=password, dsn=f'{host}/{instance}'
        )
        self.cursor = self._conn.cursor()
        print('[提示]: 连接成功')

    def __enter__(self):
        """Return this helper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Release the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._conn.close()

    def query_by(self, sql, params):
        """Execute a parameterised query and return every matching row.

        Args:
            sql: Query text containing bind placeholders such as ``:deptno``.
            params: Bind values handed to ``cursor.execute`` unchanged
                (the demo passes a dict keyed by placeholder name).

        Returns:
            Whatever ``cursor.fetchall()`` returns for the executed query.
        """
        self.cursor.execute(sql, params)
        return self.cursor.fetchall()
if __name__ == '__main__':
    # Demo: look up one department using named bind variables.
    db = Oracle('scott', '123456', '127.0.0.1:1521', 'orcl')
    dept_sql = 'select * from scott.dept where deptno = :deptno and dname = :dname'
    binds = {'deptno': 10, 'dname': 'ACCOUNTING'}
    matches = db.query_by(dept_sql, binds)
    print(matches)
执行结果:
[提示]: 连接成功
[(10, 'ACCOUNTING', 'NEW YORK')]
import cx_Oracle
class Oracle:
    """Minimal cx_Oracle helper that owns one connection and one cursor.

    Instances can be used as context managers so the cursor and the
    connection are released when the ``with`` block exits.
    """

    def __init__(self, user_name, password, host, instance):
        """Open the connection and create the shared cursor.

        Args:
            user_name: Database user.
            password: Password for ``user_name``.
            host: First part of the DSN (the demo passes ``host:port``).
            instance: Service name appended to ``host`` to form the DSN.
        """
        # Credentials are passed separately rather than spliced into a
        # "user/password@dsn" string, so a password containing '@' or '/'
        # cannot be misread as part of the DSN.
        self._conn = cx_Oracle.connect(
            user=user_name, password=password, dsn=f'{host}/{instance}'
        )
        self.cursor = self._conn.cursor()
        print('[提示]: 连接成功')

    def __enter__(self):
        """Return this helper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Release the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._conn.close()

    def insert(self, sql, params):
        """Execute an INSERT statement with bind values and commit it.

        Args:
            sql: Statement text containing bind placeholders.
            params: Bind values handed to ``cursor.execute`` unchanged.
        """
        self.cursor.execute(sql, params)
        self._conn.commit()
        print('[提示]:执行成功!')
if __name__ == '__main__':
    # Demo: add department 50 to the backup table.
    db = Oracle('scott', '123456', '127.0.0.1:1521', 'orcl')
    insert_sql = "insert into scott.dept_bak(deptno, dname, loc) values(:deptno, :dname, :loc)"
    new_row = {'deptno': 50, 'dname': 'a', 'loc': 'b'}
    db.insert(insert_sql, new_row)
import cx_Oracle
class Oracle:
    """Minimal cx_Oracle helper that owns one connection and one cursor.

    Instances can be used as context managers so the cursor and the
    connection are released when the ``with`` block exits.
    """

    def __init__(self, user_name, password, host, instance):
        """Open the connection and create the shared cursor.

        Args:
            user_name: Database user.
            password: Password for ``user_name``.
            host: First part of the DSN (the demo passes ``host:port``).
            instance: Service name appended to ``host`` to form the DSN.
        """
        # Credentials are passed separately rather than spliced into a
        # "user/password@dsn" string, so a password containing '@' or '/'
        # cannot be misread as part of the DSN.
        self._conn = cx_Oracle.connect(
            user=user_name, password=password, dsn=f'{host}/{instance}'
        )
        self.cursor = self._conn.cursor()
        print('[提示]: 连接成功')

    def __enter__(self):
        """Return this helper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Release the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._conn.close()

    def update(self, sql, params):
        """Execute an UPDATE statement with bind values and commit it.

        Args:
            sql: Statement text containing bind placeholders.
            params: Bind values handed to ``cursor.execute`` unchanged.
        """
        self.cursor.execute(sql, params)
        self._conn.commit()
        print('[提示]: 修改完成!')
if __name__ == '__main__':
    # Demo: rename department 50 and change its location.
    db = Oracle('scott', '123456', '127.0.0.1:1521', 'orcl')
    update_sql = "update scott.dept_bak t set t.dname = :dname, t.loc = :loc where t.deptno = :deptno"
    changes = {'deptno': 50, 'dname': 'aaa', 'loc': 'bbbb'}
    db.update(update_sql, changes)
import cx_Oracle
class Oracle:
    """Minimal cx_Oracle helper that owns one connection and one cursor.

    Instances can be used as context managers so the cursor and the
    connection are released when the ``with`` block exits.
    """

    def __init__(self, user_name, password, host, instance):
        """Open the connection and create the shared cursor.

        Args:
            user_name: Database user.
            password: Password for ``user_name``.
            host: First part of the DSN (the demo passes ``host:port``).
            instance: Service name appended to ``host`` to form the DSN.
        """
        # Credentials are passed separately rather than spliced into a
        # "user/password@dsn" string, so a password containing '@' or '/'
        # cannot be misread as part of the DSN.
        self._conn = cx_Oracle.connect(
            user=user_name, password=password, dsn=f'{host}/{instance}'
        )
        self.cursor = self._conn.cursor()
        print('[提示]: 连接成功')

    def __enter__(self):
        """Return this helper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Release the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._conn.close()

    def truncate(self, table_name):
        """Remove every row from ``table_name`` with TRUNCATE TABLE.

        Args:
            table_name: Unquoted table name, optionally schema-qualified
                (``table`` or ``schema.table``).

        Raises:
            ValueError: If ``table_name`` is not a plain ``[schema.]table``
                identifier. Quoted identifiers are rejected as well.
        """
        import re

        # An identifier cannot be supplied as a bind variable, so the name
        # is validated before being concatenated into the statement.
        plain_name = r'[A-Za-z][A-Za-z0-9_$#]*'
        if re.fullmatch(rf'{plain_name}(\.{plain_name})?', table_name) is None:
            raise ValueError(f'invalid table name: {table_name!r}')
        sql = 'truncate table ' + table_name
        self.cursor.execute(sql)
        self._conn.commit()
        print(f'[提示]: 清空表 {table_name} 成功')
if __name__ == '__main__':
    # Demo: empty the backup table.
    db = Oracle('scott', '123456', '127.0.0.1:1521', 'orcl')
    target_table = 'scott.dept_bak'
    db.truncate(target_table)
import cx_Oracle
class Oracle:
    """Minimal cx_Oracle helper that owns one connection and one cursor.

    Instances can be used as context managers so the cursor and the
    connection are released when the ``with`` block exits.
    """

    def __init__(self, user_name, password, host, instance):
        """Open the connection and create the shared cursor.

        Args:
            user_name: Database user.
            password: Password for ``user_name``.
            host: First part of the DSN (the demo passes ``host:port``).
            instance: Service name appended to ``host`` to form the DSN.
        """
        # Credentials are passed separately rather than spliced into a
        # "user/password@dsn" string, so a password containing '@' or '/'
        # cannot be misread as part of the DSN.
        self._conn = cx_Oracle.connect(
            user=user_name, password=password, dsn=f'{host}/{instance}'
        )
        self.cursor = self._conn.cursor()
        print('[提示]: 连接成功')

    def __enter__(self):
        """Return this helper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Release the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._conn.close()

    def delete(self, params):
        """Delete rows from scott.dept_bak by department number and commit.

        Args:
            params: Bind values for the ``:deptno`` placeholder, handed to
                ``cursor.execute`` unchanged (the demo passes
                ``{'deptno': 10}``).
        """
        sql = "delete scott.dept_bak t where t.deptno = :deptno"
        self.cursor.execute(sql, params)
        self._conn.commit()
        print(f'[提示]: 删除成功,条件:{params}')
if __name__ == '__main__':
    # Demo: delete department 10 from the backup table.
    db = Oracle('scott', '123456', '127.0.0.1:1521', 'orcl')
    condition = {'deptno': 10}
    db.delete(condition)
import cx_Oracle
class Oracle:
    """Minimal cx_Oracle helper that owns one connection and one cursor.

    Instances can be used as context managers so the cursor and the
    connection are released when the ``with`` block exits.
    """

    def __init__(self, user_name, password, host, instance):
        """Open the connection and create the shared cursor.

        Args:
            user_name: Database user.
            password: Password for ``user_name``.
            host: First part of the DSN (the demo passes ``host:port``).
            instance: Service name appended to ``host`` to form the DSN.
        """
        # Credentials are passed separately rather than spliced into a
        # "user/password@dsn" string, so a password containing '@' or '/'
        # cannot be misread as part of the DSN.
        self._conn = cx_Oracle.connect(
            user=user_name, password=password, dsn=f'{host}/{instance}'
        )
        self.cursor = self._conn.cursor()
        print('[提示]: 连接成功')

    def __enter__(self):
        """Return this helper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close cursor and connection; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Release the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._conn.close()

    def procedure(self, params):
        """Call the stored procedure ``pkg_test.pro_test`` and commit.

        Args:
            params: Bind values for ``:i_deptno``, ``:i_dname`` and
                ``:i_loc``, handed to ``cursor.execute`` unchanged.
        """
        # The call is wrapped in an anonymous PL/SQL block so that named
        # bind variables can be used for the arguments.
        sql = "begin pkg_test.pro_test(:i_deptno, :i_dname, :i_loc); end;"
        self.cursor.execute(sql, params)
        self._conn.commit()
        print(f'[提示]: 调用存储过程成功,参数:{params}')
if __name__ == '__main__':
    # Demo: insert department 60 through the stored procedure.
    db = Oracle('scott', '123456', '127.0.0.1:1521', 'orcl')
    call_args = {'i_deptno': 60, 'i_dname': 'aaa', 'i_loc': 'bbb'}
    db.procedure(call_args)
pkg head:
create or replace package pkg_test as
  -- Inserts one row into scott.dept_bak (see the package body).
  procedure pro_test(
    i_deptno in number,    -- value for the DEPTNO column
    i_dname  in varchar2,  -- value for the DNAME column
    i_loc    in varchar2   -- value for the LOC column
  );
end pkg_test;
pkg body:
create or replace package body pkg_test is
  -- Inserts one row into scott.dept_bak and commits it.
  --   i_deptno: value for the DEPTNO column
  --   i_dname:  value for the DNAME column
  --   i_loc:    value for the LOC column
  procedure pro_test(i_deptno in number,
                     i_dname  in varchar2,
                     i_loc    in varchar2) is
  begin
    -- The statement text is fixed, so static SQL is used instead of
    -- EXECUTE IMMEDIATE: the table and column names are checked when the
    -- package is compiled rather than on the first call.
    insert into scott.dept_bak (deptno, dname, loc)
    values (i_deptno, i_dname, i_loc);
    commit;
  end pro_test;
end pkg_test;