• Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!


    本文将详细探讨如何在Python中连接全种类数据库以及实现相应的CRUD(创建,读取,更新,删除)操作。我们将逐一解析连接MySQL,SQL Server,Oracle,PostgreSQL,MongoDB,SQLite,DB2,Redis,Cassandra,Microsoft Access,ElasticSearch,Neo4j,InfluxDB,Snowflake,Amazon DynamoDB,Microsoft Azure CosMos DB数据库的方法,并演示相应的CRUD操作。

    MySQL

    连接数据库

    Python可以使用mysql-connector-python库连接MySQL数据库:

    import mysql.connector
    
    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    print("Opened MySQL database successfully")
    conn.close()
    

    CRUD操作

    接下来,我们将展示在MySQL中如何进行基本的CRUD操作。

    创建(Create)

    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
    print("Table created successfully")
    
    conn.close()
    

    读取(Retrieve)

    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    
    conn.close()
    

    更新(Update)

    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    
    print("Total number of rows updated :", cursor.rowcount)
    
    conn.close()
    

    删除(Delete)

    conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
    
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    
    print("Total number of rows deleted :", cursor.rowcount)
    
    conn.close()
    

    SQL Server

    连接数据库

    Python可以使用pyodbc库连接SQL Server数据库:

    import pyodbc
    
    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    print("Opened SQL Server database successfully")
    conn.close()
    

    CRUD操作

    接下来,我们将展示在SQL Server中如何进行基本的CRUD操作。

    创建(Create)

    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
    conn.commit()
    print("Table created successfully")
    
    conn.close()
    

    读取(Retrieve)

    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    
    conn.close()
    

    更新(Update)

    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    
    print("Total number of rows updated :", cursor.rowcount)
    
    conn.close()
    

    删除(Delete)

    conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
    
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    
    print("Total number of rows deleted :", cursor.rowcount)
    
    conn.close()
    

    Oracle

    连接数据库

    Python可以使用cx_Oracle库连接Oracle数据库:

    import cx_Oracle
    
    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    print("Opened Oracle database successfully")
    conn.close()
    

    CRUD操作

    接下来,我们将展示在Oracle中如何进行基本的CRUD操作。

    创建(Create)

    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE Employees (ID NUMBER(10) NOT NULL PRIMARY KEY, NAME VARCHAR2(20) NOT NULL, AGE NUMBER(3), ADDRESS CHAR(50), SALARY NUMBER(10, 2))")
    conn.commit()
    print("Table created successfully")
    
    conn.close()
    

    读取(Retrieve)

    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    
    conn.close()
    

    更新(Update)

    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    
    print("Total number of rows updated :", cursor.rowcount)
    
    conn.close()
    

    删除(Delete)

    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
    conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
    
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    
    print("Total number of rows deleted :", cursor.rowcount)
    
    conn.close()
    

    PostgreSQL

    连接数据库

    Python可以使用psycopg2库连接PostgreSQL数据库:

    import psycopg2
    
    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    print("Opened PostgreSQL database successfully")
    conn.close()
    

    CRUD操作

    接下来,我们将展示在PostgreSQL中如何进行基本的CRUD操作。

    创建(Create)

    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE Employees
          (ID INT PRIMARY KEY     NOT NULL,
          NAME           TEXT    NOT NULL,
          AGE            INT     NOT NULL,
          ADDRESS        CHAR(50),
          SALARY         REAL);''')
    conn.commit()
    print("Table created successfully")
    
    conn.close()
    

    读取(Retrieve)

    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    
    conn.close()
    

    更新(Update)

    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    
    print("Total number of rows updated :", cursor.rowcount)
    
    conn.close()
    

    删除(Delete)

    conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
    
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    
    print("Total number of rows deleted :", cursor.rowcount)
    
    conn.close()
    

    MongoDB

    连接数据库

    Python可以使用pymongo库连接MongoDB数据库:

    from pymongo import MongoClient
    
    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    print("Opened MongoDB database successfully")
    client.close()
    

    CRUD操作

    接下来,我们将展示在MongoDB中如何进行基本的CRUD操作。

    创建(Create)

    在MongoDB中,文档的创建操作通常包含在插入操作中:

    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    
    employees = db["Employees"]
    employee = {"id": "1", "name": "John", "age": "30", "address": "New York", "salary": "1000.00"}
    
    employees.insert_one(employee)
    print("Document inserted successfully")
    
    client.close()
    

    读取(Retrieve)

    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    
    employees = db["Employees"]
    cursor = employees.find()
    for document in cursor:
        print(document)
    
    client.close()
    

    更新(Update)

    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    
    employees = db["Employees"]
    query = { "id": "1" }
    new_values = { "$set": { "salary": "25000.00" } }
    
    employees.update_one(query, new_values)
    
    print("Document updated successfully")
    
    client.close()
    

    删除(Delete)

    client = MongoClient("mongodb://localhost:27017/")
    db = client["my_database"]
    
    employees = db["Employees"]
    query = { "id": "1" }
    
    employees.delete_one(query)
    
    print("Document deleted successfully")
    
    client.close()
    

    SQLite

    连接数据库

    Python使用sqlite3库连接SQLite数据库:

    import sqlite3
    
    conn = sqlite3.connect('my_database.db')
    print("Opened SQLite database successfully")
    conn.close()
    

    CRUD操作

    接下来,我们将展示在SQLite中如何进行基本的CRUD操作。

    创建(Create)

    conn = sqlite3.connect('my_database.db')
    
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE Employees
          (ID INT PRIMARY KEY     NOT NULL,
          NAME           TEXT    NOT NULL,
          AGE            INT     NOT NULL,
          ADDRESS        CHAR(50),
          SALARY         REAL);''')
    conn.commit()
    print("Table created successfully")
    
    conn.close()
    

    读取(Retrieve)

    conn = sqlite3.connect('my_database.db')
    
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    
    conn.close()
    

    更新(Update)

    conn = sqlite3.connect('my_database.db')
    
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    
    print("Total number of rows updated :", cursor.rowcount)
    
    conn.close()
    

    删除(Delete)

    conn = sqlite3.connect('my_database.db')
    
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    
    print("Total number of rows deleted :", cursor.rowcount)
    
    conn.close()
    

    DB2

    连接数据库

    Python可以使用ibm_db库连接DB2数据库:

    import ibm_db
    
    dsn = (
        "DRIVER={{IBM DB2 ODBC DRIVER}};"
        "DATABASE=my_database;"
        "HOSTNAME=127.0.0.1;"
        "PORT=50000;"
        "PROTOCOL=TCPIP;"
        "UID=username;"
        "PWD=password;"
    )
    conn = ibm_db.connect(dsn, "", "")
    print("Opened DB2 database successfully")
    ibm_db.close(conn)
    

    CRUD操作

    接下来,我们将展示在DB2中如何进行基本的CRUD操作。

    创建(Create)

    conn = ibm_db.connect(dsn, "", "")
    
    sql = '''CREATE TABLE Employees
          (ID INT PRIMARY KEY     NOT NULL,
          NAME           VARCHAR(20)    NOT NULL,
          AGE            INT     NOT NULL,
          ADDRESS        CHAR(50),
          SALARY         DECIMAL(9, 2));'''
    stmt = ibm_db.exec_immediate(conn, sql)
    print("Table created successfully")
    
    ibm_db.close(conn)
    

    读取(Retrieve)

    conn = ibm_db.connect(dsn, "", "")
    
    sql = "SELECT id, name, address, salary from Employees"
    stmt = ibm_db.exec_immediate(conn, sql)
    while ibm_db.fetch_row(stmt):
        print("ID = ", ibm_db.result(stmt, "ID"))
        print("NAME = ", ibm_db.result(stmt, "NAME"))
        print("ADDRESS = ", ibm_db.result(stmt, "ADDRESS"))
        print("SALARY = ", ibm_db.result(stmt, "SALARY"))
    
    ibm_db.close(conn)
    

    更新(Update)

    conn = ibm_db.connect(dsn, "", "")
    
    sql = "UPDATE Employees set SALARY = 25000.00 where ID = 1"
    stmt = ibm_db.exec_immediate(conn, sql)
    ibm_db.commit(conn)
    
    print("Total number of rows updated :", ibm_db.num_rows(stmt))
    
    ibm_db.close(conn)
    

    删除(Delete)

    conn = ibm_db.connect(dsn, "", "")
    
    sql = "DELETE from Employees where ID = 1"
    stmt = ibm_db.exec_immediate(conn, sql)
    ibm_db.commit(conn)
    
    print("Total number of rows deleted :", ibm_db.num_rows(stmt))
    
    ibm_db.close(conn)
    

    Microsoft Access

    连接数据库

    Python可以使用pyodbc库连接Microsoft Access数据库:

    import pyodbc
    
    conn_str = (
        r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
        r'DBQ=path_to_your_access_file.accdb;'
    )
    conn = pyodbc.connect(conn_str)
    print("Opened Access database successfully")
    conn.close()
    

    CRUD操作

    接下来,我们将展示在Access中如何进行基本的CRUD操作。

    创建(Create)

    conn = pyodbc.connect(conn_str)
    
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE Employees
          (ID INT PRIMARY KEY     NOT NULL,
          NAME           TEXT    NOT NULL,
          AGE            INT     NOT NULL,
          ADDRESS        CHAR(50),
          SALARY         DECIMAL(9, 2));''')
    conn.commit()
    print("Table created successfully")
    
    conn.close()
    

    读取(Retrieve)

    conn = pyodbc.connect(conn_str)
    
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, address, salary from Employees")
    rows = cursor.fetchall()
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("ADDRESS = ", row[2])
        print("SALARY = ", row[3])
    
    conn.close()
    

    更新(Update)

    conn = pyodbc.connect(conn_str)
    
    cursor = conn.cursor()
    cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
    conn.commit()
    
    print("Total number of rows updated :", cursor.rowcount)
    
    conn.close()
    

    删除(Delete)

    conn = pyodbc.connect(conn_str)
    
    cursor = conn.cursor()
    cursor.execute("DELETE from Employees where ID = 1")
    conn.commit()
    
    print("Total number of rows deleted :", cursor.rowcount)
    
    conn.close()
    

    Cassandra

    连接数据库

    Python可以使用cassandra-driver库连接Cassandra数据库:

    from cassandra.cluster import Cluster
    
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    print("Opened Cassandra database successfully")
    cluster.shutdown()
    

    CRUD操作

    接下来,我们将展示在Cassandra中如何进行基本的CRUD操作。

    创建(Create)

    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    
    session.execute("""
        CREATE TABLE Employees (
            id int PRIMARY KEY,
            name text,
            age int,
            address text,
            salary decimal
        )
    """)
    print("Table created successfully")
    
    cluster.shutdown()
    

    读取(Retrieve)

    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    
    rows = session.execute('SELECT id, name, address, salary FROM Employees')
    for row in rows:
        print("ID = ", row.id)
        print("NAME = ", row.name)
        print("ADDRESS = ", row.address)
        print("SALARY = ", row.salary)
    
    cluster.shutdown()
    

    更新(Update)

    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    
    session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
    print("Row updated successfully")
    
    cluster.shutdown()
    

    删除(Delete)

    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect('my_keyspace')
    
    session.execute("DELETE FROM Employees WHERE id = 1")
    print("Row deleted successfully")
    
    cluster.shutdown()
    

    Redis

    连接数据库

    Python可以使用redis-py库连接Redis数据库:

    import redis
    
    r = redis.Redis(host='localhost', port=6379, db=0)
    print("Opened Redis database successfully")
    

    CRUD操作

    接下来,我们将展示在Redis中如何进行基本的CRUD操作。

    创建(Create)

    r = redis.Redis(host='localhost', port=6379, db=0)
    
    r.set('employee:1:name', 'John')
    r.set('employee:1:age', '30')
    r.set('employee:1:address', 'New York')
    r.set('employee:1:salary', '1000.00')
    
    print("Keys created successfully")
    

    读取(Retrieve)

    r = redis.Redis(host='localhost', port=6379, db=0)
    
    print("NAME = ", r.get('employee:1:name').decode('utf-8'))
    print("AGE = ", r.get('employee:1:age').decode('utf-8'))
    print("ADDRESS = ", r.get('employee:1:address').decode('utf-8'))
    print("SALARY = ", r.get('employee:1:salary').decode('utf-8'))
    

    更新(Update)

    r = redis.Redis(host='localhost', port=6379, db=0)
    
    r.set('employee:1:salary', '25000.00')
    
    print("Key updated successfully")
    

    删除(Delete)

    r = redis.Redis(host='localhost', port=6379, db=0)
    
    r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')
    
    print("Keys deleted successfully")
    

    ElasticSearch

    连接数据库

    Python可以使用elasticsearch库连接ElasticSearch数据库:

    from elasticsearch import Elasticsearch
    
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    print("Opened ElasticSearch database successfully")
    

    CRUD操作

    接下来,我们将展示在ElasticSearch中如何进行基本的CRUD操作。

    创建(Create)

    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    
    employee = {
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': 1000.00
    }
    res = es.index(index='employees', doc_type='employee', id=1, body=employee)
    
    print("Document created successfully")
    

    读取(Retrieve)

    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    
    res = es.get(index='employees', doc_type='employee', id=1)
    print("Document details:")
    for field, details in res['_source'].items():
        print(f"{field.upper()} = ", details)
    

    更新(Update)

    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    
    res = es.update(index='employees', doc_type='employee', id=1, body={
        'doc': {
            'salary': 25000.00
        }
    })
    
    print("Document updated successfully")
    

    删除(Delete)

    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    
    res = es.delete(index='employees', doc_type='employee', id=1)
    
    print("Document deleted successfully")
    

    Neo4j

    连接数据库

    Python可以使用neo4j库连接Neo4j数据库:

    from neo4j import GraphDatabase
    
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    print("Opened Neo4j database successfully")
    driver.close()
    

    CRUD操作

    接下来,我们将展示在Neo4j中如何进行基本的CRUD操作。

    创建(Create)

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    
    with driver.session() as session:
        session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
    
    print("Node created successfully")
    
    driver.close()
    

    读取(Retrieve)

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    
    with driver.session() as session:
        result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
        for record in result:
            print("ID = ", record["n"]["id"])
            print("NAME = ", record["n"]["name"])
            print("ADDRESS = ", record["n"]["address"])
            print("SALARY = ", record["n"]["salary"])
    
    driver.close()
    

    更新(Update)

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    
    with driver.session() as session:
        session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
    
    print("Node updated successfully")
    
    driver.close()
    

    删除(Delete)

    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    
    with driver.session() as session:
        session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
    
    print("Node deleted successfully")
    
    driver.close()
    

    InfluxDB

    连接数据库

    Python可以使用InfluxDB-Python库连接InfluxDB数据库:

    from influxdb import InfluxDBClient
    
    client = InfluxDBClient(host='localhost', port=8086)
    print("Opened InfluxDB database successfully")
    client.close()
    

    CRUD操作

    接下来,我们将展示在InfluxDB中如何进行基本的CRUD操作。

    创建(Create)

    client = InfluxDBClient(host='localhost', port=8086)
    
    json_body = [
        {
            "measurement": "employees",
            "tags": {
                "id": "1"
            },
            "fields": {
                "name": "John",
                "age": 30,
                "address": "New York",
                "salary": 1000.00
            }
        }
    ]
    
    client.write_points(json_body)
    
    print("Point created successfully")
    
    client.close()
    

    读取(Retrieve)

    client = InfluxDBClient(host='localhost', port=8086)
    
    result = client.query('SELECT "name", "age", "address", "salary" FROM "employees"')
    
    for point in result.get_points():
        print("ID = ", point['id'])
        print("NAME = ", point['name'])
        print("AGE = ", point['age'])
        print("ADDRESS = ", point['address'])
        print("SALARY = ", point['salary'])
    
    client.close()
    

    更新(Update)

    InfluxDB的数据模型和其他数据库不同,它没有更新操作。但是你可以通过写入一个相同的数据点(即具有相同的时间戳和标签)并改变字段值,实现类似更新操作的效果。

    删除(Delete)

    同样,InfluxDB也没有提供删除单个数据点的操作。然而,你可以删除整个系列(即表)或者删除某个时间段的数据。

    client = InfluxDBClient(host='localhost', port=8086)
    
    # 删除整个系列
    client.query('DROP SERIES FROM "employees"')
    
    # 删除某个时间段的数据
    # client.query('DELETE FROM "employees" WHERE time < now() - 1d')
    
    print("Series deleted successfully")
    
    client.close()
    

    Snowflake

    连接数据库

    Python可以使用snowflake-connector-python库连接Snowflake数据库:

    from snowflake.connector import connect
    
    con = connect(
        user='username',
        password='password',
        account='account_url',
        warehouse='warehouse',
        database='database',
        schema='schema'
    )
    print("Opened Snowflake database successfully")
    con.close()
    

    CRUD操作

    接下来,我们将展示在Snowflake中如何进行基本的CRUD操作。

    创建(Create)

    con = connect(
        user='username',
        password='password',
        account='account_url',
        warehouse='warehouse',
        database='database',
        schema='schema'
    )
    
    cur = con.cursor()
    cur.execute("""
    CREATE TABLE EMPLOYEES (
        ID INT,
        NAME STRING,
        AGE INT,
        ADDRESS STRING,
        SALARY FLOAT
    )
    """)
    
    cur.execute("""
    INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES
    (1, 'John', 30, 'New York', 1000.00)
    """)
    
    print("Table created and row inserted successfully")
    
    con.close()
    

    读取(Retrieve)

    con = connect(
        user='username',
        password='password',
        account='account_url',
        warehouse='warehouse',
        database='database',
        schema='schema'
    )
    
    cur = con.cursor()
    cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
    
    rows = cur.fetchall()
    
    for row in rows:
        print("ID = ", row[0])
        print("NAME = ", row[1])
        print("AGE = ", row[2])
        print("ADDRESS = ", row[3])
        print("SALARY = ", row[4])
    
    con.close()
    

    更新(Update)

    con = connect(
        user='username',
        password='password',
        account='account_url',
        warehouse='warehouse',
        database='database',
        schema='schema'
    )
    
    cur = con.cursor()
    cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
    
    print("Row updated successfully")
    
    con.close()
    

    删除(Delete)

    con = connect(
        user='username',
        password='password',
        account='account_url',
        warehouse='warehouse',
        database='database',
        schema='schema'
    )
    
    cur = con.cursor()
    cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
    
    print("Row deleted successfully")
    
    con.close()
    

    Amazon DynamoDB

    连接数据库

    Python可以使用boto3库连接Amazon DynamoDB:

    import boto3
    
    dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
                              aws_access_key_id='Your AWS Access Key',
                              aws_secret_access_key='Your AWS Secret Key')
    
    print("Opened DynamoDB successfully")
    

    CRUD操作

    接下来,我们将展示在DynamoDB中如何进行基本的CRUD操作。

    创建(Create)

    table = dynamodb.create_table(
        TableName='Employees',
        KeySchema=[
            {
                'AttributeName': 'id',
                'KeyType': 'HASH'
            },
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'id',
                'AttributeType': 'N'
            },
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )
    
    table.put_item(
       Item={
            'id': 1,
            'name': 'John',
            'age': 30,
            'address': 'New York',
            'salary': 1000.00
        }
    )
    
    print("Table created and item inserted successfully")
    

    读取(Retrieve)

    table = dynamodb.Table('Employees')
    
    response = table.get_item(
       Key={
            'id': 1,
        }
    )
    
    item = response['Item']
    print(item)
    

    更新(Update)

    table = dynamodb.Table('Employees')
    
    table.update_item(
        Key={
            'id': 1,
        },
        UpdateExpression='SET salary = :val1',
        ExpressionAttributeValues={
            ':val1': 25000.00
        }
    )
    
    print("Item updated successfully")
    

    删除(Delete)

    table = dynamodb.Table('Employees')
    
    table.delete_item(
        Key={
            'id': 1,
        }
    )
    
    print("Item deleted successfully")
    

    Microsoft Azure CosMos DB

    连接数据库

    Python可以使用azure-cosmos库连接Microsoft Azure CosMos DB:

    from azure.cosmos import CosmosClient, PartitionKey, exceptions
    
    url = 'Cosmos DB Account URL'
    key = 'Cosmos DB Account Key'
    client = CosmosClient(url, credential=key)
    
    database_name = 'testDB'
    database = client.get_database_client(database_name)
    
    container_name = 'Employees'
    container = database.get_container_client(container_name)
    
    print("Opened CosMos DB successfully")
    

    CRUD操作

    接下来,我们将展示在CosMos DB中如何进行基本的CRUD操作。

    创建(Create)

    database = client.create_database_if_not_exists(id=database_name)
    
    container = database.create_container_if_not_exists(
        id=container_name, 
        partition_key=PartitionKey(path="/id"),
        offer_throughput=400
    )
    
    container.upsert_item({
        'id': '1',
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': 1000.00
    })
    
    print("Container created and item upserted successfully")
    

    读取(Retrieve)

    for item in container.read_all_items():
        print(item)
    

    更新(Update)

    for item in container.read_all_items():
        if item['id'] == '1':
            item['salary'] = 25000.00
            container.upsert_item(item)
            
    print("Item updated successfully")
    

    删除(Delete)

    for item in container.read_all_items():
        if item['id'] == '1':
            container.delete_item(item, partition_key='1')
            
    print("Item deleted successfully")
    

    如有帮助,请多关注
    个人微信公众号:【Python全视角】
    TeahLead_KrisChang,10+年的互联网和人工智能从业经验,10年+技术和业务团队管理经验,同济软件工程本科,复旦工程管理硕士,阿里云认证云服务资深架构师,上亿营收AI产品业务负责人。

  • 相关阅读:
    使用OAuth2实现授权服务
    基于Rplidar二维雷达使用Hector_SLAM算法在ROS中建图
    图论基础(python蓝桥杯)
    如何使用 OpenSSL 来检查证书,来确保网络通信的安全性?
    汽车行业调研:特种车市场发展趋势及现状分析
    G-DetKD
    【Html/Css】Https证书申请、安装和使用(新手IIS安装参考)
    导入excel工具类
    提高爬虫效率之多线程、多进程的使用
    Linux新版内核升级后问题
  • 原文地址:https://www.cnblogs.com/xfuture/p/17528203.html