SQLAlchemy是一个常用的数据库抽象层和数据库关系映射包(ORM),通常通过安装Flask-Alchemy的方式来安装Alchemy。
在venv虚拟环境中输入pip install Flask-SQLAlchemy
即可安装Flask-Alchemy。
在使用SQLAlchemy之前,首先需要先创建一个数据库,这里命名为db_flask,然后创建一个Python文件来编写代码。大体步骤为先在全局配置中配置相关属性,然后实例化Alchemy类,最后使用create_all()方法创建数据库。
python 代码:from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 配置相关属性
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:254456@localhost/db_flask'
# 实例化Alchemy类
db = SQLAlchemy(app)
# 创建数据库模型类
class User(db.Model):
id = db.Column(db.Integer, autoincrement=True, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
def __repr__(self):
return '<User %r>' % self.username
if __name__ == '__main__':
with app.app_context():
# 在应用程序上下文中执行需要的操作
db.create_all()
注:由于我在这里执行时出现了异常,因此最后一段代码使用了app.app_context()
来限定app的上下文。
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
表示令Flask框架跟踪对象的修改并且发送信号,这将需要额外的内存。app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:254456@localhost/db_flask'
表示用于连接的数据库,如:
sqlite:////temp/test.db
mysql+pymysql://root:password@server/db
接下来,实例化SQLAlchemy类并赋值给db变量,然后创建数据库模型类User类,它需要继承自db.Model类。类属性就代表数据表中的字段。如id字段使用db.Integer代表整型值,用primary_key=True
代表设置id为主键;username使用db.String(80)代表长度为80的字符串,使用unique=True代表设置该字段唯一,并且使用nullable表示字段非空。最后使用db.create_all()方法创建数据库。
使用SQLAlchemy还可以定义数据关系。数据表之间的关系通常包含一对一,一对多,多对多,如一个用户可以编写多个文章,下面就将使用这个关系做示范,演示如何使用SQLAlchemy定义这种关系。
python 代码:from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:254456@localhost/db_flask'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
articles = db.relationship('Articles')
def __repr__(self):
return '<User %r>' % self.username
class Article(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(80), index=True)
content = db.Column(db.Text)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
def __repr__(self):
return '<User %r>' % self.username
if __name__ == '__main__':
with app.app_context():
db.create_all()
在上述代码中,User类(一对多关系的“一”)添加了一个articles属性,但是这个属性没有用Column声明为列,而是使用db.relationship()来定义关系属性,relationship()参数是另一侧的类名称,当调用User.articles时返回多个记录,也就是该用户所有的文章。
在Article类(一对多关系的“多”)添加了一个user_id属性,通过使用db.ForeignKey()将其设置为外键,它的参数user.id就是上面User类对应的user表中的主键id。
重新运行这个文件就可以看到重新创建的表了。
SQLAlchemy常用功能总结
以下内容来自SQLAlchemy官方文档。
创建模型以及获取句柄
python 代码:Create_Add_Date.py from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Sequence, ForeignKey from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, relationship engine = create_engine('sqlite:test.db', echo=True) # engine = create_engine('sqlite:///:memory:', echo=True) # 创建基类 Base = declarative_base() # 创建映射 class User(Base): __tablename__ = 'users' id = Column(Integer, Sequence('user_id_seq'), primary_key=True) name = Column(String) gender = Column(String) age = Column(Integer) # 设置级联关系 addresses = relationship("Address", bake_queries="user", cascade="all, delete, delete-orphan") def __repr__(self): return str(self.id)+"|"+self.name + "|" + self.gender + "|" + str(self.age) class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True) email_address = Column(String, nullable=False) # 设置外键 user_id = Column(Integer, ForeignKey("users.id")) # 连接到User,引用名为addresses user = relationship("User", back_populates="addresses") def __repr__(self): return self.email_address + '|' + str(self.user_id) # 指定外键,创建数据库 # User.addresses = relationship("Address", order_by=Address.id, back_populates="user") # Base.metadata.create_all(engine) # 拿到句柄 Session = sessionmaker() Session.configure(bind=engine) if __name__ == '__main__': User.addresses = relationship("Address", order_by=Address.id, back_populates="user") # 生成实例 users = [ User(name="张三", gender="女", age=10, addresses=[ Address(email_address="admin123@1.com"), Address(email_address="12@1.com") ]), User(name="王五", gender="男",age=12, addresses=[ Address(email_address="1@1.com"), Address(email_address="ww@1.com") ]), User(name="张柳", gender="女", age=58, addresses=[ Address(email_address="zl@1.com"), Address(email_address="zl.2@1.163") ]), User(name="德玛西亚", gender="男", age=66,addresses=[ Address(email_address="admin123@1.163"), Address(email_address="12@1.162") ]), User(name="码流", gender="女", age=66,addresses=[ Address(email_address="admin123@1.com"), Address(email_address="12@1.126") ]), User(name="王麻子", gender="男", age=66,addresses=[ Address(email_address="admin123@1.com"), Address(email_address="11@1.com") ]), User(name="张三", gender="女", age=38,addresses=[ Address(email_address="12345@1.com"), Address(email_address="12@1.com") ]), User(name="德玛西亚", gender="女", age=66,addresses=[ Address(email_address="ad12313min123@1.com"), Address(email_address="12@1.com") ]), ] # user = User(name="张三", gender="女", age=10) # user.addresses = [ # Address(email_address="1@1") # ] # # 创建一个会话 session = Session() # # # 添加数据 # session.add(user) session.add_all(users) # user_resp = session.query(User).filter_by(name="张三").first() # user_resp.address = "德国" # # 将数据库的更改进行提交 session.commit() # # print(user_resp)
按条件查询,以及简单的过滤数据
python 代码:from SQLAlchemy.Create_Add_Date import Session, User from sqlalchemy import and_, or_, func session = Session() # for instance in session.query(User).order_by(User.id): # print(instance.name, instance.address) # for instance in session.query(User.name, User.gender): # print(instance.name, instance.gender) # for row in session.query(User, User.name).all(): # print(row.User, row.name) # 自定义列名 # for row in session.query(User.name.label("name_col")).all(): # print(row.name_col) # 切片排序 # for u in session.query(User).order_by(User.id)[1:2]: # print(u) # 过滤 # for u in session.query(User).filter_by(name="德玛西亚"): # print(u) # 条件过滤--------------------------------------- # for u in session.query(User).filter(User.name=='张三'): # print(u) # 不区分大小写 # for u in session.query(User).filter(User.name.comparator().ilike("%三")): # print(u) # # 可能区分大小写 # for u in session.query(User).filter(User.name.like("%三")): # print(u) # in or not in 包含 # for u in session.query(User).filter(User.name.in_(['张三', '李四'])): # print(u) # # # 多重过滤(and)----------------------------------------------- # for u in session.query(User).filter(and_(User.name == "张三", User.gender=="女")): # print(u) # # for u in session.query(User).filter(User.name == "张三", User.gender=="女"): # print(u) # # # 也可连写多个filter来进行多重过滤 # # # 多重过滤 or # for u in session.query(User).filter(or_(User.name == "张三", User.gender=="男")): # print(u) # 计数 # print(session.query(User).filter(or_(User.name == "张三", User.gender=="男")).count()) # print(session.query(func.count(User.id)).scalar()) # 分组并统计计数 print(session.query(func.count(User.id), User.name).group_by(User.name).all())
一对多Join查询
python 代码:from SQLAlchemy.Create_Add_Date import User, Address, Session from sqlalchemy.orm import relationship session = Session() User.addresses = relationship("Address", order_by=Address.id, back_populates="user") # 普通连接查询 # for u, a in session.query(User, Address)\ # .filter(User.id == Address.user_id)\ # .filter(Address.email_address=="12@1.com"): # print(u, a) # for u in session.query(User).join(Address, User.id==Address.user_id).all(): # print(u) for u , a in session.query(User, Address).join(Address, User.addresses).all(): print(u, a) # # 特殊的查询 # for u , a in session.query(User, Address).join(User.addresses).all(): # print(u, a)
级联删除
python 代码:from SQLAlchemy.Create_Add_Date import Session, User from sqlalchemy import and_, or_, func session = Session() user = session.query(User).get(1) # 删除user session.delete(user) session.commit()