网站LOGO
博客 | 棋の小站
页面加载中
12月6日
达尔达尼亚瀑布,博洛尼亚,意大利 ...
网站LOGO 博客 | 棋の小站
记录学习,心得,状态,生活。
菜单
  • 博客 | 棋の小站
    记录学习,心得,状态,生活。
    用户的头像
    首次访问
    上次留言
    累计留言
    我的等级
    我的角色
    打赏二维码
    打赏博主
    Flask拓展框架1——Flask-SQLAlchemy
    点击复制本页地址
    微信扫一扫
    文章二维码
    文章图片 文章标题
    创建时间
  • 一 言
    确认删除此评论么? 确认
  • 本弹窗介绍内容来自,本网站不对其中内容负责。
    按住ctrl可打开默认菜单

    Flask拓展框架1——Flask-SQLAlchemy

    · 原创 ·
    学学编程 · FlaskPython
    共 9583 字 · 约 2 分钟 · 153

    SQLAlchemy是一个常用的数据库抽象层和数据库关系映射包(ORM),通常通过安装Flask-Alchemy的方式来安装Alchemy。

    在venv虚拟环境中输入pip install Flask-SQLAlchemy即可安装Flask-Alchemy。

    在使用SQLAlchemy之前,首先需要先创建一个数据库,这里命名为db_flask,然后创建一个Python文件来编写代码。大体步骤为先在全局配置中配置相关属性,然后实例化Alchemy类,最后使用create_all()方法创建数据库。

    python 代码:
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    
    # 配置相关属性
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:254456@localhost/db_flask'
    
    # 实例化Alchemy类
    db = SQLAlchemy(app)
    
    
    # 创建数据库模型类
    class User(db.Model):
        id = db.Column(db.Integer, autoincrement=True, primary_key=True)
        username = db.Column(db.String(80), unique=True, nullable=False)
        email = db.Column(db.String(120), unique=True, nullable=False)
    
        def __repr__(self):
            return '<User %r>' % self.username
    
    
    if __name__ == '__main__':
        with app.app_context():
            # 在应用程序上下文中执行需要的操作
            db.create_all()
    注:由于我在这里执行时出现了异常,因此最后一段代码使用了app.app_context()来限定app的上下文。

    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True表示令Flask框架跟踪对象的修改并且发送信号,这将需要额外的内存。app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:254456@localhost/db_flask'表示用于连接的数据库,如:

    sqlite:////temp/test.db
    mysql+pymysql://root:password@server/db

    接下来,实例化SQLAlchemy类并赋值给db变量,然后创建数据库模型类User类,它需要继承自db.Model类。类属性就代表数据表中的字段。如id字段使用db.Integer代表整型值,用primary_key=True代表设置id为主键;username使用db.String(80)代表长度为80的字符串,使用unique=True代表设置该字段唯一,并且使用nullable表示字段非空。最后使用db.create_all()方法创建数据库。

    使用SQLAlchemy还可以定义数据关系。数据表之间的关系通常包含一对一,一对多,多对多,如一个用户可以编写多个文章,下面就将使用这个关系做示范,演示如何使用SQLAlchemy定义这种关系。

    python 代码:
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    
    app = Flask(__name__)
    
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:254456@localhost/db_flask'
    
    db = SQLAlchemy(app)
    
    class User(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        username = db.Column(db.String(80), unique=True, nullable=False)
        email = db.Column(db.String(120), unique=True, nullable=False)
        articles = db.relationship('Articles')
    
        def __repr__(self):
            return '<User %r>' % self.username
    
    class Article(db.Model):
        id = db.Column(db.Integer, primary_key=True)
        title = db.Column(db.String(80), index=True)
        content = db.Column(db.Text)
        user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    
        def __repr__(self):
            return '<User %r>' % self.username
    
    if __name__ == '__main__':
        with app.app_context():
            db.create_all()

    在上述代码中,User类(一对多关系的“一”)添加了一个articles属性,但是这个属性没有用Column声明为列,而是使用db.relationship()来定义关系属性,relationship()参数是另一侧的类名称,当调用User.articles时返回多个记录,也就是该用户所有的文章。

    在Article类(一对多关系的“多”)添加了一个user_id属性,通过使用db.ForeignKey()将其设置为外键,它的参数user.id就是上面User类对应的user表中的主键id。

    重新运行这个文件就可以看到重新创建的表了。

    SQLAlchemy常用功能总结

    以下内容来自SQLAlchemy官方文档。

    创建模型以及获取句柄

    python 代码:
    Create_Add_Date.py
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Sequence, ForeignKey
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker, relationship
    
    engine = create_engine('sqlite:test.db', echo=True)
    # engine = create_engine('sqlite:///:memory:', echo=True)
    
    # 创建基类
    Base = declarative_base()
    
    
    # 创建映射
    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, Sequence('user_id_seq'), primary_key=True)
        name = Column(String)
        gender = Column(String)
        age = Column(Integer)
        # 设置级联关系
        addresses = relationship("Address", bake_queries="user", cascade="all, delete, delete-orphan")
        def __repr__(self):
            return str(self.id)+"|"+self.name + "|" + self.gender + "|" + str(self.age)
    
    
    class Address(Base):
        __tablename__ = 'addresses'
    
        id = Column(Integer, primary_key=True)
        email_address = Column(String, nullable=False)
        # 设置外键
        user_id = Column(Integer, ForeignKey("users.id"))
        # 连接到User,引用名为addresses
        user = relationship("User", back_populates="addresses")
    
        def __repr__(self):
            return self.email_address + '|' + str(self.user_id)
    
    # 指定外键,创建数据库
    # User.addresses = relationship("Address", order_by=Address.id, back_populates="user")
    # Base.metadata.create_all(engine)
    
    
    # 拿到句柄
    Session = sessionmaker()
    Session.configure(bind=engine)
    
    if __name__ == '__main__':
        User.addresses = relationship("Address", order_by=Address.id, back_populates="user")
    
        # 生成实例
        users = [
            User(name="张三", gender="女", age=10, addresses=[
                Address(email_address="admin123@1.com"),
                Address(email_address="12@1.com")
            ]),
            User(name="王五", gender="男",age=12, addresses=[
                Address(email_address="1@1.com"),
                Address(email_address="ww@1.com")
            ]),
            User(name="张柳", gender="女", age=58, addresses=[
                Address(email_address="zl@1.com"),
                Address(email_address="zl.2@1.163")
            ]),
            User(name="德玛西亚", gender="男", age=66,addresses=[
                Address(email_address="admin123@1.163"),
                Address(email_address="12@1.162")
            ]),
            User(name="码流", gender="女", age=66,addresses=[
                Address(email_address="admin123@1.com"),
                Address(email_address="12@1.126")
            ]),
            User(name="王麻子", gender="男", age=66,addresses=[
                Address(email_address="admin123@1.com"),
                Address(email_address="11@1.com")
            ]),
            User(name="张三", gender="女", age=38,addresses=[
                Address(email_address="12345@1.com"),
                Address(email_address="12@1.com")
            ]),
            User(name="德玛西亚", gender="女", age=66,addresses=[
                Address(email_address="ad12313min123@1.com"),
                Address(email_address="12@1.com")
            ]),
        ]
        # user = User(name="张三", gender="女", age=10)
    #     user.addresses = [
    #         Address(email_address="1@1")
    #     ]
        #
        # 创建一个会话
        session = Session()
        #
        # # 添加数据
        # session.add(user)
        session.add_all(users)
    # user_resp = session.query(User).filter_by(name="张三").first()
    # user_resp.address = "德国"
    #
        # 将数据库的更改进行提交
        session.commit()
        # # print(user_resp)

    按条件查询,以及简单的过滤数据

    python 代码:
    from SQLAlchemy.Create_Add_Date import Session, User
    from sqlalchemy import and_, or_, func
    
    
    session = Session()
    
    # for instance in session.query(User).order_by(User.id):
    #     print(instance.name, instance.address)
    
    
    # for instance in session.query(User.name, User.gender):
    #     print(instance.name, instance.gender)
    
    # for row in session.query(User, User.name).all():
    #     print(row.User, row.name)
    
    # 自定义列名
    # for row in session.query(User.name.label("name_col")).all():
    #     print(row.name_col)
    
    # 切片排序
    # for u in session.query(User).order_by(User.id)[1:2]:
    #     print(u)
    
    # 过滤
    # for u in session.query(User).filter_by(name="德玛西亚"):
    #     print(u)
    
    # 条件过滤---------------------------------------
    # for u in session.query(User).filter(User.name=='张三'):
    #     print(u)
    
    # 不区分大小写
    # for u in session.query(User).filter(User.name.comparator().ilike("%三")):
    #     print(u)
    # # 可能区分大小写
    # for u in session.query(User).filter(User.name.like("%三")):
    #     print(u)
    
    # in or not in 包含
    # for u in session.query(User).filter(User.name.in_(['张三', '李四'])):
    #     print(u)
    #
    # # 多重过滤(and)-----------------------------------------------
    # for u in session.query(User).filter(and_(User.name == "张三", User.gender=="女")):
    #     print(u)
    #
    # for u in session.query(User).filter(User.name == "张三", User.gender=="女"):
    #     print(u)
    #
    # # 也可连写多个filter来进行多重过滤
    #
    # # 多重过滤 or
    # for u in session.query(User).filter(or_(User.name == "张三", User.gender=="男")):
    #     print(u)
    
    # 计数
    # print(session.query(User).filter(or_(User.name == "张三", User.gender=="男")).count())
    # print(session.query(func.count(User.id)).scalar())
    # 分组并统计计数
    print(session.query(func.count(User.id), User.name).group_by(User.name).all())

    一对多Join查询

    python 代码:
    from SQLAlchemy.Create_Add_Date import User, Address, Session
    from sqlalchemy.orm import relationship
    
    session = Session()
    User.addresses = relationship("Address", order_by=Address.id, back_populates="user")
    
    # 普通连接查询
    # for u, a in session.query(User, Address)\
    #         .filter(User.id == Address.user_id)\
    #         .filter(Address.email_address=="12@1.com"):
    #     print(u, a)
    
    # for u in session.query(User).join(Address, User.id==Address.user_id).all():
    #     print(u)
    for u , a in session.query(User, Address).join(Address, User.addresses).all():
        print(u, a)
    
    
    # # 特殊的查询
    # for u , a in session.query(User, Address).join(User.addresses).all():
    #     print(u, a)

    级联删除

    python 代码:
    from SQLAlchemy.Create_Add_Date import Session, User
    from sqlalchemy import and_, or_, func
    
    
    session = Session()
    user = session.query(User).get(1)
    
    # 删除user
    session.delete(user)
    
    session.commit()
    声明:本文由 (博主)原创,依据 CC-BY-NC-SA 4.0 许可协议 授权,转载请注明出处。

    还没有人喜爱这篇文章呢

    发一条! 发一条!
    博客logo 博客 | 棋の小站 记录学习,心得,状态,生活。
    ICP 冀ICP备2023007665号 ICP 冀公网安备 13030202003453号

    🕛

    本站已运行 221 天 14 小时 44 分

    👁️

    今日访问量:322 昨日访问量:2564

    🌳

    建站:Typecho 主题:MyLife
    博客 | 棋の小站. © 2023 ~ 2023.
    网站logo

    博客 | 棋の小站 记录学习,心得,状态,生活。
     
     
     
     
    壁纸