• Reading an HBase database with Python


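    1. Connecting to HBase

    First, run the hbase shell command to enter HBase, then use the list command to see all tables. Taking the table "DB_level0" as an example, the listing shows that its name is concatenated with the prefix "baotouyiqi". In Python, start by opening a connection: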

    import happybase

    def hbase_connection(hbase_master, hbase_port, table_prefix=None):
        # Open a Thrift connection; table_prefix is prepended to every table name
        connection = happybase.Connection(host=hbase_master, port=hbase_port, table_prefix=table_prefix)
        return connection

    connection = hbase_connection(hbase_master, hbase_port, table_prefix)  # the project prefix is set when connecting
    table = connection.table(tablename)  # get a handle to the table
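
For reference, happybase joins `table_prefix` and the table name with a separator (the `table_prefix_separator` argument of `Connection`, which defaults to `_`). The sketch below mimics that naming rule in plain Python so it runs without an HBase server. `full_table_name` is a hypothetical helper written for illustration and is not part of happybase; whether the table in this article uses the default separator is an assumption.

```python
def full_table_name(name, table_prefix=None, separator="_"):
    """Return the table name as it would be stored in HBase for a given prefix."""
    if table_prefix is None:
        return name  # no prefix configured: the name is used unchanged
    return table_prefix + separator + name


# Names taken from the article; the "_" separator is assumed to be the default.
print(full_table_name("DB_level0", "baotouyiqi"))
```

With a prefix set on the connection, `connection.table("DB_level0")` only needs the short name; the prefix is applied by the library.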

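    Note: the complete code is at the end of the article. If you only want to run it, scroll down and copy it from there.

    2. Reading HBase data by condition

    Next, query the table for the data you want. Only two conditions are covered here: a time range and a specific column. As before, use the scan command in the shell to inspect the structure of the data in the table:

    The first column of the output is ROW and the second is COLUMN+CELL. The Python code retrieves the data in much the same way: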

    date_prex_start = bytes('dt_' + starttime, encoding='utf-8')  # row_start (inclusive)
    date_prex_end = bytes('dt_' + endtime, encoding='utf-8')  # row_stop (exclusive)
    # Scan only the row-key range [row_start, row_stop) rather than the whole table
    outdata = dict(table.scan(row_start=date_prex_start, row_stop=date_prex_end,
                              columns=[onecolumn]))
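
The range scan works because HBase keeps rows sorted by row key in bytewise order, and zero-padded timestamps such as `2023-08-22 00:00:00` sort in the same order as the times they represent. The sketch below simulates that behavior on a plain sorted list, with `row_start` inclusive and `row_stop` exclusive as in happybase's `scan`. `range_scan` and the sample keys are hypothetical and only illustrate the ordering; no HBase connection is involved.

```python
from bisect import bisect_left


def range_scan(sorted_keys, row_start, row_stop):
    """Return the keys k with row_start <= k < row_stop, in bytewise order."""
    lo = bisect_left(sorted_keys, row_start)
    hi = bisect_left(sorted_keys, row_stop)
    return sorted_keys[lo:hi]


# Hypothetical row keys in the 'dt_<timestamp>' layout used above.
keys = sorted([
    b"dt_2023-08-21 23:59:30",
    b"dt_2023-08-22 00:00:00",
    b"dt_2023-08-22 12:30:15",
    b"dt_2023-08-23 00:00:00",
])

hits = range_scan(keys, b"dt_2023-08-22 00:00:00", b"dt_2023-08-23 00:00:00")
print(hits)
```

The key equal to the stop bound is left out, so a scan from one midnight to the next covers exactly one day.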

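    The result is a dictionary: each key is a row key, and each value is a dictionary mapping column names to cell values.

    3. Formatting the HBase output

    We want the output as a DataFrame whose first column is time and whose second column is value, so a little formatting is needed: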

    # This excerpt sits inside a function (hence the return); see the complete code at the end.
    # Row keys look like b'dt_2023-08-22 00:00:00'; strip the prefix to get the time string
    timesep = list(map(lambda x: x.decode('utf-8').replace('dt_', ''), outdata.keys()))
    tempdata = list(outdata.values())
    # Each row holds one requested column; convert its cell value to float
    valuelist = list(map(lambda x: float(list(x.values())[0]), tempdata))
    if len(timesep) > 0:
        db_data2 = pd.DataFrame({'时间': timesep, onecolumn: valuelist})  # '时间' = time
        db_data2.loc[:, '时间2'] = [i[:16] for i in db_data2['时间']]  # '时间2' = time truncated to the minute
        db_data2 = db_data2.drop_duplicates(subset=['时间2'], keep='last')  # keep one value per minute
    else:
        db_data2 = pd.DataFrame()
    if len(db_data2) < 1:
        return pd.DataFrame()
    # '时间戳' = Unix timestamp, used only for sorting
    db_data2.loc[:, '时间戳'] = [time.mktime(time.strptime(i, "%Y-%m-%d %H:%M:%S")) for i in db_data2['时间']]
    db_data2 = db_data2.sort_values(by=['时间戳'], ascending=False)  # newest values first
    db_data3 = db_data2.drop(columns=['时间2', '时间戳'])
    db_data3.columns = ['time', 'value']
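
To make the per-minute deduplication concrete, here is a standard-library sketch of the same logic: keep the last reading seen in each minute, then order the results newest first. `latest_per_minute` and the sample readings are hypothetical and stand in for the pandas steps above; they are not the author's code.

```python
from datetime import datetime


def latest_per_minute(rows):
    """rows: iterable of (timestamp_string, value) pairs in ascending key order.

    Keeps the last reading of each minute and returns the survivors newest first.
    """
    last_in_minute = {}
    for ts, value in rows:
        # The first 16 characters, 'YYYY-MM-DD HH:MM', identify the minute;
        # later readings in the same minute overwrite earlier ones.
        last_in_minute[ts[:16]] = (ts, value)
    return sorted(
        last_in_minute.values(),
        key=lambda pair: datetime.strptime(pair[0], "%Y-%m-%d %H:%M:%S"),
        reverse=True,
    )


# Hypothetical readings: two fall in the same minute.
sample = [
    ("2023-08-22 00:00:05", 1.0),
    ("2023-08-22 00:00:40", 1.5),
    ("2023-08-22 00:01:10", 2.0),
]
result = latest_per_minute(sample)
print(result)
```

This mirrors `drop_duplicates(..., keep='last')` followed by a descending sort on the timestamp.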

    4. Complete code

    import happybase
    import time
    import pandas as pd
    from pathlib import Path

    os_file_name = Path(__file__).name


    def hbase_connection(hbase_master, hbase_port, table_prefix=None):
        # Open a Thrift connection; table_prefix is prepended to every table name
        connection = happybase.Connection(host=hbase_master, port=hbase_port, table_prefix=table_prefix)
        return connection


    def get_data_by_tum(hbase_master, hbase_port, table_prefix, tablename, columnslist, starttime, endtime):
        columnsid = '$'.join(columnslist)
        onecolumn = 'TimeSe:dt_' + columnsid  # column, as family:qualifier
        connection = hbase_connection(hbase_master, hbase_port, table_prefix)  # the project prefix is set when connecting
        table = connection.table(tablename)  # get a handle to the table
        date_prex_start = bytes('dt_' + starttime, encoding='utf-8')  # row_start (inclusive)
        date_prex_end = bytes('dt_' + endtime, encoding='utf-8')  # row_stop (exclusive)
        # Scan only the row-key range [row_start, row_stop) rather than the whole table
        outdata = dict(table.scan(row_start=date_prex_start, row_stop=date_prex_end,
                                  columns=[onecolumn]))
        connection.close()  # the scan has been fully consumed, so release the connection
        # Row keys look like b'dt_2023-08-22 00:00:00'; strip the prefix to get the time string
        timesep = list(map(lambda x: x.decode('utf-8').replace('dt_', ''), outdata.keys()))
        tempdata = list(outdata.values())
        # Each row holds one requested column; convert its cell value to float
        valuelist = list(map(lambda x: float(list(x.values())[0]), tempdata))
        if len(timesep) > 0:
            db_data2 = pd.DataFrame({'时间': timesep, onecolumn: valuelist})  # '时间' = time
            db_data2.loc[:, '时间2'] = [i[:16] for i in db_data2['时间']]  # '时间2' = time truncated to the minute
            db_data2 = db_data2.drop_duplicates(subset=['时间2'], keep='last')  # keep one value per minute
        else:
            db_data2 = pd.DataFrame()
        if len(db_data2) < 1:
            return pd.DataFrame()
        # '时间戳' = Unix timestamp, used only for sorting
        db_data2.loc[:, '时间戳'] = [time.mktime(time.strptime(i, "%Y-%m-%d %H:%M:%S")) for i in db_data2['时间']]
        db_data2 = db_data2.sort_values(by=['时间戳'], ascending=False)  # newest values first
        db_data3 = db_data2.drop(columns=['时间2', '时间戳'])
        db_data3.columns = ['time', 'value']
        return db_data3


    if __name__ == '__main__':
        begin_time = '2023-08-22 00:00:00'
        end_time = '2023-08-23 00:00:00'
        hbase_master = "142.21.8.22"
        hbase_port = 9097
        table_prefix = "baotouyiqi"
        table_name = "DB_level0"
        onedata = ["62340", "20", "204"]
        dataget = get_data_by_tum(hbase_master, hbase_port, table_prefix, table_name,
                                  onedata, begin_time, end_time)
        print(dataget)

  • Original article: https://blog.csdn.net/Trisyp/article/details/136390906