知识总结

核心要点

1
2
3
4
5
6
7
sql的规范非常重要:
1. sql语法使用的是clickhouse(目前使用ch)
2. 需要输入的参数使用{}括起来
3. 参数以sql_开头的会将传入的参数值作为整个sql的一部分,如BENCHMARK_IDS="333,444,555"会将它转换生成注入参数sql_BENCHMARK_IDS="(a.BENCHMARK_ID = '333' or a.BENCHMARK_ID = '444' or a.BENCHMARK_ID = '555')")
①注意sql_后面的名字需要和字段的名字一样,且会加一个s结尾,这样表示会传入多个参数,python转换的时候自动转换成or的形式去匹配数据
②在参数设置-其他里面需要选择左连接还是右连接的参数设置
4. 最外层字段名要使用AS,且目前看到的字段名全是大写的

中台系统

1
2
1. 核心要点是某些无法直接查询或者通过sql写出来得指标
2. 计算这些指标,首先需要明确需要用到哪些数据,以及计算的公式,然后通过公式将数据转换成模型,将它生成api即可

金融指标开发:核心要点:按照模板修改metadata和公式输入输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1. 静态指标开发:定期需要跑的指标===>跑批等--->变化不大的指标===>结果存在数据库里
注意事项:
1. 操作数据库使用dao里面的类,不要直接访问数据源
2. 指标发布需要先写代码,写代码建议在元静态模板的基础上修改
3. 静态指标发布:需要将指标在配置文件中进行添加(config文件夹下)


2. 动态指标开发:需要实时计算的===>由前端发起请求,调用notebook的代码实现
1. 复制模板代码,在模板代码的基础上进行修改
2. 动态指标建议使用olap_开头
3. 函数输出的metadata也需要包含输入的参数
4. 动态指标的发布: 同静态,需要在配置文件里添加(config文件夹下)
5. 转换成py代码,都需要在http://localhost:8000/下进行生成
6.动态指标是否创建好,需要在jobs中的indicators_olpa.py文件下查看是否已经包含需要的函数
7.动态指标发布完后可以通过postman进行测试

疑问的地方:

1. locallhost:8000怎么通过api接口自测
1. postman的使用在哪里

