• 多线程增量下载K线数据


    准备一份股票列表的CSV文件,文件格式如下

    codenameclosecmvdate_ipo
    300434金石亚药12.89427982959020150424
    300380安硕信息19.31241993416320140128
    688123聚辰股份132.821114087266620191223
    300586美联新材20.34790882138120170104
    300534陇神戎发12.96389465063120160913
    300813泰林生物60.55160529663520200114
    688259创耀科技88.76157821235820220112
    301211亨迪药业30.43177175385520211222
    688099晶晨股份75.53121409395420190808

    范例CSV下载

    程序所在目录下,手工建好3个目录
    data/day
    data/week
    data/month

    导入必要的包

    1. import akshare as ak
    2. import numpy as np
    3. import pandas as pd
    4. import warnings
    5. import time
    6. import datetime as dt
    7. warnings.filterwarnings("ignore")

    读取股票列表

    1. #需要下载的股票列表
    2. #将 yyyymmdd形式的日期字符串,转换为yyyy-mm-dd形式
    3. def f_str2dtstr(x):
    4. try:
    5. return dt.datetime.strptime(x,'%Y%m%d').strftime('%Y-%m-%d')
    6. except:
    7. return np.NaN
    8. df = pd.read_csv('wz.csv',dtype={'code':str,'name':str,'close':float,'cmv':float,'date_ipo':str})
    9. #print(df)
    10. df['date_ipo'] = df['date_ipo'].apply(lambda x: f_str2dtstr(x))
    11. df

     设置线程工作相关参数

    1. import threading
    2. job_per_thread = 15
    3. remainder = len(df) % job_per_thread
    4. count_thread = 0
    5. if remainder == 0:
    6. count_thread = int(len(df)/job_per_thread)
    7. else:
    8. count_thread = int((len(df)-job_per_thread)/job_per_thread) +1
    9. print(job_per_thread) # 每个线程处理多少个股票数据
    10. print(count_thread) # 余数,最后一个线程处理多少个股票数据
    11. print(remainder) # 线程数量

    线程处理函数

    • 参数为线程编号(第X号线程,根据循环的变量来的)
    • 网络读取数据,纠错5次
    • 日线,月线,周线全部下载
    • 增量下载、任意时间下载(对比本地数据,删除本地第一条数据,以新下载的为准)
    1. def proc_get(m):
    2. if remainder == 0:
    3. count_job = job_per_thread
    4. else:
    5. if m == (count_thread-1):
    6. count_job = remainder
    7. else:
    8. count_job = job_per_thread
    9. for i in range(0,count_job):
    10. code = df.iloc[m*job_per_thread+i,0]
    11. date_s = '1970-01-01'
    12. d = pd.DataFrame(data=None,columns=['date','open','high','low','close','volume','turnover'])
    13. p_s = ['weekly','daily', 'monthly']
    14. for k in range(0,len(p_s)):
    15. item_p = p_s[k]
    16. path_file = "./data/"
    17. if item_p == 'daily':
    18. path_file = path_file+"day/"
    19. elif item_p == 'weekly':
    20. path_file = path_file+"week/"
    21. elif item_p == 'monthly':
    22. path_file = path_file+"month/"
    23. else:
    24. break
    25. path_file = path_file+"%s.csv"%(code)
    26. try:
    27. d = pd.read_csv("./data/day/%s.csv"%(code),dtype={'code':str,'code':str,'date':str})
    28. except:
    29. pass
    30. if len(d) > 0:
    31. #print(d.iloc[-1,0])
    32. date_s = d.iloc[-1,0]
    33. date_e = dt.datetime.now().strftime('%Y%m%d')
    34. if dt.datetime.strptime(date_s,'%Y-%m-%d') < dt.datetime.strptime(date_e,'%Y%m%d') :
    35. for j in range(0,5):
    36. try:
    37. a = ak.stock_zh_a_hist(symbol=code,period=item_p,start_date=date_s,end_date=date_e,adjust="qfq")
    38. a = a[['日期','开盘','最高','最低','收盘','成交量','换手率']]
    39. a.columns=['date','open','high','low','close','volume','turnover']
    40. #print(a)
    41. if len(d) > 0:
    42. d = pd.concat([d.iloc[0:-2,:],a],axis=0,ignore_index=True)
    43. else:
    44. d = a
    45. break
    46. except:
    47. print(code)
    48. time.sleep(3)
    49. if len(d) > 0:
    50. d.to_csv(path_file,index=False,date_format="%Y-%m-%d")

    开启多线程 

    1. threads = []
    2. for m in range(0,count_thread):
    3. t = threading.Thread(target=proc_get,args=(m,)) #注意即使一个参数,参数也要有,结尾
    4. #print(m)
    5. #print(t)
    6. threads.append(t)
    7. t.start()
    8. for t in threads:
    9. t.join()

    说明: 

    网络下载数据失败的代码会被打印出来
    一般情况下超过5次的就是真的没数据 
    没数据的一般是一些生成了股票代码但没交易的新股
    得空就更新一下你的K线数据呗,增量挺快的
    数据全部取得是前复权的
    未复权,包含前收盘价的数据可以换一个源取,有兴趣的可以做一下。
    任何疑问 turui@163.net
    当前接口获取的数据,起码比巨宽的强
    以SH000300数据为例,巨宽的数据从2005.04.08才有
    看到这种形状,赶紧入吧,涨停板等着你
    数据

  • 相关阅读:
    Python | Leetcode Python题解之第51题N皇后
    hadoop上传和下载文件过程【博学谷学习记录】
    一起来学Kotlin:概念:20. Kotlin open 关键字与类名、函数名和变量名的使用
    升级打怪拿offer,10w+字总结的Java面试题(附答案)够你刷
    创作一款表情包生成微信小程序:功能详解与用户体验优化
    【理解链表指针赋值】链表中cur->next = cur->next->next->next与cur =cur->next->next的区别
    Java Web 8 HTTP&Tomcat&Servlet 8.3 Servlet
    tomcat8.5处理get请求时,控制台输出中文乱码问题的解决
    Pytorch教程
    Linux进程控制--程序替换
  • 原文地址:https://blog.csdn.net/turui/article/details/127892837