本文将详细探讨如何在Python中连接各类常见数据库,并实现相应的CRUD(创建,读取,更新,删除)操作。我们将逐一解析连接MySQL,SQL Server,Oracle,PostgreSQL,MongoDB,SQLite,DB2,Microsoft Access,Cassandra,Redis,ElasticSearch,Neo4j,InfluxDB,Snowflake,Amazon DynamoDB,Microsoft Azure Cosmos DB数据库的方法,并演示相应的CRUD操作。
Python可以使用mysql-connector-python库连接MySQL数据库:
# Open a connection to a local MySQL server through mysql-connector-python,
# report success, then release the connection.
import mysql.connector

conn = mysql.connector.connect(
    host='127.0.0.1',
    database='my_database',
    user='username',
    password='password',
)
print("Opened MySQL database successfully")
conn.close()
接下来,我们将展示在MySQL中如何进行基本的CRUD操作。
# CRUD walkthrough for MySQL. Every step opens a fresh connection from the
# same settings and closes it once the step is finished.
mysql_settings = {
    'user': 'username',
    'password': 'password',
    'host': '127.0.0.1',
    'database': 'my_database',
}

# Create: define the Employees table.
conn = mysql.connector.connect(**mysql_settings)
cursor = conn.cursor()
create_sql = "CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)"
cursor.execute(create_sql)
print("Table created successfully")
conn.close()

# Read: fetch every employee and print the selected columns.
conn = mysql.connector.connect(**mysql_settings)
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
for emp_id, emp_name, emp_address, emp_salary in cursor.fetchall():
    print("ID = ", emp_id)
    print("NAME = ", emp_name)
    print("ADDRESS = ", emp_address)
    print("SALARY = ", emp_salary)
conn.close()

# Update: change the salary of employee 1 and report the affected row count.
conn = mysql.connector.connect(**mysql_settings)
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()

# Delete: remove employee 1 and report the affected row count.
conn = mysql.connector.connect(**mysql_settings)
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
Python可以使用pyodbc库连接SQL Server数据库:
# Open an ODBC connection to a local SQL Server instance via pyodbc, report
# success, then close it.
import pyodbc

sqlserver_conn_str = 'DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password'
conn = pyodbc.connect(sqlserver_conn_str)
print("Opened SQL Server database successfully")
conn.close()
接下来,我们将展示在SQL Server中如何进行基本的CRUD操作。
# CRUD walkthrough for SQL Server. Every step connects with the same ODBC
# connection string and closes its connection when done.
sqlserver_conn_str = 'DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password'

# Create: define the Employees table and commit the DDL.
conn = pyodbc.connect(sqlserver_conn_str)
cursor = conn.cursor()
create_sql = "CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)"
cursor.execute(create_sql)
conn.commit()
print("Table created successfully")
conn.close()

# Read: fetch every employee and print the selected columns.
conn = pyodbc.connect(sqlserver_conn_str)
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
for emp_id, emp_name, emp_address, emp_salary in cursor.fetchall():
    print("ID = ", emp_id)
    print("NAME = ", emp_name)
    print("ADDRESS = ", emp_address)
    print("SALARY = ", emp_salary)
conn.close()

# Update: change the salary of employee 1 and report the affected row count.
conn = pyodbc.connect(sqlserver_conn_str)
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()

# Delete: remove employee 1 and report the affected row count.
conn = pyodbc.connect(sqlserver_conn_str)
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
Python可以使用cx_Oracle库连接Oracle数据库:
# Build a DSN for a local Oracle listener, connect with cx_Oracle, report
# success, then close the connection.
import cx_Oracle

dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
conn = cx_Oracle.connect(
    dsn=dsn_tns,
    user='username',
    password='password',
)
print("Opened Oracle database successfully")
conn.close()
接下来,我们将展示在Oracle中如何进行基本的CRUD操作。
# CRUD walkthrough for Oracle. Every step builds the DSN, opens its own
# connection and closes it when done.
def _open_oracle():
    """Build the DSN for the local listener and return a new connection."""
    dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
    return cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)


