python---sqlalchemy(一)

2024-08-31 22:18
文章标签 python sqlalchemy

本文主要是介绍python---sqlalchemy(一),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

      • 执行原生SQL语句
      • 创建删除表
      • 操作数据库表
      • scoped_session
      • 增删改查
      • 其他

执行原生SQL语句

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engineengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)def task(arg):# 方式一# conn = engine.raw_connection()# cursor = conn.cursor()# cursor.execute(#     "select * from user "# )# result = cursor.fetchall()# cursor.close()# conn.close()# print(result)# print("------ # 方式二----------------")# cur = engine.execute("select * from user")# result = cur.fetchall()# cur.close()# print(result)# print("------ # 方式三----------------")conn = engine.contextual_connect()with conn:cur = conn.execute("select * from user")result = cur.fetchall()print(result)for i in range(5):t = threading.Thread(target=task, args=(i,))t.start()

方式一输出如下:

(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))

方式二如下:

[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]

方式三如下:

[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]

创建删除表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()# ##################### 单表示例 #########################
class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)class Hosts(Base):__tablename__ = 'hosts'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)ctime = Column(DateTime, default=datetime.datetime.now)# ##################### 一对多示例 #########################
class Hobby(Base):__tablename__ = 'hobby'id = Column(Integer, primary_key=True)caption = Column(String(50), default='篮球')class Person(Base):__tablename__ = 'person'nid = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=True)hobby_id = Column(Integer, ForeignKey("hobby.id"))# 与生成表结构无关,仅用于查询方便hobby = relationship("Hobby", backref='pers')# ##################### 多对多示例 #########################class Server2Group(Base):__tablename__ = 'server2group'id = Column(Integer, primary_key=True, autoincrement=True)server_id = Column(Integer, ForeignKey('server.id'))group_id = Column(Integer, ForeignKey('group.id'))class Group(Base):__tablename__ = 'group'id = Column(Integer, primary_key=True)name = Column(String(64), unique=True, nullable=False)# 与生成表结构无关,仅用于查询方便servers = relationship('Server', secondary='server2group', backref='groups')class Server(Base):__tablename__ = 'server'id = Column(Integer, primary_key=True, autoincrement=True)hostname = Column(String(64), unique=True, nullable=False)def init_db():"""根据类创建数据库表:return:"""engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.create_all(engine)def drop_db():"""根据类删除数据库表:return:"""engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.drop_all(engine)if __name__ == '__main__':drop_db()init_db()

操作数据库表

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engineBase = declarative_base()
class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)# 每次执行数据库操作时,都需要创建一个session
session = Session()# ############# 执行ORM操作 #############
obj1 = Users(name="safly1")
session.add(obj1)# 提交事务
session.commit()
# 关闭session
session.close()

