BigQuant使用文档

10分钟上手模块开发

由small_q创建,最终由small_q 被浏览 231 用户

这是对bigmodule模块开发流程的简单引导,主要面向新手开发者(需要具有一定的python基础知识)。跟随以下步骤,你将一步步实现可用的简单模块,并逐渐熟悉模块的开发流程。

若想了解更多深入的内容,请随时查阅以下相关文章📄:

\

1. 模块功能设计

在正式开发一个模块之前,我们需要明确待开发的模块应该实现什么功能。作为上手练习,将开发两个模块,数据获取模块数据筛选模块

数据获取模块的功能设计具体如下:

  1. 开发一个名叫 data_get 的模块,可以从指定数据表中读取数据;
  2. 对数据继续封装,使其可以传递给下一个模块。

数据筛选模块的功能设计具体如下:

  1. 开发一个名叫 data_select 的模块,接收来自数据获取模块传出的数据,按指定字段从大到小或从小到大排序,筛选出最大或最小的前n条数据;
  2. 对结果继续封装,使其可以传递给下一个模块;
  3. 在后置运行函数中,打印部分结果。

\

2. 创建数据获取模块模板

具体请参考 📄BigModule简介与入门

进入到线上开发环境中 BigQuant AIStudio,在命令行终端中输入以下命令创建模板。

bq module init data_get

在输入上述命令后,便会开始执行模板创建的流程,在该过程中我们还需要依据提示在命令行终端中输入模块描述。

get data

\

3. 编写数据获取模块

具体请参考 📄BigModule模块模板

找到刚创建出来的模块模板所在的目录:

打开文件 ~/data_get/src/data_get__init__.py

\

3.1 传入参数定义

具体请参考 📄BigModule中的基本类型

定位到 run()函数,可用看到,模板已经为我们生成了一些参数定义的例子。

现在我们需要将其全部删去,定义我们自己的参数,具体需求如下。

传入参数列表

参数 类型 说明
table str 数据表名称
date str 指定获取某一天的数据,格式要求为yyyy-mm-dd,默认为今天
columns str 指定获取的字段,字段间使用逗号隔开,默认为所有字段

实现该定义的代码如下:

from datetime import datetime as dt


_today = dt.today().strftime("%Y-%m-%d")

def run(
	table: I.str(desc="数据表名称"),
    date: I.str(desc="指定获取某一天的数据") = _today,
    columns: I.str(desc="指定获取的字段") = "*",
) -> [
    # TODO
]:
    # TODO
    pass

\

3.2 传出参数定义

传出参数列表

参数 类型 说明
data port(端口) 被封装成DataSource的数据

实现该定义的代码如下:

def run(
	table: I.str(desc="数据表名称"),
    date: I.str(desc="指定获取某一天的数据") = _today,
    columns: I.str(desc="指定获取的字段") = "*",
) -> [
    I.port(desc="输出数据", name="data")
]:
    # TODO
    pass

\

3.3 功能实现

关于数据SQL查询,具体请参考 📄数据平台/DAI

关于DataSource,具体请参考📄DataSource—通用数据类型

代码如下:

import dai

def run(
	table: I.str(desc="数据表名称"),
    date: I.str(desc="指定获取某一天的数据") = _today,
    columns: I.str(desc="指定获取的字段") = "*",
) -> [
    I.port(desc="输出数据", name="data")
]:
    sql = f"SELECT {columns} FROM {table} WHERE date='{date}';"
    df = dai.query(sql).df()
    ds = dai.DataSource.write_bdb(df)
    return I.Outputs(data=ds)

\

3.4 完整代码

完整的代码如下:

"""data_get package.

get data
"""

from datetime import datetime as dt

import dai
from bigmodule import I

# 需要安装的第三方依赖包
# from bigmodule import R
# R.require("requests>=2.0", "isort==5.13.2")

# metadata
# 模块作者
author = "BigQuant"
# 模块分类
category = "示例模块"
# 模块显示名
friendly_name = "data_get"
# 文档地址, optional
doc_url = "https://bigquant.com/wiki/"
# 是否自动缓存结果
cacheable = True


_today = dt.today().strftime("%Y-%m-%d")


def run(
    table: I.str(desc="数据表名称"),
    date: I.str(desc="指定获取某一天的数据") = _today,
    columns: I.str(desc="指定获取的字段") = "*",
)->[
    I.port(desc="输出数据", name="data")
]:
    sql = f"SELECT {columns} FROM {table} WHERE date='{date}';"
    df = dai.query(sql).df()
    ds = dai.DataSource.write_bdb(df)
    return I.Outputs(data=ds)


def post_run(outputs):
    """后置运行函数"""
    return outputs

\

4. 安装数据获取模块

具体请参考 📄BigModule简介与入门

打开命令行终端,进入到模块文件夹的根目录

cd data_get

输入以下命令,在本地安装我们刚刚编写好的模块,这样后续才可以使用:

bq module install --dev

\

5. 创建数据筛选模块模板

请参考2. 创建数据获取模块模板,创建一个新的模块模板,名为data_select。

\

6. 编写数据筛选模块