# Create: define the Employees table.
conn = _open_oracle()
cursor = conn.cursor()
create_sql = "CREATE TABLE Employees (ID NUMBER(10) NOT NULL PRIMARY KEY, NAME VARCHAR2(20) NOT NULL, AGE NUMBER(3), ADDRESS CHAR(50), SALARY NUMBER(10, 2))"
cursor.execute(create_sql)
conn.commit()
print("Table created successfully")
conn.close()

# Read: fetch every employee and print the selected columns.
conn = _open_oracle()
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
for emp_id, emp_name, emp_address, emp_salary in cursor.fetchall():
    print("ID = ", emp_id)
    print("NAME = ", emp_name)
    print("ADDRESS = ", emp_address)
    print("SALARY = ", emp_salary)
conn.close()

# Update: change the salary of employee 1 and report the affected row count.
conn = _open_oracle()
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()

# Delete: remove employee 1 and report the affected row count.
conn = _open_oracle()
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
Python可以使用psycopg2库连接PostgreSQL数据库:
# Open a connection to a local PostgreSQL server through psycopg2, report
# success, then close it.
import psycopg2

conn = psycopg2.connect(
    host="127.0.0.1",
    port="5432",
    database="my_database",
    user="username",
    password="password",
)
print("Opened PostgreSQL database successfully")
conn.close()
接下来,我们将展示在PostgreSQL中如何进行基本的CRUD操作。
# CRUD walkthrough for PostgreSQL. Every step opens a fresh connection from
# the same settings and closes it once the step is finished.
pg_settings = {
    "database": "my_database",
    "user": "username",
    "password": "password",
    "host": "127.0.0.1",
    "port": "5432",
}

# Create: define the Employees table and commit the DDL.
conn = psycopg2.connect(**pg_settings)
cursor = conn.cursor()
create_sql = '''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL);'''
cursor.execute(create_sql)
conn.commit()
print("Table created successfully")
conn.close()

# Read: fetch every employee and print the selected columns.
conn = psycopg2.connect(**pg_settings)
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
for emp_id, emp_name, emp_address, emp_salary in cursor.fetchall():
    print("ID = ", emp_id)
    print("NAME = ", emp_name)
    print("ADDRESS = ", emp_address)
    print("SALARY = ", emp_salary)
conn.close()

# Update: change the salary of employee 1 and report the affected row count.
conn = psycopg2.connect(**pg_settings)
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()

# Delete: remove employee 1 and report the affected row count.
conn = psycopg2.connect(**pg_settings)
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
Python可以使用pymongo库连接MongoDB数据库:
# Create a MongoDB client for the local server, pick the working database,
# report success, then close the client.
from pymongo import MongoClient

mongo_uri = "mongodb://localhost:27017/"
client = MongoClient(mongo_uri)
db = client["my_database"]
print("Opened MongoDB database successfully")
client.close()
接下来,我们将展示在MongoDB中如何进行基本的CRUD操作。
在MongoDB中,集合无需事先显式创建,首次插入文档时会自动创建,因此"创建"操作通常体现为插入操作:
# CRUD walkthrough for MongoDB. Every step creates its own client, works on
# the Employees collection of my_database, and closes the client afterwards.
mongo_uri = "mongodb://localhost:27017/"

# Create: insert a single employee document.
client = MongoClient(mongo_uri)
employees = client["my_database"]["Employees"]
employees.insert_one(
    {"id": "1", "name": "John", "age": "30", "address": "New York", "salary": "1000.00"}
)
print("Document inserted successfully")
client.close()

# Read: print every document in the collection.
client = MongoClient(mongo_uri)
employees = client["my_database"]["Employees"]
for doc in employees.find():
    print(doc)
client.close()

# Update: set a new salary on the first document whose id is "1".
client = MongoClient(mongo_uri)
employees = client["my_database"]["Employees"]
employees.update_one({"id": "1"}, {"$set": {"salary": "25000.00"}})
print("Document updated successfully")
client.close()

# Delete: remove the first document whose id is "1".
client = MongoClient(mongo_uri)
employees = client["my_database"]["Employees"]
employees.delete_one({"id": "1"})
print("Document deleted successfully")
client.close()
Python使用sqlite3库连接SQLite数据库:
# Open the SQLite database file with the standard-library sqlite3 module,
# report success, then close the connection.
import sqlite3