代码整体介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
1. 代码里的api_mode apiext apis对应了三个版本的api接口,api_model对应了roboat_api,apis对应了跑批的api
2. 跑批程序需要在config/job_conf.json进行配置,否则不会进行代码转换,olap的配置目前看到是不用了所以不用进行配置
3. 跑批的py代码生成后(点击),可以在中台批处理任务/任务管理进行新建任务时候,任务方法里面可以看到跑批的算法列表(由跑批的api列表那个提供)
4. 接口里面olap实时生成目前已经不用了,在光大版本时候使用的
5. 接口里面tool也是光大版本使用的目前已经不用
5. 接口里面ext_tool只要是下载保存使用的
6. 接口面的robot_tool是邮储版本使用的,负责生成平铺指标函数类和聚合指标函数类==驼峰命名法,同时会将平铺和聚合类的ipynb代码转换成py代码,平铺转换后的代码在functions/ind下,聚合转换后的代码在functions/ind_agg下
7. metadata的作用是告诉中台由哪些参数
8. 操作数据模型进行数据读写,在inds_function/agg聚合里面有样例
9. agg类的聚合算法模型ipynb会生成对应的驼峰命名法类的算法模型,可在API管理的算法模型通过类名搜索到对应的模型,也可以在指标库管理里面搜到
10. 不同的级别:什么组合\组合树级别的,不知道什么意思,在meta_data中有定义agg-layers(里面有port\port-tree等这些划分)
11. 服务启动的时候会自动转换所有模型.除非有新增才需要手动去api生成.重启不考虑
12. 中台数据模型里面,目前只用到了单源模型和数据源,复合模型目前还没有开始使用.单源模型是md结尾的,复合模型是cmd结尾的
13. 在中台创建的单元数据模型,每次启动服务时候都会自动生成对应的类的py代码,使得可以直接在代码里使用了,不用进行数据库的读取操作了,也可以自己生成通过/robot_tool/model_to_ind_funcs接口
14. 聚合类本来只有那些自带的方法(光大版)==>在api管理的数据分组那里,后来添加了自定义agg的算法模型
15. 数据中台的指标库指标的操作在apiext下的api_data_utils.py代码里,进行实例化操作
16. 同一指标的时间维度配置,名称可以相同(在指标库/指标设置/新增指标/时间维度)==>循环跑,代码在api_data_utils部分
17. 共通参数名在common下的constants.py代码里,否则组合树那些可能无法解析,无法转换
18. 字典代码在commmon下的sys_dicts
19. dao数据库处理相关的
20. settings里面的sql配置是给dao里面的sql使用的,不是给数据模型使用的,数据模型使用的是数据源的配置的连接.在dao/models/data_source_init.py文件下
21. dao/data_model.py下负责解析大json,然后解析成文件
22. 层级类型的理解,什么组合树啊,port_ids啊
23. 单源模型的参数设置时候默认值填 1=1, 多个数据时候通常选在,左字段里面得选择表名.字段名
24. 汇总级别里面,组合树、组合、资产树、期间、资产等区别是什么
25. 测试流程:
1. 本地测试
①启动py服务,写jupyter代码,假数据测试
2. 服务器测试
①在服务器写完ipynb代码,或者更新最新的代码上去,重启服务(上传,生成也可以)
②在api管理里面新建算法模型的api接口
③通过api接口返回的json去python的某个接口获取大json
④把大json传给模型接口,完成
26. jupyter仅仅作为一个写代码的工具,无法和中台那些连起来
27. 组合代码:port_ids = 'ZY_HB001,ZY_JZ001,ZYB'
28. 配置中台数据源连接在dao/data_source_init.py下
29. 写代码流程: 目前是在原有代码的基础上修改,添加字段等操作
批处理代码流程:
1. 本地修改代码,添加字段,新建文件还需要添加到配置文件里
2. 重新生成批处理的文件,execute
3. 使用postman的/job/run_batch/{job_name}跑一下该任务
4. 断点调试,通过debug模式启动任务,然后在py代码里大断点
5. 运行postman跑批任务的接口,跑代码
6. 去数据库通过sql查询插入的数据是否正确,没问题则将修改的代码上传
30. 参数sql_port_ids等样式的特殊
31. 输出字符的name决定了输出字段的名字,是中文则是中文
32. 有后置过滤处理因而返回的参数不要与传入的参数有重叠
33. 重启服务:
cd /home/tams/docker_deploy/dc-start/
docker ps
docker restart 79f4e307d214

问题

1
2
3
4
1. 组合、资产
2. 源数据表里放的是哪些数据
①Bond\dwd\HK\LC\QT\sst等分别代表的含义
3. 指标设置里面的参数设置控件,怎么选择的问题

最早的投资时间小于平台日会报错

什么时候看完,什么时候开发完

本地调试代码修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def get_sql_util_instance(cls, data_source_id: str) -> DbUtils:   #####本地重载的,源代码是下面那个,不要动
logger.debug(str(cls))
if data_source_id in db_utils_dict:
return db_utils_dict[data_source_id]

try:
data_source_lock.acquire()
# 实时请求中台
if data_source_id:
ds_dict = rest_post(s.GET_ALL_DB_CONN_URL, {'dataSourceId': ''})
if ds_dict:
source_ids =[i for i in rest_post('/dataModel/getAllDbConnList', {'dataSourceId': ''})['list'] if i['dataSourceId'] == data_source_id]
ds_dict = {'size':len(source_ids),'list':source_ids}
if ds_dict and ('size' in ds_dict) and (ds_dict['size'] == 1):
data_s = __format_ds(ds_dict['list'][0])
data_source_config[data_s['dataSourceId']] = data_s

