这里我们已经拿到了每日的行情数据,现在要做的就是将行情数据入库,也就是数仓的ods 环节
我们将每天的行情数据按照存储到一个文件里,文件名按照日期命名,存储到一个csv文件中,如下所示

下面是csv 文件的字段情况

这里有一个要注意的地方,由于脚本改动的原因,我们的有一部分数据文件是不包含列名的也就是第一行就是数据,如下所示,我们可以看到20220916 数据文件第一行不是列名而是数据,所以我们在后面入库的时候要进行处理

还有就是由于行情数据不是每天都有,所以程序每天运行会产生一些空文件,这里的空文件不一定大小是零,因为有的空文件有列名行,所欲文件大小是大于零的,有的空文件没有列名行,所以文件大小是零。
针对我们上面提到的一些数据情况我们开发脚本,重点如下
trade_complete 文件夹下import os
import subprocess
import shutil
basePath="/Users/kingcall/workspace/tmp/量化交易/trade"
completePath="/Users/kingcall/workspace/tmp/量化交易/trade_complete"
def checkStatus(res,filePath):
reStatus=int(res[0])
if reStatus!=0:
print("处理失败:{}".format(filePath))
print(res[1])
else:
print("处理成功:{}".format(filePath))
fileName=os.path.split(filePath)[1]
destFilePath=os.path.join(completePath,fileName)
shutil.move(filePath,destFilePath)
# 使用clickhouse-client 把数据加载到clickhouse
def load2ck(filePath,isWithTitle):
if isWithTitle:
res=subprocess.getstatusoutput("clickhouse-client --password www1234 --query='INSERT INTO stock.ods_stock_daily_price_di FORMAT CSVWithNames'< {}".format(filePath))
checkStatus(res,filePath)
else:
res=subprocess.getstatusoutput("cat {} | clickhouse-client --password www1234 --query='INSERT INTO stock.ods_stock_daily_price_di FORMAT CSV'".format(filePath))
checkStatus(res,filePath)
def fileFilter(basePath):
files=sorted(os.listdir(basePath),reverse=True)
for file in files:
filePath=os.path.join(basePath,file)
fileSize=os.stat(filePath).st_size
# 过滤出非空文件
if fileSize>76:
# 判断文件是否包含字段名(首行是否是字段)
try:
with open(filePath) as f:
firstline=f.readline()
if firstline.startswith(","):
load2ck(filePath,True)
else:
load2ck(filePath,False)
except UnicodeDecodeError as e:
print("open file {0} error ".format(file))
else:
# 删除空文件
os.remove(filePath)
if __name__ == '__main__':
fileFilter(basePath)
到这里我们的脚本就开发好了
首先我们创建了一个库stock,然后创建每日行情数据的表ods_stock_daily_price_di ,这个表的主键是股票代码和交易日期,建表语句如下,之所以使用ReplacingMergeTree 表引擎是为了对数据进行过滤,因为我们知道MergeTree 有可能发生数据重复的情况
create database stock;
create table ods_stock_daily_price_di(
ts_code String comment '股票代码',
trade_date date comment '交易日期',
open Float32 comment '开盘价',
high Float32 comment '最高价',
low Float32 comment '最低价',
close Float32 comment '收盘价',
pre_close Float32 comment '昨收价(前复权)',
change Float32 comment '涨跌额',
pct_chg Float32 comment '涨跌幅(未复权)',
vol Float32 comment '成交量(手)',
amount Float32 comment '成交额(千元)'
)
engine=ReplacingMergeTree()
partition by toYear(trade_date)
primary key(ts_code,trade_date)
ORDER BY (ts_code,trade_date)
;
这里我们直接运行脚本,导入数据即可,我们发现报错了,20211223.csv 处理失败了

这个报错很明显,第一列(Column 0)行是0,第二列(Column 1)是000001.sz 不能转成 Date 类型,我们看一下数据文件我们发现文件的第一列是一列数字,其实是一个ID 列,也就是说不应该读取这一列的

但是我们从日志中看到2021122.csv 这个文件本正确处理了,这是因为由于这个文件有标题行,所以处理方式和上面没有标题行的处理方式不一样,也就是说数据按照列名得到了争取的处理,由于第一列没有列名被舍弃了

也可以在命令行进行查看
head -n 3 /Users/kingcall/workspace/tmp/量化交易/trade/20211223.csv
0,000001.SZ,20211223,17.4,17.43,17.21,17.32,17.39,-0.07,-0.4025,1059575.94,1831122.716
1,000002.SZ,20211223,19.87,19.99,19.5,19.65,19.86,-0.21,-1.0574,697226.78,1371590.383
2,000004.SZ,20211223,19.6,19.99,19.23,19.29,19.53,-0.24,-1.2289,38028.01,74356.92
既然如此我们处理一些这个文件即可,这里我们使用panda 读取进来后,去掉第一列然后再写出去

我们看到读取进来后,添加了行标签和列标签,这里我们只要1到11列即可,也就是删除掉0列,数据如下所示

我们改造一下上面脚本的load2ck 方法
# 使用clickhouse-client 把数据加载到clickhouse
def load2ck(filePath,isWithTitle):
if isWithTitle:
res=subprocess.getstatusoutput("clickhouse-client --password www1234 --query='INSERT INTO stock.ods_stock_daily_price_di FORMAT CSVWithNames'< {}".format(filePath))
checkStatus(res,filePath)
else:
# 删除第一列
tmpData=pd.read_csv(filePath,header=None)
result=tmpData[[1,2,3,4,5,6,7,8,9,10,11]]
os.remove(filePath)
result.to_csv(filePath,header=None,index=None)
res=subprocess.getstatusoutput("clickhouse-client --password www1234 --query='INSERT INTO stock.ods_stock_daily_price_di FORMAT CSV' < {} ".format(filePath))
checkStatus(res,filePath)
到这里我们的脚本就写好了,下面我们的全部文件就处理完了

这里我们可以定义一些数据的校验规则,其实就是数据质量检测与监控,关于数仓的数据质量监控我们在数仓建模的专栏里介绍过,这里就不过多赘述了,主要监测数据量和核心字段的值即可。例如检查数据有没有重复
SELECT
ts_code,trade_date,count(1) as cnt
from
ods_stock_daily_price_di
group by
ts_code,trade_date
HAVING
count(1)>1
我们的数据如下

这里我们的每日行情数据就完成入库了,涉及到的主要知识点