db_path = 'my_database.db'
conn = sqlite3.connect(db_path)
print("Opened SQLite database successfully")
conn.close()
接下来,我们将展示在SQLite中如何进行基本的CRUD操作。
# CRUD walkthrough for SQLite. Every step opens the same database file and
# closes its connection once the step is finished.
db_path = 'my_database.db'

# Create: define the Employees table and commit the DDL.
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
create_sql = '''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL);'''
cursor.execute(create_sql)
conn.commit()
print("Table created successfully")
conn.close()

# Read: fetch every employee and print the selected columns.
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
for emp_id, emp_name, emp_address, emp_salary in cursor.fetchall():
    print("ID = ", emp_id)
    print("NAME = ", emp_name)
    print("ADDRESS = ", emp_address)
    print("SALARY = ", emp_salary)
conn.close()

# Update: change the salary of employee 1 and report the affected row count.
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()

# Delete: remove employee 1 and report the affected row count.
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
Python可以使用ibm_db库连接DB2数据库:
# Connect to a DB2 server with ibm_db using a full connection string, report
# success, then close the connection.
import ibm_db

# The driver name is wrapped in SINGLE braces: this is a plain string literal,
# not a str.format() template, so doubled braces would be passed to the
# driver verbatim instead of collapsing to one pair.
dsn = (
    "DRIVER={IBM DB2 ODBC DRIVER};"
    "DATABASE=my_database;"
    "HOSTNAME=127.0.0.1;"
    "PORT=50000;"
    "PROTOCOL=TCPIP;"
    "UID=username;"
    "PWD=password;"
)

# User and password are already part of the connection string, so the two
# credential arguments stay empty.
conn = ibm_db.connect(dsn, "", "")
print("Opened DB2 database successfully")
ibm_db.close(conn)
接下来,我们将展示在DB2中如何进行基本的CRUD操作。
# CRUD walkthrough for DB2. Every step connects with the `dsn` connection
# string defined when the connection was first opened and closes its
# connection when done.

# Create: define the Employees table.
conn = ibm_db.connect(dsn, "", "")
# No trailing ';' here: it is a script-level statement terminator, not part of
# the SQL statement, and the other statements in this walkthrough omit it too.
sql = '''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY DECIMAL(9, 2))'''
stmt = ibm_db.exec_immediate(conn, sql)
print("Table created successfully")
ibm_db.close(conn)

# Read: step through the result set row by row and print each column.
conn = ibm_db.connect(dsn, "", "")
sql = "SELECT id, name, address, salary from Employees"
stmt = ibm_db.exec_immediate(conn, sql)
while ibm_db.fetch_row(stmt):
    print("ID = ", ibm_db.result(stmt, "ID"))
    print("NAME = ", ibm_db.result(stmt, "NAME"))
    print("ADDRESS = ", ibm_db.result(stmt, "ADDRESS"))
    print("SALARY = ", ibm_db.result(stmt, "SALARY"))
ibm_db.close(conn)

# Update: change the salary of employee 1 and report the affected row count.
conn = ibm_db.connect(dsn, "", "")
sql = "UPDATE Employees set SALARY = 25000.00 where ID = 1"
stmt = ibm_db.exec_immediate(conn, sql)
ibm_db.commit(conn)
print("Total number of rows updated :", ibm_db.num_rows(stmt))
ibm_db.close(conn)

# Delete: remove employee 1 and report the affected row count.
conn = ibm_db.connect(dsn, "", "")
sql = "DELETE from Employees where ID = 1"
stmt = ibm_db.exec_immediate(conn, sql)
ibm_db.commit(conn)
print("Total number of rows deleted :", ibm_db.num_rows(stmt))
ibm_db.close(conn)
Python可以使用pyodbc库连接Microsoft Access数据库:
# Open a Microsoft Access database file through the Access ODBC driver,
# report success, then close the connection.
import pyodbc