# 实时创建连接池和db_utils
db_utils = create_db_utils(data_s['dbType'], data_s)
db_utils_dict[data_source_id] = db_utils
return db_utils
except ConnectionResetError as e:
logger.error(repr(e))
msg = '%s 无法访问,可能是服务未正常启动或网络不通。' % s.EUREKA_DATA_MID_NAME
logger.error(msg)
raise DataSourceNotFoundError(msg=msg)
finally:
data_source_lock.release()

logger.error('找不到数据源:%s的配置信息。' % data_source_id)
raise DataSourceNotFoundError(data_source_id)

取模型数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
PORT_IDS = 'DL1,DL2'
START_DATE = '2000-01-01'
END_DATE = '2020-01-01'
BENCHMARK_TYPE = '1'
BENCHMARK_ID = '2'
ind_instance = get_ind_instance({
gCp.AGG_IND_CLASS_NAME.value: 'PortfolioPeriodReturnAgg',
'PORT_IDS': PORT_IDS,
'START_DATE': START_DATE,
'END_DATE': END_DATE,
'BENCHMARK_TYPE':BENCHMARK_TYPE,
'BENCHMARK_ID':BENCHMARK_ID
})
df_list = ind_instance.get_source_df_list()
df = ind_instance.calc_port_return(df_list)
df

参数设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
PORT_IDS = 'DL71,NYLY-6MDK'
END_DATE = '2020-01-01'
START_DATE = '2000-01-01'
BENCHMARK_TYPE='CONTRACT'
BENCHMARK_ID='GZ001'
CALC_BENCHMARK_ID = '01'
NO_RISK_BENCHMARK_ID = '02'
is_test=False
calc_return2(get_source_data(PORT_IDS, START_DATE, END_DATE, BENCHMARK_TYPE='CONTRACT',BENCHMARK_ID = BENCHMARK_ID,BENCHMARK_IDS = ['GZ001','GZ002','GZ003'], CALC_BENCHMARK_ID = CALC_BENCHMARK_ID, NO_RISK_BENCHMARK_ID = NO_RISK_BENCHMARK_ID))




df_list = get_source_data('NYLY-10MDK', '2019-01-01', '2019-12-17',BENCHMARK_IDS = 'GY001', CALC_BENCHMARK_ID = "000007_CNSESH", NO_RISK_BENCHMARK_ID = 'GZ001')

代码详解

1
2
job_convert.py 代码里主要讲述了jupyter代码 转换成python代码的实现过程
apiext/api_data_utils.py 方法生成

image-20220917101635133

这个意思是根据使用组合和组合树的不同可以返回不同的指标,要在这里面标明,下面source是为了字段溯源使用的(血缘关系-光大版本的)目前直接读的数据模型不需要考虑来源于哪个表了,所以不用写

image-20220917103836858

平铺类算法指标想要实现多进程可以在meta_data里面写(这里说的是传入很多个组合的时候的情况),这里的写法意思是多进程跑方法

image-20220917104849624

这里是一个优化的点,通过测试查看多进程是否可以快一点

image-20220917163759789

df1 = pd.DataFrame([[‘2019-01-01’,10],[‘2019-01-02’,10],[‘2019-01-10’,10]],columns = [‘month_date’,’values’])
df2 = pd.DataFrame([[‘2019-01-13’,10],[‘2019-01-10’,10],[‘2019-01-15’,10]],columns = [‘month_date’,’values’])
start_date = min(df1.month_date.min(),df2.month_date.min())
end_date = max(df1.month_date.max(),df2.month_date.max())
month_num = len(pd.date_range(start_date,end_date,freq = ‘D’,closed = ‘left’))
date_df = pd.DataFrame(pd.date_range(start_date,freq = ‘D’,periods = month_num+1),columns = [‘month_date’])
date_df[‘month_date’] = date_df[‘month_date’].apply(lambda x:str(x)[:10])
date_df_all = date_df.merge(df1,on = ‘month_date’,how = ‘left’).merge(df2,on = ‘month_date’,how = ‘left’)