• 通过外部XAI技术解释的模型


    PyODPS:基于DataWorks平台利用PyODPS实现案例集合之odps.models.project格式表的基本操作(输出表的基本信息/增删改查/表分区/数据上传下载)

    导读:PyODPS可以在DataWorks等数据开发平台中作为数据开发节点调用。工作流节点中会包含PYODPS类型节点。这些平台提供了PyODPS运行环境和调度执行的能力,无需用户手动创建MaxCompute入口对象。

    目录

    基于DataWorks平台利用PyODPS实现案例集合之表的基本操作

    1、输出表的基本信息

    2、表的初始化

    3、表的增删改查

    (1)、增

    # 创建表

    T1、利用Schema来创建表

    T2、简单的方式是使用逗号连接的“字段名,字段类型”字符串组合来创建表

    # 创建DataFrame

    # 写入数据

    T1、# 使用 with 表达式

    T2、# 不使用 with 表达式

    T3、使用MaxCompute对象的write_table方法写入数据

    (2)、删

    # 删除表

    (3)、改

    # 同步表更新

    (4)、查

    # 查询行记录Record

    # 查询表数据

    T1、通过调用head获取表数据,但仅限于查看每张表开始的小于1万条的数据

    T2、通过在table上执行open_reader操作,打开一个reader来读取数据。用户可以使用with表达式,也可以不使用。

    T3、使用MaxCompute对象的read_table方法获取表数据

    4、表分区相关

    5、数据上传下载通道

    (1)、上传示例

    (2)、下载示例


    基于DataWorks平台利用PyODPS实现案例集合之表的基本操作

    1. from odps import ODPS #与ODPS调用相关的SDK需要增加这个模块导入。
    2. print("Hello World") #往日志文件输出。
    3. print(o.exist_table('pyodps_iris')) # ataWorks的PyODPS节点中,将会包含一个全局变量odps或者o,即MaxCompute入
    4. project = o.get_project() # 获取到默认项目空间
    5. print(project)

    1、输出表的基本信息

    1. # # 列出项目空间下的所有表及其每个字段类型
    2. for table in o.list_tables():
    3. print(table)

    basic_cplus__lf_dev.`consumer`

    1. # 输出表的基本信息
    2. t = o.get_table('consumer', project='basic_cplus__**_dev') # 调用get_table来获取表
    3. # print(t)
    4. print(t.lifecycle)
    5. print(t.creation_time)
    6. print(t.is_virtual_view)
    7. print(t.size)
    8. print(t.comment)
    9. print(t.schema.columns[:4])

    2、表的初始化

    1. 1
    2. 2
    3. 3
    4. 4

    3、表的增删改查

    (1)、增

    # 创建表

    T1、利用Schema来创建表

    1. table = o.create_table('my_new_table', schema)
    2. table = o.create_table('my_new_table', schema, if_not_exists=True) #只有不存在表时才创建。
    3. table = o.create_table('my_new_table', schema, lifecycle=7) #设置生命周期。

    T2、简单的方式是使用逗号连接的“字段名,字段类型”字符串组合来创建表

    1. table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    2. # 创建分区表可传入 (表字段列表, 分区字段列表)
    3. table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

    # 创建DataFrame

    PyODPS提供了DataFrame框架DataFrame框架,支持更方便地方式来查询和操作MaxCompute数据。使用to_df方法,即可转化为DataFrame对象。

    1. table = o.get_table('my_table_name')
    2. df = table.to_df()

    # 写入数据

    类似于open_reader,table对象同样可以执行open_writer来打开writer,并写入数据。

    •  每次调用write_table,MaxCompute都会在服务端生成一个文件。这一操作需要较大的时间开销,同时过多的文件会降低后续的查询效率。因此,建议在使用write_table方法时,一次性写入多组数据, 或者传入一个Generator对象。
    •  使用write_table方法写表时会追加到原有数据。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,需要手动清除原有数据。对于非分区表,需要调用table.truncate(),对于分区表,需要删除分区后再建立。

    T1、# 使用 with 表达式

    1. with t.open_writer(partition='pt=test') as writer:
    2. records = [[111, 'aaa', True], # 这里可以是list。
    3. [222, 'bbb', False],
    4. [333, 'ccc', True],
    5. [444, '中文', False]]
    6. writer.write(records) # 这里records可以是可迭代对象。
    7. with t.open_writer(partition='pt1=test1,pt2=test2') as writer: #多级分区写法
    8. records = [t.new_record([111, 'aaa', True]), # 也可以是Record对象。
    9. t.new_record([222, 'bbb', False]),
    10. t.new_record([333, 'ccc', True]),
    11. t.new_record([444, '中文', False])]
    12. writer.write(records)with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 此处同时打开两个block。
    13. writer.write(0, gen_records(block=0))
    14. writer.write(1, gen_records(block=1)) # 此处两个写操作可以多线程并行,各个block间是独立的。

    T2、# 不使用 with 表达式

    1. writer = t.open_writer(partition='pt=test', blocks=[0, 1])
    2. writer.write(0, gen_records(block=0))
    3. writer.write(1, gen_records(block=1))
    4. writer.close() # 不要忘记关闭 writer,否则数据可能写入不完全

    T3、使用MaxCompute对象的write_table方法写入数据

    1. records = [[111, 'aaa', True], # 这里可以是list
    2. [222, 'bbb', False],
    3. [333, 'ccc', True],
    4. [444, '中文', False]]
    5. o.write_table('test_table', records, partition='pt=test', create_partition=True)
    6. 如果分区不存在,可以使用create_partition参数指定创建分区,示例如下。
    7. with t.open_writer(partition='pt=test', create_partition=True) as writer:
    8. records = [[111, 'aaa', True], # 这里可以是list。
    9. [222, 'bbb', False],
    10. [333, 'ccc', True],
    11. [444, '中文', False]]
    12. writer.write(records) # 这里records可以是可迭代对象。

    (2)、删

    # 删除表

    1. o.delete_table('my_table_name', if_exists=True) # 只有表存在时删除。
    2. t.drop() # Table对象存在的时候可以直接执行drop函数。

    (3)、改

    # 同步表更新

    当一个表被其他程序更新,例如改变了Schema之后,可以调用reload方法同步表的更新

    table.reload()

    (4)、查

    # 查询行记录Record

    Record表示表的一行记录,用户在Table对象上调用new_record即可创建一个新的Record

    1. t = o.get_table('mytable')
    2. r = t.new_record(['val0', 'val1']) # 值的个数必须等于表schema的字段数
    3. r2 = t.new_record() # 您也可以不传入值
    4. r2[0] = 'val0' # 可以通过偏移设置值
    5. r2['field1'] = 'val1' # 也可以通过字段名设置值
    6. r2.field1 = 'val1' # 通过属性设置值
    7. print(record[0]) # 取第0个位置的值
    8. print(record['c_double_a']) # 通过字段取值
    9. print(record.c_double_a) # 通过属性取值
    10. print(record[0: 3]) # 切片操作
    11. print(record[0, 2, 3]) # 取多个位置的值
    12. print(record['c_int_a', 'c_double_a']) # 通过多个字段取值

    # 查询表数据

    T1、通过调用head获取表数据,但仅限于查看每张表开始的小于1万条的数据

    1. t = odps.get_table('dual')
    2. for record in t.head(3):
    3. print(record[0]) # 取第0个位置的值
    4. print(record['c_double_a']) # 通过字段取值
    5. print(record[0: 3]) # 切片操作
    6. print(record[0, 2, 3]) # 取多个位置的值
    7. print(record['c_int_a', 'c_double_a']) # 通过多个字段取值

    T2、通过在table上执行open_reader操作,打开一个reader来读取数据。用户可以使用with表达式,也可以不使用。

    1. # 使用with表达式
    2. with t.open_reader(partition='pt=test') as reader:
    3. count = reader.count
    4. for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改
    5. 造成并行操作
    6. # 处理一条记录
    7. # 不使用with表达式
    8. reader = t.open_reader(partition='pt=test')
    9. count = reader.count
    10. for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造
    11. 成并行操作
    12. # 处理一条记录

    T3、使用MaxCompute对象的read_table方法获取表数据

    1. for record in o.read_table('test_table', partition='pt=test'):
    2. # 处理一条记录

    4、表分区相关

    1. # 判断是否为分区表
    2. if table.schema.partitions:
    3. print('Table %s is partitioned.' % table.name)
    4. # 遍历表全部分区:
    5. for partition in table.partitions:
    6. print(partition.name)
    7. for partition in table.iterate_partitions(spec='pt=test'):
    8. # 遍历二级分区。
    9. 判断分区是否存在:
    10. table.exist_partition('pt=test,sub=2019')
    11. # 获取分区:
    12. partition = table.get_partition('pt=test')
    13. print(partition.creation_time)
    14. # 创建分区。
    15. t.create_partition('pt=test', if_not_exists=True) # 分区。不存在时候才创建。
    16. # 删除分区。
    17. t.delete_partition('pt=test', if_exists=True) # 存在时才删除。
    18. partition.drop() # Partition对象存在的时候直接drop。

    5、数据上传下载通道

    MaxCompute Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向MaxCompute中上传或者下载数据。
    说明:不推荐直接使用Tunnel接口,推荐用户直接使用表的写和读接口。同时,如果安装了Cython,在安装PyODPS时会编译C代码,加速Tunnel的上传和下载。
    说明:需要注意,PyODPS暂不支持上传外部表。s

    (1)、上传示例

    1. from odps.tunnel import TableTunnel
    2. table = o.get_table('my_table')
    3. tunnel = TableTunnel(odps)
    4. upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    5. with upload_session.open_record_writer(0) as writer:
    6. record = table.new_record()
    7. record[0] = 'test1'
    8. record[1] = 'id1'
    9. writer.write(record)
    10. record = table.new_record(['test2', 'id2'])
    11. writer.write(record)
    12. upload_session.commit([0])

    (2)、下载示例

    1. from odps.tunnel import TableTunnel
    2. tunnel = TableTunnel(odps)
    3. download_session = tunnel.create_download_session('my_table', partition_spec='pt=
    4. test')
    5. with download_session.open_record_reader(0, download_session.count) as reader:
    6. for record in reader:
    7. # 处理每条记录。

  • 相关阅读:
    【IMX6ULL笔记】-- 从驱动到应用(基于Qt)- LED
    NAND闪存改变了现代生活
    鉴源论坛 · 观模丨软件单元测试真的有必要吗?(下)
    Docker如何使用
    数论学习笔记 - 同余及其拓展知识
    【附源码】计算机毕业设计JAVA移动学习网站
    自主实现qsort函数
    Python中关闭文件很重要
    vue3基础流程
    《神经网络与深度学习》算法伪代码汇总
  • 原文地址:https://blog.csdn.net/qq_41185868/article/details/126811352