# Driver clause and database-file clause, joined into one ODBC string.
conn_str = (
    r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
    r'DBQ=path_to_your_access_file.accdb;'
)

conn = pyodbc.connect(conn_str)
print("Opened Access database successfully")
conn.close()
接下来,我们将展示在Access中如何进行基本的CRUD操作。
# CRUD walkthrough for Microsoft Access. Every step connects with the
# `conn_str` ODBC string defined when the connection was first opened and
# closes its connection when done.

# Create: define the Employees table and commit the DDL.
# NOTE(review): the Access ODBC driver may reject DECIMAL(p, s) in DDL --
# confirm against the driver version in use.
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
create_sql = '''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY DECIMAL(9, 2));'''
cursor.execute(create_sql)
conn.commit()
print("Table created successfully")
conn.close()

# Read: fetch every employee and print the selected columns.
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
for emp_id, emp_name, emp_address, emp_salary in cursor.fetchall():
    print("ID = ", emp_id)
    print("NAME = ", emp_name)
    print("ADDRESS = ", emp_address)
    print("SALARY = ", emp_salary)
conn.close()

# Update: change the salary of employee 1 and report the affected row count.
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()

# Delete: remove employee 1 and report the affected row count.
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
Python可以使用cassandra-driver库连接Cassandra数据库:
# Connect to a local Cassandra node, open a session on the working keyspace,
# report success, then shut the cluster connection down.
from cassandra.cluster import Cluster

contact_points = ['127.0.0.1']
cluster = Cluster(contact_points)
session = cluster.connect('my_keyspace')
print("Opened Cassandra database successfully")
cluster.shutdown()
接下来,我们将展示在Cassandra中如何进行基本的CRUD操作。
# CRUD walkthrough for Cassandra. Every step builds its own cluster object,
# opens a session on my_keyspace and shuts the cluster down afterwards.
contact_points = ['127.0.0.1']

# Create: define the Employees table.
cluster = Cluster(contact_points)
session = cluster.connect('my_keyspace')
create_cql = """ CREATE TABLE Employees ( id int PRIMARY KEY, name text, age int, address text, salary decimal ) """
session.execute(create_cql)
print("Table created successfully")
cluster.shutdown()

# Read: print the selected columns of every row.
cluster = Cluster(contact_points)
session = cluster.connect('my_keyspace')
for employee in session.execute('SELECT id, name, address, salary FROM Employees'):
    print("ID = ", employee.id)
    print("NAME = ", employee.name)
    print("ADDRESS = ", employee.address)
    print("SALARY = ", employee.salary)
cluster.shutdown()

# Update: set a new salary on the row with id 1.
cluster = Cluster(contact_points)
session = cluster.connect('my_keyspace')
session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
print("Row updated successfully")
cluster.shutdown()

# Delete: remove the row with id 1.
cluster = Cluster(contact_points)
session = cluster.connect('my_keyspace')
session.execute("DELETE FROM Employees WHERE id = 1")
print("Row deleted successfully")
cluster.shutdown()
Python可以使用redis-py库连接Redis数据库:
# Create a redis-py client for database 0 of the local Redis server and
# report success.
import redis

r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
)
print("Opened Redis database successfully")
接下来,我们将展示在Redis中如何进行基本的CRUD操作。
# CRUD walkthrough for Redis. An employee is stored as four string keys of
# the form employee:1:<field>; every step creates its own client.

# Create: write the four keys in order.
r = redis.Redis(host='localhost', port=6379, db=0)
new_employee = {
    'employee:1:name': 'John',
    'employee:1:age': '30',
    'employee:1:address': 'New York',
    'employee:1:salary': '1000.00',
}
for key, value in new_employee.items():
    r.set(key, value)
print("Keys created successfully")

# Read: fetch each key and decode the returned bytes as UTF-8 text.
r = redis.Redis(host='localhost', port=6379, db=0)
labelled_keys = (
    ("NAME = ", 'employee:1:name'),
    ("AGE = ", 'employee:1:age'),
    ("ADDRESS = ", 'employee:1:address'),
    ("SALARY = ", 'employee:1:salary'),
)
for label, key in labelled_keys:
    print(label, r.get(key).decode('utf-8'))