6.1 传入参数定义

传入参数列表

参数 类型 说明
input_data port(端口) 接收数据,要求为DataSource格式,必选
columns str 在指定字段上执行筛选,字段间以逗号分隔。允许指定多个字段,不允许指定不存在的字段。
top_n int 筛选出值最大或最小的前 n 条数据,最小为1,最大为100,默认为10。
order choice(枚举) 选择最大还是最小,默认值为最大

代码实现如下:

ORDER_MAP = {
    "最大" : False,
    "最小" : True,
}

def run(
	input_data: I.port(desc="待筛选的数据", specific_type_name="DataSource", optional=False),
    columns: I.str(desc="指定字段") = None,
    top_n: I.int(desc="前 n 条数据", min=1, max=100) = 10,
    order: I.choice(desc="选择最大还是最小", values=list(ORDER_MAP.keys())) = "最大",
) -> [
    # TODO
]:
    # TODO
    pass

\

6.2 传出参数定义

传出参数列表

参数 类型 说明
data port(端口) 被封装成DataSource的数据

代码实现如下

def run(
	input_data: I.port(desc="待筛选的数据", specific_type_name="DataSource", optional=False),
    columns: I.str(desc="指定字段") = None,
    top_n: I.int(desc="前 n 条数据", min=1, max=100) = 10,
    order: I.choice(desc="选择最大还是最小", values=list(ORDER_MAP.keys())) = "最大",
) -> [
    I.port(desc="输出数据", name="data")
]:
    # TODO
    pass

\

6.3 功能实现

run()函数代码如下,对数据进行筛选:

import dai

def run(
	input_data: I.port(desc="待筛选的数据", specific_type_name="DataSource", optional=False),
    columns: I.str(desc="指定某个字段") = None,
    top_n: I.int(desc="前 n 条数据", min=1, max=100) = 10,
    order: I.choice(desc="选择最大还是最小", values=list(ORDER_MAP.keys())) = "最大",
) -> [
    I.port(desc="输出数据", name="data")
]:
    df = input_data.read()
    columns = columns.split(",")
    order = ORDER_MAP[order]
    rank_order = [order for _ in range(len(columns))]
    df = df.sort_values(by=columns, ascending=rank_order)
    if top_n > len(df):
        top_n = len(df)
    result = df.head(top_n)
    ds = dai.DataSource.write_bdb(result)
    return I.Outputs(data=ds)

post_run()函数代码如下,打印结果:

def post_run(outputs):
    """后置运行函数"""
	import structlog
    
    logger = structlog.get_logger()
    
    data = outputs.data
    df = data.read()
    logger.info(df)
    return outputs

\

6.4 完整代码

"""data_select package.

data select
"""

import dai
from bigmodule import I

# 需要安装的第三方依赖包
# from bigmodule import R
# R.require("requests>=2.0", "isort==5.13.2")

# metadata
# 模块作者
author = "BigQuant"
# 模块分类
category = "示例模块"
# 模块显示名
friendly_name = "data_select"
# 文档地址, optional
doc_url = "https://bigquant.com/wiki/"
# 是否自动缓存结果
cacheable = True

ORDER_MAP = {
    "最大" : False,
    "最小" : True,
}

def run(
    input_data: I.port(desc="待筛选的数据", specific_type_name="DataSource", optional=False),
    columns: I.str(desc="指定字段") = None,
    top_n: I.int(desc="前 n 条数据", min=1, max=100) = 10,
    order: I.choice(desc="选择最大还是最小", values=list(ORDER_MAP.keys())) = "最大",
)->[
    I.port(desc="输出数据", name="data")
]:
    df = input_data.read()
    columns = columns.split(",")
    order = ORDER_MAP[order]
    rank_order = [order for _ in range(len(columns))]
    df = df.sort_values(by=columns, ascending=rank_order)
    if top_n > len(df):
        top_n = len(df)
    result = df.head(top_n)
    ds = dai.DataSource.write_bdb(result)
    return I.Outputs(data=ds)


def post_run(outputs):
    """后置运行函数"""
    import structlog
    
    logger = structlog.get_logger()
    
    data = outputs.data
    df = data.read()
    logger.info(df)
    return outputs

\

7. 安装数据筛选模块

请参考4. 安装数据获取模块,安装数据筛选模块。

\

8. 使用模块

新建一个空白的可视化策略。

在可视化策略中左侧的导航栏“开发”->"示例模块"下,找到我们新创建的两个模块,双击即可在策略中使用。

若目录及模块没有加载出来,请尝试刷新网页。

首先点击模块data_get,在可视化策略右侧显式设置相关参数,如下:

指定获取的字段填充的参数为:

date,instrument,open,close

然后点击模块data_select,同样地设置相关参数,如下:

通过“连线”的方式,将模块data_get 的输出端口和模块data_select输入端口相连:

点击可视化策略左上角的运行按钮,即可运行模块:

下滑即可查看运行结果:

至此,上手模块开发大功告成!

\

9. 了解更多

访问开源模块项目:


相关文章:上手开发一个可视线性策略的简单模块

{link}