最新公告
  • 欢迎您光临起源地模板网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入钻石VIP
  • Helo 快速上手指南

    正文概述 掘金(at7h)   2020-11-23   921

    Helo 是本人业余开发的一个简单小型低级别的异步(asyncio) Python ORM。它几乎没有什么概念,非常直白,容易上手使用。

    Helo 可以在你的异步应用中帮助你轻松的构建出富有表达力的常用 SQL 语句,你只需以友好的对象化 API 来操作数据,而不用关心 SQL 语句编写、数据处理等细节。

    开始使用

    安装 helo 可以通过以下两种方式:

    1. 安装 pypi 的稳定发布版本, 在终端中运行此命令:

    $ pip install helo
    

    2. 从最新的源码安装

    $ git clone https://github.com/at7h/helo.git
    $ cd helo
    $ python setup.py install
    

    安装完成后就开始下面的入坑之旅了 ? 。


    使用 helo, 首先你需要引入 helo 并使用 helo.G 实例化一个全局变量,假定称其为 db:

    import helo
    
    db = helo.G()
    

    db 是一个全局单例对象,下面的介绍中我们将多次使用到它。

    模型声明

    使用 helo 声明模型很简单,只需从 helo.Model 继承即可。下面给出几个模型声明的简单的例子:

    class Person(helo.Model):
        id = helo.BigAuto()
        name = helo.VarChar(length=45, null=False)
    
    class User(Person):
        email = helo.Email(default='')
        password = helo.VarChar(length=100, null=False)
        create_at = helo.Timestamp(default=helo.ON_CREATE)
    
        class Meta:
            indexes = [helo.K('idx_ep', ['email', 'password'])]
    
    class Employee(Person):
        department = helo.Smallint()
        salary = helo.Float(default=0)
    
    class Post(helo.Model):
        id = helo.Auto(comment='auto increment pk')
        title = helo.VarChar(length=100)
        content = helo.Text(encoding=helo.ENCODING.UTF8MB4)
        author = helo.Int(default=0)
        create_at = helo.Timestamp(default=helo.ON_CREATE)
        update_at = helo.Timestamp(default=helo.ON_UPDATE)
    
        class Meta:
            indexes = [
                helo.K('idx_title', 'title'),
                helo.K('idx_author', 'author'),
            ]
    

    内部类 Meta 可用于指定 db_name, table_name, engine, indexes, charset, commentTable 的元属性。

    class Meta:
        db = 'db_name'
        name = 'table_name'
        engine = helo.ENGINE.innodb
        charset = helo.ENCODING.utf8mb4
        indexes = []
        comment = 'table comment'
    

    其中 table_name 默认为 model 类名的 snake_case 风格名称,engine 默认为 InnoDBcharset 默认为 utf8mb4

    建立连接

    前面的模型声明只是定义了模型与真实表结构的映射关系,并非实际在数据库中创建了这些表结构。为此,我们需要先使用 helo 来与数据库建立连接,这里我们创建一个 MySQL 的数据库实例:

    >>> await db.bind('mysql://user:pwd@localhost:3306/helo')
    

    或者传递配置参数:

    >>> await db.bind(user='user', password='pwd', db='helo')
    

    如果你设置了环境变量 HELO_DATABASE_URL,那么你不用再传递 url:

    >>> await db.bind()
    

    bind 实际上为我们创建了一个数据库连接池:

    >>> db.state
    {'minsize': 1, 'maxsize': 15, 'size': 1, 'freesize': 1}
    

    bind 给我们提供了很多关键字参数来允许我们自定义设置,详见 helo.db.Pool 类。例如:

    >>> await db.bind('mysql://user:pwd@127.0.0.1:3306/db', maxsize=10, connect_timeout=15)
    

    已经创建的连接池对象将是一个全局的单例对象,也就是说如果你已经为你的应用程序调用 bind 绑定了数据库,在此之前如果你没有使用 unbind 进行解绑,你将不能再继续使用 bind 再次绑定另一个数据库,否则你将会得到一个 helo.err.DuplicateBinding 错误。

    如果你需要显式地断开与数据库的连接,关闭连接池,可以使用 unbind:

    >>> await db.unbind()
    

    在小型的脚本中你可以使用 db.binder 来自动处理上下文:

    >>> async with db.binder():
    ...     pass
    

    数据操作

    与数据库建立了连接之后,我们需要在数据库创建我们的表,以便于接下来进行数据的操作。

    在真正的应用中,数据库表的设计创建与维护是单独分开,一般由专门的 DBA 来管理。当然 helo 也提供了基础的 DDL 支持。

    下面我们在数据库创建它们:

    >>> await db.create_tables([User, Employee, Post])
    

    在应用项目中,我们通常将所有的模型声明单独放在一个模块中,在此假设模块名为 models,则可以使用 create_all 为模块中所有的 model 创建表:

    >>> from your.application import models
    >>> await db.create_all(models)
    

    当然你也可以使用 Model 的方法来单独创建:

    >>> await User.create()
    

    Helo 提供了基本的操作数据库中数据的能力,支持丰富的可组合的逻辑运算表达,你可以轻松的完成富有表达力的 queries,以实现通过对象化的 API 来构建你想要的 SQL 语句(DML 和 DQL)的能力。

    下面示例基本的增删改查的操作。

    使用 helo 你可以有多种插入数据的方式选择,我们从创建一个 User 对象开始:

    user = User(name='at7h', password='1111')
    print(user.name, user.password)  # at7h, 1111
    # Now user.id is None, because it is not saved to the database
    assert user.id is None
    

    此时的 user 仅是内存中的一个对象,你需要通过 save 方法持久化到数据库中:

    user_id = await user.save()
    assert user_id == user.id == 1
    

    我们可以修改它,并保存更改:

    user.name = 'at8h'
    user.email = 'g@at7h.com'
    user_id = await user.save()
    assert user_id == user.id == 1
    

    推荐使用下面几种方式来插入数据。

    方法 add, madd 可以用来添加单条或多条数据,它们是 insert, minsert 的简单快捷方式:

    user_id = await User.add(name='bobo', password='2222')
    # Or: user_id = await User.add({'name': 'bobo', 'password': '2222'})
    print(user_id)  # 2
    
    users = [{'name': 'mingz', 'password': '3333'},
             {'name': 'xy69z', 'password': '4444'}]
    # Or using user object list:
    # users = [User(name='mingz', password='3333'),
    #          User(name='xy69z', password='4444')]
    count = await User.madd(users)
    print(count)  # 2
    

    方法 insertminsert 是最正确的数据插入姿势,它们可以胜任多种数据形式,它们将返回一个 Insert 对象,要执行此操作,请不要忘了写 do() 哦 ?:

    ret = await User.insert(name='poper', password='5555').do()
    # Or: ret = await User.insert({'name': 'bingo', 'password': '8888'}).do()
    assert ret.affected == 1
    assert ret.last_id == 5
    print(ret)  # (1, 5)
    
    # Inserting multiple
    employees = [
        {'name': 'at7h', 'department': 1},
        {'name': 'bobo', 'department': 2},
    ]
    ret = await Employee.minsert(employees).do()
    print(ret)  # (2, 1)
    
    # Specify row tuples columns the tuple values correspond to
    posts = [
        ('post1', 1),
        ('post2', 2),
    ]
    ret = await Post.minsert(
        posts, columns=[Post.title, Post.author]
    ).do()
    print(ret)  # (2, 1)
    

    使用 insert_from 支持表间数据填充:

    select = User.select(User.name).where(User.id.in_([3, 4, 5]))
    ret = await Employee.insert_from(select, [Employee.name]).do()
    print(ret)  # (3, 3)
    

    Helo 也有多种获取数据的方式选择,如简单获取单条数据可以使用 get 方法:

    # By id
    user = await User.get(1)
    assert isinstance(user, User)
    print(user.id, user.name, user.password)  # 1, at7h, 1111
    
    # Or by query
    assert (await User.get(User.name == user.name)) == user
    

    获取多条数据可以使用 mget 方法:

    # By id list
    uid_list = [1, 2, 3]
    users = await User.mget(uid_list)
    print(users.count)  # 3
    print(users)  # [<User object at 1>, <User object at 2>, <User object at 3>]
    # Specify columns
    users = await User.mget(uid_list, columns=[User.id, User.name])
    assert users[0].password is None
    
    # Or by query
    users = await User.mget((User.id < 2) | (User.name == 'mingz'))
    print(users)  # [<User object at 1>, <User object at 3>]
    

    同样的,方法 getmget 也是 select 的简单快捷版本,其只适合于已知主键或查询条件比较简单的场景,更多的时候我们还是需要使用 select

    使用 select 方法可以帮助你以对象化 API 的方式轻松的构造你的 DQL,其支持丰富的可组合的逻辑条件表达式。

    users = await User.select().order_by(
        User.id.desc()
    ).limit(3).offset(2).all()
    print(users) # [<User object at 5>, <User object at 4>, <User object at 3>]
    

    比如我需要知道有没有使用 gmail 邮箱的用户:

    is_exist = await User.select().where(
        User.email.endswith('gmail.com')
    ).exist()
    print(is_exist)  # False
    

    比如我想知道 2019 年 7 月以来共新增了多少用户:

    user_count = await User.select().where(
        User.create_at > datetime(2019, 7, 1)
    ).count()
    print(user_count)  # 4
    

    再比如我们需要分页的获取今年写了 Python(title) 相关文章的用户:

    users = await User.select().where(
        User.id.in_(
            Post.select(Post.author).where(
                Post.update_at > datetime(2019, 1, 1),
                Post.title.contains('Python')
            ).order_by(
                Post.update_at.desc()
            )
        )
    ).paginate(1, 10)
    print(users)  # [<User object at 1>]
    

    再比如我们想知道每个用户都写了多少篇文章:

    user_posts = await User.select(
        User.name, helo.F.COUNT(helo.SQL('1')).as_('posts')
    ).join(
        Post, helo.JOINTYPE.LEFT, on=(User.id == Post.author)
    ).group_by(
        User.name
    ).rows(100)
    print(user_posts)  # [{'name': 'at7h', 'posts': 1}]
    

    如上所示,我们可以通过 helo.F 来使用 SQL 函数,比如我需要计算出每个月所有雇员薪资的总和:

    salary_sum = await Employee.select(
        helo.F.SUM(Employee.salary).as_('salary_sum')
    ).scalar()
    print(salary_sum)  # 30000.0
    

    接下来,让我们尝试对数据库中的数据做一些修改操作。

    比如你要更改某一位雇员的薪资 ?:

    ret = await Employee.update(salary=20000).where(
        Employee.name == 'at7h'
    ).do()
    print(ret.affected)  # 1
    

    或者,整体涨工资啦 ?:

    ret = await Employee.update(
        salary=Employee.salary + 1000
    ).where(
        (Employee.department.in_([1, 2])) | (Employee.name == 'at7h')
    ).do()
    

    最后我们来尝试删除表中的数据。

    第一种方式,你可以使用 model 对象的 remove 方法来删除它对应于数据库中这一行的数据:

    user = User(name='at7h', password='1111')
    await user.save()
    user = await User.get(user_id)
    print(user.id)  # 1
    await user.remove()
    user = await User.get(user_id)
    print(user)  # None
    

    另一种更为通常的方式是使用 delete 方法:

    ret = await Post.delete().where(
        Post.create_at < datetime(2010, 1, 1)
    ).limit(
        100
    ).do()
    

    Replace

    另外,helo 支持 MySQL REPLACE 语句,提供了 replacemreplace 两个方法,其用法与 insertminsert 类似。当然,在使用它们之前你需要了解 MySQL REPLACE 语句的工作原理。

    Quart 应用

    如果你正在使用 quart, 一个最小的应用示例是:

    import quart
    import helo
    
    app = quart.Quart(__name__)
    app.config["HELO_DATABASE_URL"] = "mysql://user:password@127.0.0.1:3306/db"
    
    db = helo.G(app)
    
    
    @app.route('/api/users')
    async def users():
        await User.insert(
            name='at7h', email='g@test.com', password='xxxx'
        ).do()
        user_list = await User.select().all(False)
        return quart.jsonify(user_list)
    
    
    app.run()
    

    此时你不需要再显示的执行 db.bind,binding 操作将会在你应用的第一个请求之前自动完成。

    启动此服务:

    $ curl http://127.0.0.1:5000/api/users
    [{"email":"g@test.com","id":1,"name":"at7h","password":"xxxx"}]
    

    其他

    Model Iteration

    Helo 中的 ModelSelect 都支持迭代,helo 会自动帮你处理分页问题,以避免频繁的 IO 操作和过大的数据量获取。

    async for post in Post:
        print(post)
    # <Post object at 1>
    # <Post object at 2>
    # <Post object at 3>
    # <Post object at 4>
    
    users = User.select().where(User.id < 5).order_by(User.id.desc())
    async for user in users:
        print(user)
    # <User object at 4>
    # <User object at 3>
    # <User object at 2>
    # <User object at 1>
    

    Row Type

    当你使用 select 获取数据时,helo 默认会将行数据包装成为对应的 Model 对象,但是,当你使用了 helo.F 函数和 join 时可能会放弃加载到 Model 对象而使用原始的 helo.adict 字典。当然,你也可以通过 wrap 参数来显式指定使用字典类型的 row type。在大型项目中,这可能会显著提高速度并减少内存的使用。

    users = await User.select(User.id, User.name).limit(2).all(wrap=False)
    print(users)  # [{'id': 1, 'name': 'at7h'}, {'id': 2, 'name': 'bobo'}]
    assert users[0].name == 'at7h'
    
    employee = await Employee.select().order_by(
        Employee.salary.desc()
    ).first(False)
    print(employee)
    # {'id': 1, 'name': 'at7h', 'department': 1, 'salary': 15000.0}
    

    SQL

    执行 SQL

    有时你可能迫不得已想要执行一些原始的 SQL 语句,那么你可以使用 db.raw 函数来实现。

    await db.raw("SELECT * FROM `user` WHERE `id` < %s;", params=[10])
    

    查看 SQL

    为方便调试,有时候需要查看执行的 SQL,在 helo 中,你可以:

    第一种方式,在初始化 db 时,设置 debugTrue 即可,这样你将在日志输出中看到执行过的所有 SQL 语句。

    db = helo.G(debug=True)
    

    第二种方式主要是方便学习和调试,你可以使用 repr 函数(或在 REPR 环境中)和 str 函数来查看 Insert, Update, Select 等对象,我们拿上面的示例来举个例子:

    >>> q1 = Employee.update(
    ...     salary=Employee.salary + 1000
    ... ).where(
    ...    (Employee.department.in_([1, 2])) | (Employee.name == 'at7h')
    ... )
    >>> q1
    Query(UPDATE `employee` SET `salary` = (`salary` + %s) WHERE ((`department` IN %s) OR (`name` = %s)); % ((1000.0,), (1, 2), 'at7h'))
    
    >>> q2 = User.select(
    ...     User.name, helo.F.COUNT(helo.SQL('1')).as_('posts')
    ... ).join(
    ...     Post, helo.JOINTYPE.LEFT, on=(User.id == Post.author)
    ... ).group_by(
    ...     User.name
    ... )
    >>> print(q2)
    SELECT `t1`.`name`, COUNT(1) AS `posts` FROM `user` AS `t1` LEFT JOIN `post` AS `t2` ON (`t1`.`id` = `t2`.`author`) GROUP BY `t1`.`name`; % ()
    

    本篇上手指南就到此结束。

    十分欢迎大家的使用,有任何问题可随时与我交流或到 项目仓库 反馈,欢迎以任何形式提出任何问题或建议。

    感谢 ?


    同名公众号:


    起源地下载网 » Helo 快速上手指南

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    模板不会安装或需要功能定制以及二次开发?
    请QQ联系我们

    发表评论

    还没有评论,快来抢沙发吧!

    如需帝国cms功能定制以及二次开发请联系我们

    联系作者

    请选择支付方式

    ×
    迅虎支付宝
    迅虎微信
    支付宝当面付
    余额支付
    ×
    微信扫码支付 0 元