# Update: overwrite the salary key with the new value.
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:salary', '25000.00')
print("Key updated successfully")

# Delete: remove all four keys in a single call.
r = redis.Redis(host='localhost', port=6379, db=0)
employee_keys = (
    'employee:1:name',
    'employee:1:age',
    'employee:1:address',
    'employee:1:salary',
)
r.delete(*employee_keys)
print("Keys deleted successfully")
Python可以使用elasticsearch库连接ElasticSearch数据库:
# Create an Elasticsearch client for the local node and report success.
from elasticsearch import Elasticsearch

es_hosts = [{'host': 'localhost', 'port': 9200}]
es = Elasticsearch(es_hosts)
print("Opened ElasticSearch database successfully")
接下来,我们将展示在ElasticSearch中如何进行基本的CRUD操作。
# CRUD walkthrough for Elasticsearch on document 1 of the `employees` index.
# Every step creates its own client.
# NOTE(review): `doc_type` is only accepted by older client releases --
# confirm against the installed elasticsearch package version.
es_hosts = [{'host': 'localhost', 'port': 9200}]

# Create: index the employee document under id 1.
es = Elasticsearch(es_hosts)
employee = {
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00,
}
res = es.index(index='employees', doc_type='employee', id=1, body=employee)
print("Document created successfully")

# Read: fetch the document and print every field of its source.
es = Elasticsearch(es_hosts)
res = es.get(index='employees', doc_type='employee', id=1)
print("Document details:")
source = res['_source']
for field, details in source.items():
    print(f"{field.upper()} = ", details)

# Update: partially update the document with a new salary.
es = Elasticsearch(es_hosts)
salary_patch = {'doc': {'salary': 25000.00}}
res = es.update(index='employees', doc_type='employee', id=1, body=salary_patch)
print("Document updated successfully")

# Delete: remove the document.
es = Elasticsearch(es_hosts)
res = es.delete(index='employees', doc_type='employee', id=1)
print("Document deleted successfully")
Python可以使用neo4j库连接Neo4j数据库:
# Create a Neo4j driver for the local Bolt endpoint, report success, then
# close the driver.
from neo4j import GraphDatabase

bolt_uri = "bolt://localhost:7687"
credentials = ("neo4j", "password")
driver = GraphDatabase.driver(bolt_uri, auth=credentials)
print("Opened Neo4j database successfully")
driver.close()
接下来,我们将展示在Neo4j中如何进行基本的CRUD操作。
# CRUD walkthrough for Neo4j on an Employee node with id 1. Every step
# creates its own driver, runs one Cypher statement in a session, and closes
# the driver afterwards.
bolt_uri = "bolt://localhost:7687"
credentials = ("neo4j", "password")

# Create: add the Employee node.
driver = GraphDatabase.driver(bolt_uri, auth=credentials)
with driver.session() as session:
    session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
print("Node created successfully")
driver.close()

# Read: match the node and print its properties.
driver = GraphDatabase.driver(bolt_uri, auth=credentials)
with driver.session() as session:
    for record in session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n"):
        node = record["n"]
        print("ID = ", node["id"])
        print("NAME = ", node["name"])
        print("ADDRESS = ", node["address"])
        print("SALARY = ", node["salary"])
driver.close()

# Update: set a new salary on the node.
driver = GraphDatabase.driver(bolt_uri, auth=credentials)
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
print("Node updated successfully")
driver.close()

# Delete: remove the node together with its relationships.
driver = GraphDatabase.driver(bolt_uri, auth=credentials)
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
print("Node deleted successfully")
driver.close()
Python可以使用influxdb库(即InfluxDB-Python,适用于InfluxDB 1.x)连接InfluxDB数据库:
# Create an InfluxDB client for the local server, report success, then close
# the client.
from influxdb import InfluxDBClient

client = InfluxDBClient(
    host='localhost',
    port=8086,
)
print("Opened InfluxDB database successfully")
client.close()
接下来,我们将展示在InfluxDB中如何进行基本的CRUD操作。
# Create and read steps for InfluxDB. Every step creates its own client and
# closes it when done.

