SQLAlchemy

发布时间 2023-08-22 20:24:45作者: 星空看海

一 SQLAlchemy介绍和快速使用

SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

# orm 框架---->django orm-->只能用在django中,不能独立使用
# python界的orm框架
	-peewee
    -sqlalchemy:企业级
    -djagno rom
    -Tortoise ORM
    -GINO
    
# go 界orm框架
	-gorm  国人写的
    -Xorm
    
    
# java界orm框架
	-ssh 框架springmvc  structs   Hibernate(java的orm框架)
    -ssh  spring    springmvc   Hibernate
    -ssm  Spring    SpringMVC    MyBatis (orm框架)
    -springboot :sb框架 --->java工程师就是spring工程师
    -spring cloud
    
    
# 分层:
Engine,框架的引擎
Connection Pooling ,数据库连接池
Dialect,选择连接数据库的DB API种类(sqlite,mysql...)
Schema/Types,架构和类型
SQL Exprression Language,SQL表达式语言

#操作不同数据库
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html


# 了解
	orm不能创建数据库--->只能创建表,删除表--->sqlalchemy不能增加删除字段-->借助于第三方插件实现

1.1 sqlalchemy的原生操作

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

# 第一步:创建engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/cars?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:通过engine获得链接
conn=engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
    "select * from news"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()

二 sqlalchemy通过orm创建表删除表

sqlalchemy.py

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from models import Base
# 第一步:创建engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/db001?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

#1 在数据库中创建表
# Base.metadata.create_all(engine)

# 2 删除表
# Base.metadata.drop_all(engine)

models.py

from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.ext.declarative import declarative_base
import datetime

Base = declarative_base()


# print(type(Base))
class User(Base):
    # 以__开头的是配置
    __tablename__ = 'users'  # 数据库表名称,如果不写,以类名作为表名

    id = Column(Integer, primary_key=True)  # 主键索引,聚簇索引
    name = Column(String(64), index=True, nullable=False)  # name字段加辅助索引
    email = Column(String(32), unique=True)
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now())
    extra = Column(Text, nullable=True)

    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一
        Index('ix_id_name', 'name', 'email'), # 索引
    )

示例

# 1 sqlalchemy,原生操作sql
# 2 sqlalchemy创建表删除表
	Base = declarative_base()
    # print(type(Base))
    class User(Base):
        # 以__开头的是配置
        __tablename__ = 'users'  # 数据库表名称,如果不写,以类名作为表名

        id = Column(Integer, primary_key=True)  # 主键索引,聚簇索引
        name = Column(String(64), index=True, nullable=False)  # name字段加辅助索引
        email = Column(String(32), unique=True)
        # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
        ctime = Column(DateTime, default=datetime.datetime.now())
        extra = Column(Text, nullable=True)

        __table_args__ = (
            UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一
            Index('ix_id_name', 'name', 'email'), #索引
        )
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from models import Base,User
from sqlalchemy.orm import sessionmaker
# 第一步:创建engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/db001?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)


# 创建表,删除表
#1 在数据库中创建表
# Base.metadata.create_all(engine)

# 2 删除表
# Base.metadata.drop_all(engine)

##### 操作表中得数据
# 第二步:得到一个session对象--->不是flask的session--->会话--->链接
Session = sessionmaker(bind=engine) # 把引擎传入
session = Session() # 得到session对象

# 第三步:使用session对象操作数据
# 创建对象
user=User(name='lqz',email='33@qq.com')
# 保存到数据库
session.add(user)
session.commit()  # 提交事务
session.close() # 关闭会话

三 scoped_session线程安全

3.1 scoped_session线程对象

# 如果集成到flask中,session会话是要做成全局,还是每个视图函数有自己的一个

# 应该做成,每个视图函数,都新创建一个session对象

# 这样每次都要加括号得到session对象

# scoped_session 全局只有一个session对象,在不同视图函数就用这一个--->保证线程安全
	-做成了每个线程自己一个单独的session对象
    
    
# scoped_session 总结:
	1 以后scoped_session的对象,就像使用Session的对象一样用--->装饰器放进去了
    2 scoped_session 是线程安全的,如何做到的
    	-每个线程自己的一个session对象
        -self.registry = ThreadLocalRegistry(session_factory)
        
    3 t=threading.local()  很神奇,多线程并发操作,不需要加锁,不会出现并发安全问题,每个线程用的都是自己的那个数据
    	-核心原理是:通过线程id号做个区分
        -线程1 t.a=88  内部 --->{线程id号:{a:88}}
        -线程2 t.a=77  内部 --->{线程id号1:{a:88},线程id号2:{a:77}}
     线程1   t.a=100  --->在当前线程中  print(t.a)   --->100
     线程2   t.a=99  --->在当前线程中  print(t.a)    --->99
    不同线程用的都是t对象threading.local(),但是每个线程用自己的数据
import sqlalchemy
from sqlalchemy.orm import scoped_session
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from models import Base,User
from sqlalchemy.orm import sessionmaker
# 第一步:创建engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/db001?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:得到一个session对象--->不是flask的session--->会话--->链接
Session = sessionmaker(bind=engine)# 把引擎传入

# session = Session() # 得到session对象
# print(type(session))
session = scoped_session(Session) # 这个session全局用一个即可  :sqlalchemy.orm.scoping.scoped_session
print(type(session))

# 第三步:使用session对象操作数据
# 创建对象
user=User(name='lq12z',email='3333@qq.com')
# 保存到数据库
session.add(user)
session.commit()  # 提交事务
session.close() #关闭会话