scoped_session

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_sessionimport datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的一下方法:"""
session = scoped_session(Session)# ############# 执行ORM操作 #############
obj1 = Users(name="safly")
session.add(obj1)# 提交事务
session.commit()
# 关闭session
session.close()

总结:

SQLAlchemy两种创建session的方式:方式一:import modelsfrom threading import Threadfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_engineengine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8",pool_size=2,max_overflow=0)XXXXXX = sessionmaker(bind=engine)def task():from sqlalchemy.orm.session import Sessionsession = XXXXXX()data = session.query(models.Classes).all()print(data)session.close()for i in range(10):t = Thread(target=task)t.start()方式二(推荐):import modelsfrom threading import Threadfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.orm import scoped_sessionengine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8",pool_size=2,max_overflow=0)XXXXXX = sessionmaker(bind=engine)session = scoped_session(XXXXXX)def task():# 1. 原来的session对象 = 执行session.registry()# 2. 原来session对象.querydata = session.query(models.Classes).all()print(data)session.remove()for i in range(10):t = Thread(target=task)t.start()flask-session默认也是使用的第二种方式:scoped_session

增删改查

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import textengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)session = Session()# ################ 添加add\add_all ################
"""
obj1 = Users(name="wupeiqi")
session.add(obj1)session.add_all([Users(name="wupeiqi"),Users(name="alex"),])
session.commit()
"""# ################ 删除 ################
"""
session.query(Users).filter(Users.id > 2).delete()
session.commit()"""# ################ 修改 ################"""
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
"""
# ################ 查询 ################
"""
"""print("-------r1----------")
r1 = session.query(Users).all()for i in r1:print(i.name)print("-------r2----------")r2 = session.query(Users.name, Users.age).all()
for i in r2:print(i)print("-------r3----------")r3 = session.query(Users).filter(Users.name == "099099").all()for i in r3:print(i.name)print("-------r4----------")
r4 = session.query(Users).filter_by(name='099099').all()for i in r4:print(i.name)print("-------r5----------")
r5 = session.query(Users).filter_by(name='099099').first()print(r5.name)print("-------r6----------")
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=5, name='099099').order_by(Users.id).all()
for i in r6:print(i.name)print("-------r7----------")
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='099099').all()
for i in r7:print(i.name)session.close()

其他

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)class Hosts(Base):__tablename__ = 'hosts'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)ctime = Column(DateTime, default=datetime.datetime.now)from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import textengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)session = Session()################################   条件###############################
ret = session.query(Users).filter(Users.id > 1, Users.name == 'safly').all()ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == '099099').all()ret = session.query(Users).filter(Users.id.in_([1, 3, 4, 10])).all()ret = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='099099'))).all()from sqlalchemy import and_, or_ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'xiaowan')).all()ret = session.query(Users).filter(or_(Users.id > 11, Users.name == 'eric')).all()ret = session.query(Users).filter(or_(Users.id > 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all()# ################################  通配符###############################
ret = session.query(Users).filter(Users.name.like('09%')).all()ret = session.query(Users).filter(~Users.name.like('09%')).all()#
# ############################### 切片###############################
ret = session.query(Users)[1:4]################################  排序###############################
ret = session.query(Users).order_by(Users.name.desc()).all()ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# ############################### 分组###############################
from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.age).all()ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).all()ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2).all()############################### 连表###############################
# (<__main__.Users object at 0x05934670>, <__main__.Hosts object at 0x05934530>)
# (<__main__.Users object at 0x05934610>, <__main__.Hosts object at 0x05934650>)
ret = session.query(Users, Hosts).filter(Users.id == Hosts.id).all()# for i in ret:
#     print(i)# ??
ret = session.query(Users).join(Hosts).all()
print(ret)
# ret = session.query(Users).join(Hosts, isouter=True).all()################################  组合###############################
# [('safly',), ('xiaowan',), ('sdfsf',), ('werwrw',), ('www',), ('wwerwerw',)]
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Hosts.name).filter(Hosts.id < 4)
ret = q1.union(q2).all()
print(ret)# [('safly',), ('xiaowan',), ('sdfsf',), ('werwrw',), ('www',), ('wwerwerw',), ('safly',)]
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Hosts.name).filter(Hosts.id < 4)
ret = q1.union_all(q2).all()
print(ret)session.close()

这篇关于python---sqlalchemy(一)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1125141

相关文章

基于Python开发Windows屏幕控制工具

《基于Python开发Windows屏幕控制工具》在数字化办公时代,屏幕管理已成为提升工作效率和保护眼睛健康的重要环节,本文将分享一个基于Python和PySide6开发的Windows屏幕控制工具,... 目录概述功能亮点界面展示实现步骤详解1. 环境准备2. 亮度控制模块3. 息屏功能实现4. 息屏时间

Python如何去除图片干扰代码示例

《Python如何去除图片干扰代码示例》图片降噪是一个广泛应用于图像处理的技术,可以提高图像质量和相关应用的效果,:本文主要介绍Python如何去除图片干扰的相关资料,文中通过代码介绍的非常详细,... 目录一、噪声去除1. 高斯噪声(像素值正态分布扰动)2. 椒盐噪声(随机黑白像素点)3. 复杂噪声(如伪

Python中图片与PDF识别文本(OCR)的全面指南

《Python中图片与PDF识别文本(OCR)的全面指南》在数据爆炸时代,80%的企业数据以非结构化形式存在,其中PDF和图像是最主要的载体,本文将深入探索Python中OCR技术如何将这些数字纸张转... 目录一、OCR技术核心原理二、python图像识别四大工具库1. Pytesseract - 经典O

基于Linux的ffmpeg python的关键帧抽取

《基于Linux的ffmpegpython的关键帧抽取》本文主要介绍了基于Linux的ffmpegpython的关键帧抽取,实现以按帧或时间间隔抽取关键帧,文中通过示例代码介绍的非常详细,对大家的学... 目录1.FFmpeg的环境配置1) 创建一个虚拟环境envjavascript2) ffmpeg-py

python使用库爬取m3u8文件的示例

《python使用库爬取m3u8文件的示例》本文主要介绍了python使用库爬取m3u8文件的示例,可以使用requests、m3u8、ffmpeg等库,实现获取、解析、下载视频片段并合并等步骤,具有... 目录一、准备工作二、获取m3u8文件内容三、解析m3u8文件四、下载视频片段五、合并视频片段六、错误

Python中提取文件名扩展名的多种方法实现

《Python中提取文件名扩展名的多种方法实现》在Python编程中,经常会遇到需要从文件名中提取扩展名的场景,Python提供了多种方法来实现这一功能,不同方法适用于不同的场景和需求,包括os.pa... 目录技术背景实现步骤方法一:使用os.path.splitext方法二:使用pathlib模块方法三

Python打印对象所有属性和值的方法小结

《Python打印对象所有属性和值的方法小结》在Python开发过程中,调试代码时经常需要查看对象的当前状态,也就是对象的所有属性和对应的值,然而,Python并没有像PHP的print_r那样直接提... 目录python中打印对象所有属性和值的方法实现步骤1. 使用vars()和pprint()2. 使

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解

一文深入详解Python的secrets模块

《一文深入详解Python的secrets模块》在构建涉及用户身份认证、权限管理、加密通信等系统时,开发者最不能忽视的一个问题就是“安全性”,Python在3.6版本中引入了专门面向安全用途的secr... 目录引言一、背景与动机:为什么需要 secrets 模块?二、secrets 模块的核心功能1. 基

python常见环境管理工具超全解析

《python常见环境管理工具超全解析》在Python开发中,管理多个项目及其依赖项通常是一个挑战,下面:本文主要介绍python常见环境管理工具的相关资料,文中通过代码介绍的非常详细,需要的朋友... 目录1. conda2. pip3. uvuv 工具自动创建和管理环境的特点4. setup.py5.