# Create: write one point to the `employees` measurement.
# The client must be bound to a database; writes and queries have no target
# otherwise.
client = InfluxDBClient(host='localhost', port=8086, database='my_database')
# NOTE(review): assumes InfluxDB 1.x, where creating an already existing
# database is a no-op -- confirm for the target server.
client.create_database('my_database')
json_body = [
    {
        "measurement": "employees",
        "tags": {
            "id": "1"
        },
        "fields": {
            "name": "John",
            "age": 30,
            "address": "New York",
            "salary": 1000.00
        }
    }
]
client.write_points(json_body)
print("Point created successfully")
client.close()

# Read: print every point of the measurement.
client = InfluxDBClient(host='localhost', port=8086, database='my_database')
# The `id` tag has to be selected explicitly; otherwise the returned points
# carry no 'id' key and point['id'] below raises KeyError.
result = client.query('SELECT "id", "name", "age", "address", "salary" FROM "employees"')
for point in result.get_points():
    print("ID = ", point['id'])
    print("NAME = ", point['name'])
    print("AGE = ", point['age'])
    print("ADDRESS = ", point['address'])
    print("SALARY = ", point['salary'])
client.close()
InfluxDB的数据模型和其他数据库不同,它没有更新操作。但是你可以通过写入一个相同的数据点(即具有相同的时间戳和标签)并改变字段值,实现类似更新操作的效果。
同样,InfluxDB也没有提供删除单个数据点的操作。然而,你可以删除某个measurement(类似于表)下的全部系列,或者删除某个时间段的数据。
# Delete step for InfluxDB: drop all series of the `employees` measurement.
# The client must be bound to a database for the statement to have a target;
# this assumes the database 'my_database' already exists.
client = InfluxDBClient(host='localhost', port=8086, database='my_database')

# Drop every series of the measurement.
client.query('DROP SERIES FROM "employees"')

# Alternative: delete only the points of a time range.
# client.query('DELETE FROM "employees" WHERE time < now() - 1d')

print("Series deleted successfully")
client.close()
Python可以使用snowflake-connector-python库连接Snowflake数据库:
# Open a Snowflake session with snowflake-connector-python, report success,
# then close it.
from snowflake.connector import connect

snowflake_settings = {
    'user': 'username',
    'password': 'password',
    'account': 'account_url',
    'warehouse': 'warehouse',
    'database': 'database',
    'schema': 'schema',
}
con = connect(**snowflake_settings)
print("Opened Snowflake database successfully")
con.close()
接下来,我们将展示在Snowflake中如何进行基本的CRUD操作。
# CRUD walkthrough for Snowflake. Every step opens a fresh session from the
# same settings and closes it once the step is finished.
snowflake_settings = {
    'user': 'username',
    'password': 'password',
    'account': 'account_url',
    'warehouse': 'warehouse',
    'database': 'database',
    'schema': 'schema',
}

# Create: define the EMPLOYEES table and insert one row.
con = connect(**snowflake_settings)
cur = con.cursor()
create_sql = """ CREATE TABLE EMPLOYEES ( ID INT, NAME STRING, AGE INT, ADDRESS STRING, SALARY FLOAT ) """
insert_sql = """ INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES (1, 'John', 30, 'New York', 1000.00) """
cur.execute(create_sql)
cur.execute(insert_sql)
print("Table created and row inserted successfully")
con.close()

# Read: fetch employee 1 and print all five columns.
con = connect(**snowflake_settings)
cur = con.cursor()
cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
for record in cur.fetchall():
    print("ID = ", record[0])
    print("NAME = ", record[1])
    print("AGE = ", record[2])
    print("ADDRESS = ", record[3])
    print("SALARY = ", record[4])
con.close()

# Update: change the salary of employee 1.
con = connect(**snowflake_settings)
cur = con.cursor()
cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
print("Row updated successfully")
con.close()

# Delete: remove employee 1.
con = connect(**snowflake_settings)
cur = con.cursor()
cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
print("Row deleted successfully")
con.close()
Python可以使用boto3库连接Amazon DynamoDB:
# Create a boto3 DynamoDB service resource for the us-west-2 region and
# report success. The key values are placeholders for real credentials.
import boto3

