• 数仓模块ods层—每日行情数据入库(01)


    这里我们已经拿到了每日的行情数据,现在要做的就是将行情数据入库,也就是数仓的ods 环节

    数据情况

    我们将每天的行情数据按照存储到一个文件里,文件名按照日期命名,存储到一个csv文件中,如下所示

    image-20221022231742120

    下面是csv 文件的字段情况

    image-20221023185957473

    这里有一个要注意的地方,由于脚本改动的原因,我们的有一部分数据文件是不包含列名的也就是第一行就是数据,如下所示,我们可以看到20220916 数据文件第一行不是列名而是数据,所以我们在后面入库的时候要进行处理

    image-20221023190358799

    还有就是由于行情数据不是每天都有,所以程序每天运行会产生一些空文件,这里的空文件不一定大小是零,因为有的空文件有列名行,所欲文件大小是大于零的,有的空文件没有列名行,所以文件大小是零。

    脚本开发

    针对我们上面提到的一些数据情况我们开发脚本,重点如下

    1. 空文件过滤,这里我们认为文件大小小于等于76B 就是空文件
    2. clickhouse 的数据入库我们使用clickhouse-client,但是需要针对有列名的和无列名的分别处理
    3. 关于clickhouse-client 的调用我们使用subprocess
    4. 处理后的文件移动到trade_complete 文件夹下
    import os
    import subprocess
    import shutil
    basePath="/Users/kingcall/workspace/tmp/量化交易/trade"
    completePath="/Users/kingcall/workspace/tmp/量化交易/trade_complete"
    
    def checkStatus(res,filePath):
        reStatus=int(res[0])
        if reStatus!=0:
            print("处理失败:{}".format(filePath))
            print(res[1])
        else:
            print("处理成功:{}".format(filePath))
            fileName=os.path.split(filePath)[1]
            destFilePath=os.path.join(completePath,fileName)
            shutil.move(filePath,destFilePath)
    # 使用clickhouse-client 把数据加载到clickhouse
    def load2ck(filePath,isWithTitle):
        if isWithTitle:
            res=subprocess.getstatusoutput("clickhouse-client --password www1234 --query='INSERT INTO stock.ods_stock_daily_price_di FORMAT CSVWithNames'< {}".format(filePath))
            checkStatus(res,filePath)
        else:
            res=subprocess.getstatusoutput("cat {} | clickhouse-client --password www1234 --query='INSERT INTO stock.ods_stock_daily_price_di FORMAT CSV'".format(filePath))
            checkStatus(res,filePath)
    
    def fileFilter(basePath):
        files=sorted(os.listdir(basePath),reverse=True)
        for file in files:
            filePath=os.path.join(basePath,file)
            fileSize=os.stat(filePath).st_size
            # 过滤出非空文件
            if fileSize>76:
                # 判断文件是否包含字段名(首行是否是字段)
                try:
                    with open(filePath) as f:
                        firstline=f.readline()
                        if firstline.startswith(","):
                            load2ck(filePath,True)
                        else:
                            load2ck(filePath,False)
                except UnicodeDecodeError as e:
                    print("open file {0} error ".format(file))
            else:
            		# 删除空文件
                os.remove(filePath)
    if __name__ == '__main__':
        fileFilter(basePath)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    到这里我们的脚本就开发好了

    Clickhouse 建表

    首先我们创建了一个库stock,然后创建每日行情数据的表ods_stock_daily_price_di ,这个表的主键是股票代码和交易日期,建表语句如下,之所以使用ReplacingMergeTree 表引擎是为了对数据进行过滤,因为我们知道MergeTree 有可能发生数据重复的情况

    create database stock;
    
    create table ods_stock_daily_price_di(
    	ts_code String comment '股票代码',
    	trade_date date comment '交易日期',
    	open Float32 comment '开盘价',
    	high Float32 comment '最高价',
    	low Float32 comment '最低价',
    	close Float32 comment '收盘价',
    	pre_close Float32 comment '昨收价(前复权)',
    	change Float32 comment '涨跌额',
    	pct_chg Float32 comment '涨跌幅(未复权)',
    	vol Float32 comment '成交量(手)',
    	amount Float32 comment '成交额(千元)'
    )
    engine=ReplacingMergeTree()
    partition by toYear(trade_date)
    primary key(ts_code,trade_date)
    ORDER BY (ts_code,trade_date)
    ;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    Clickhouse导入数据

    这里我们直接运行脚本,导入数据即可,我们发现报错了,20211223.csv 处理失败了

    image-20221023165910224

    这个报错很明显,第一列(Column 0)行是0,第二列(Column 1)是000001.sz 不能转成 Date 类型,我们看一下数据文件我们发现文件的第一列是一列数字,其实是一个ID 列,也就是说不应该读取这一列的

    image-20221023170020415

    但是我们从日志中看到2021122.csv 这个文件本正确处理了,这是因为由于这个文件有标题行,所以处理方式和上面没有标题行的处理方式不一样,也就是说数据按照列名得到了争取的处理,由于第一列没有列名被舍弃了

    image-20221023165958339

    也可以在命令行进行查看

    head -n 3 /Users/kingcall/workspace/tmp/量化交易/trade/20211223.csv
    0,000001.SZ,20211223,17.4,17.43,17.21,17.32,17.39,-0.07,-0.4025,1059575.94,1831122.716
    1,000002.SZ,20211223,19.87,19.99,19.5,19.65,19.86,-0.21,-1.0574,697226.78,1371590.383
    2,000004.SZ,20211223,19.6,19.99,19.23,19.29,19.53,-0.24,-1.2289,38028.01,74356.92
    
    • 1
    • 2
    • 3
    • 4

    既然如此我们处理一些这个文件即可,这里我们使用panda 读取进来后,去掉第一列然后再写出去

    image-20221023171110500

    我们看到读取进来后,添加了行标签和列标签,这里我们只要1到11列即可,也就是删除掉0列,数据如下所示

    image-20221023172133712

    我们改造一下上面脚本的load2ck 方法

    # 使用clickhouse-client 把数据加载到clickhouse
    def load2ck(filePath,isWithTitle):
        if isWithTitle:
            res=subprocess.getstatusoutput("clickhouse-client --password www1234 --query='INSERT INTO stock.ods_stock_daily_price_di FORMAT CSVWithNames'< {}".format(filePath))
            checkStatus(res,filePath)
        else:
            # 删除第一列
            tmpData=pd.read_csv(filePath,header=None)
            result=tmpData[[1,2,3,4,5,6,7,8,9,10,11]]
            os.remove(filePath)
            result.to_csv(filePath,header=None,index=None)
            res=subprocess.getstatusoutput("clickhouse-client --password www1234 --query='INSERT INTO stock.ods_stock_daily_price_di FORMAT CSV' < {} ".format(filePath))
            checkStatus(res,filePath)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    到这里我们的脚本就写好了,下面我们的全部文件就处理完了

    image-20221023173428120

    Clickhouse 数据校验

    这里我们可以定义一些数据的校验规则,其实就是数据质量检测与监控,关于数仓的数据质量监控我们在数仓建模的专栏里介绍过,这里就不过多赘述了,主要监测数据量和核心字段的值即可。例如检查数据有没有重复

    SELECT 
    	ts_code,trade_date,count(1) as cnt
    from
    	ods_stock_daily_price_di
    group by
    	ts_code,trade_date
    HAVING 
    	count(1)>1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    我们的数据如下

    image-20221023202915852

    总结

    这里我们的每日行情数据就完成入库了,涉及到的主要知识点

    1. clickhouse 的数据写入,这里需要解释一下我们为什么不直接爬虫写入而是通过处理文件呢,主要是为了批量写入
    2. python 脚本的开发,主要涉及到os 模块和subprocess模块
  • 相关阅读:
    2.7HDR与LDR
    一次解释器模式的实际使用
    cpu设计和实现(基础)
    这两个工具能批量PDF转图片,建议收藏使用
    环境配置 | 图文VS2022配置OpenCV,Dlib
    【GIT版本控制】--初始化仓库
    JVM-虚拟机栈
    基于 BP 神经网络特征提取的指纹识别应用(Matlab代码实现)
    [附源码]java毕业设计闲置物品线上交易系统
    Spring的循环依赖
  • 原文地址:https://blog.csdn.net/king14bhhb/article/details/127662838