# 研究:Session 的区别和联系scoped_session
from sqlalchemy.orm.session import Session
from sqlalchemy.orm.scoping import scoped_session

# scoped_session 没有add方法,调用的时候,会有

3.2 类装饰器

###1  加在类上的装饰器
def auth(func):
    def inner(*args,**kwargs):
        res=func(*args,**kwargs)
        res.add='999'
        return res
    return inner

@auth  # Person=auth(Person)
class Person():
    pass

# 执行:Person()----->在执行--->inner
p=Person()  # inner()  inner的返回值给了p
print(p.add)


# 2 类作为装饰器

四 基本增删查改

4.1 基本增删查改

from sqlalchemy.orm import scoped_session
from sqlalchemy import create_engine
from models import Base, User
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/db001?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)  # 把引擎传入
session = Session()
# session = scoped_session(Session) # 这个session全局用一个即可  :sqlalchemy.orm.scoping.scoped_session

# 1 增加数据 add  add_all
# session.add(User(name="xxx",email='55'))
# user1=User(name="123",email='12')
# user2=User(name="44",email='2323')
# session.add_all([user1,user2])

# 2 查  filter传的是表达式,filter_by传的是参数
# all()  出来的是列表--->不是qs对象,没有这个东西
# 想拿单条 .first()
# select * from User where User.id>2 limit 1;
# res=session.query(User).filter(User.id>2).first()
# res=session.query(User).filter(User.name=='lqz').all()
# res=session.query(User).filter_by(id=3).all()
# res=session.query(User).filter_by(name='lqz').all()
# print(res)


# 删除
# delete * from User where id >6;
# res = session.query(User).filter(User.id >= 6).delete()
# print(res)  # 影响的行数

# 不能删除,没有方法
# user=session.query(User).filter(User.id == 5).first()
# user.delete()  # 它没有单独删对象的



# 改
# res=session.query(User).filter(User.id > 0).update({"name" : "lqz"})
#类似于django的F查询
# session.query(User).filter(User.id > 0).update({User.name: User.name + "099"}, synchronize_session=False) # 字符串相加
# session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session="evaluate") # 数字相加
# 查到单个对象,修改属性-->add增加进去--->修改?   add 只要有id,就是修改
res=session.query(User).filter(User.id == 1).first()
# print(res)
res.name='yyyy'
session.add(res)


session.commit()  # 提交事务
session.close()  # 关闭会话

五 表关系:一对多(一对一)

5.1 表模型

# 一对一
# 一对多
# 多对多
----都是外键关系-----  一对一其实是 一对多 的一种特例
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')

    def __str__(self):
        return self.caption

    def __repr__(self):
        return self.caption


class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer, primary_key=True)  # 不会自动生成id
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名
    # 一对多关系一旦确立,关联关系写在多的一方--->物理外键
    hobby_id = Column(Integer, ForeignKey("hobby.id"))

    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')  # 以后 person.hobby 就是hobby对象

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name

5.2 新增和基于对象的查询

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from models import Base, User, Person, Hobby
from sqlalchemy.orm import sessionmaker

# 第一步:创建engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/db001?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)  # 把引擎传入
session = Session()

# 1 增加数据方式一
# session.add(Hobby())
# session.add(Person(name='彭于晏',hobby_id=1))

# session.add(Person(name='彭于晏',hobby_id=2))  # 报错
# 2 增加数据方式二
# session.add(Person(name='彭于晏', hobby=Hobby(caption='足球')))  # 新增hobby和person


# 3 查询

# 通过hobby查询 person     反
hobby = session.query(Hobby).filter_by(id=2).first()

print(hobby)
# 所有喜欢足球的人 relationship('Hobby', backref='pers')   按 backref
print(hobby.pers)

# 4 修改 和删除 跟之前一样


# 通过person查询hobby      正
# p = session.query(Person).filter_by(id=1).first()
# print(p)  # 获取彭于晏的爱好--->正向查询按字段
# print(p.hobby_id)
# print(p.hobby)
# print(p.hobby.caption)

session.commit()
session.close()

六 表关系:多对多

6.1 表模型

# 多对多关系
# 中间表  手动创建
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True) #autoincrement 默认就是true
    name = Column(String(64), unique=True, nullable=False)

    # 就是咱们之前的ManyToMany,不会在表中生成字段--->因为它是个表---->这个字段可以放在Girl表
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name

6.2 增加和基于对象的跨表查询

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from models import Base, User, Person, Hobby,Girl,Boy,Boy2Girl
from sqlalchemy.orm import sessionmaker

# 第一步:创建engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/db001?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)  # 把引擎传入
session = Session()

# 增加记录
# 新增
# 1 笨办法新增
# girl=Girl(name='刘亦菲')
# boy=Boy(name='彭于晏')
# session.add_all([girl,boy])
# 操作中间表(纯手动操作中间表)
# session.add(Boy2Girl(girl_id=1,boy_id=1))


# 2 使用relationship
# boy = Boy(name='lqz') # 增加了一个boy
# boy.girls = [Girl(name='迪丽热巴'), Girl(name='景田')]  # 增加了俩girl
# #给这一个boy,增加了两条约会记录
# session.add(boy)


##查询
# 基于对象的跨表查询
# 正向
# boy = session.query(Boy).filter(Boy.id==2).first()
# print(boy.girls)

# 反向
girl = session.query(Girl).filter(Girl.id==2).first()
print(girl.boys)

session.commit()
session.close()