dynamodb = boto3.resource(
    'dynamodb',
    region_name='us-west-2',
    aws_access_key_id='Your AWS Access Key',
    aws_secret_access_key='Your AWS Secret Key',
)
print("Opened DynamoDB successfully")
接下来,我们将展示在DynamoDB中如何进行基本的CRUD操作。
# CRUD walkthrough for DynamoDB. Relies on the `dynamodb` service resource
# created when the connection was opened.
from decimal import Decimal

# Create: provision the Employees table, keyed on the numeric attribute `id`,
# then insert one item.
table = dynamodb.create_table(
    TableName='Employees',
    KeySchema=[
        {'AttributeName': 'id', 'KeyType': 'HASH'},
    ],
    AttributeDefinitions=[
        {'AttributeName': 'id', 'AttributeType': 'N'},
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5,
    },
)
# create_table returns before the table is usable; wait until it exists so
# the put_item below does not fail because the table is missing.
table.wait_until_exists()
table.put_item(
    Item={
        'id': 1,
        'name': 'John',
        'age': 30,
        'address': 'New York',
        # The boto3 resource layer rejects Python floats; fractional numbers
        # must be passed as Decimal.
        'salary': Decimal('1000.00'),
    }
)
print("Table created and item inserted successfully")

# Read: fetch the item with id 1.
table = dynamodb.Table('Employees')
response = table.get_item(Key={'id': 1})
# The response has no 'Item' entry when the key does not exist.
item = response.get('Item')
if item is None:
    print("Item not found")
else:
    print(item)

# Update: set a new salary on the item (Decimal for the same reason as above).
table = dynamodb.Table('Employees')
table.update_item(
    Key={'id': 1},
    UpdateExpression='SET salary = :val1',
    ExpressionAttributeValues={':val1': Decimal('25000.00')},
)
print("Item updated successfully")

# Delete: remove the item with id 1.
table = dynamodb.Table('Employees')
table.delete_item(Key={'id': 1})
print("Item deleted successfully")
Python可以使用azure-cosmos库连接Microsoft Azure Cosmos DB:
# Create an Azure Cosmos DB client and obtain handles for the working
# database and container, then report success. The URL and key values are
# placeholders for the real account settings.
from azure.cosmos import CosmosClient, PartitionKey, exceptions

url = 'Cosmos DB Account URL'
key = 'Cosmos DB Account Key'
database_name = 'testDB'
container_name = 'Employees'

client = CosmosClient(url, credential=key)
database = client.get_database_client(database_name)
container = database.get_container_client(container_name)
print("Opened CosMos DB successfully")
接下来,我们将展示在Cosmos DB中如何进行基本的CRUD操作。
# CRUD walkthrough for Azure Cosmos DB. Relies on `client`, `database_name`
# and `container_name` defined when the connection was opened.

# Create: make sure database and container exist (partitioned on /id), then
# upsert one employee item.
database = client.create_database_if_not_exists(id=database_name)
container = database.create_container_if_not_exists(
    id=container_name,
    partition_key=PartitionKey(path="/id"),
    offer_throughput=400,
)
new_employee = {
    'id': '1',
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00,
}
container.upsert_item(new_employee)
print("Container created and item upserted successfully")

# Read: print every item in the container.
for entry in container.read_all_items():
    print(entry)

# Update: scan the container and write a new salary to the item with id '1'.
for entry in container.read_all_items():
    if entry['id'] != '1':
        continue
    entry['salary'] = 25000.00
    container.upsert_item(entry)
print("Item updated successfully")

# Delete: scan the container and remove the item with id '1'.
for entry in container.read_all_items():
    if entry['id'] != '1':
        continue
    container.delete_item(entry, partition_key='1')
print("Item deleted successfully")
如有帮助,请多关注
TeahLead_KrisChang,10+年的互联网和人工智能从业经验,10年+技术和业务团队管理经验,同济软件工程本科,复旦工程管理硕士,阿里云认证云服务资深架构师,上亿营收AI产品业务负责人。