• Dataworks实现接口调用


    RestAPI调用

    功能:实现Restful风格的API调用

    步骤一:配置RestAPI数据源,在url中填Restful风格的url,若是需要账号密码登录等可以切换“验证方法”

    步骤二:在离线同步中创建离线同步任务,数据来源选择配置的Restful数据源

    其他接口调用

    功能:实现非Restful风格的接口调用

    使用节点:PyODPS,其中PyODSP2的python版本为2.7,PyODSP3的python版本为3.7

    实现方法:导入所需的第三方python包,采用python编码实现

    第三方包导入

    公有云:直接pip安装所需包即可

    专有云:导入第三方包方法如下:      

            步骤一

    PyPI · The Python Package Index等平台下载对应版本的python包,将文件后缀改为.zip

           步骤二:上传资源,将第三方包上传至dataworks(注意:所有上传文件均要打包上传,直接上传文件会乱码导致无法使用)

    步骤三:在PyODPS中引用资源,然后开发接口

    ##@resource_reference{"test.zip"}

    import sys

    import os

    root_path =os.path.dirname(os.path.abspath('test.zip'))

    os.system("unzip test.zip-d" + root_path)

    sys.path.append(root_path)

    import pyrfc

    遇到问题:若是需要第三方包较多、包需要安装以及需要SDK等工具时,建议使用虚拟机配置环境后打包上传

    推荐方法:在本地linux虚拟机(不用在意linux版本),安装Anaconda后配置对应的虚拟环境,将虚拟环境的python包整体打包(路径一般为:~/anaconda/envs/env_name/lib/pyyhon3.7/site-packages)

    注意只打包site-packages内的文件,另外如果需要用到其他sdk等工具,可以将文件(X.so)直接放入即可

    实例:中台实现SAP RFC接口调用

    Python环境已经导入系统:包名为pack_for_sap_rfc.zip

    ##@resource_reference{"pack_for_sap_rfc.zip"}

    import sys

    import os

    import pandas as pd

    from odps import ODPS

    from odps.df import DataFrame

    # 运行环境导入

    root_path =os.path.dirname(os.path.abspath('pack_for_sap_rfc.zip'))

    os.system("unzippack_for_sap_rfc.zip -d" + root_path)

    sys.path.append(root_path)

    import pyrfc

    # SAP服务器IP

    ASHOST = 'x.x.x.x'

    # 客户端号

    CLIENT = '800'

    # 系统数字

    SYSNR = '03'

    # 用户名

    USER = 'xxx

    # 密码

    PASSWD = 'xxx'

    # 创建连接

    conn = pyrfc.Connection(ashost=ASHOST, sysnr=SYSNR, client=CLIENT, user=USER, passwd=PASSWD)

    # 调用接口,其中'test'为RFC接口名,其他为传入参数

    result =conn.call('test',

                       I_DATUM_FROM='',

                       I_DATUM_TO='')  

    # 调用时间,传入时间参数              

    ds = args['ds']

    # 获取返回结果中的表

    row_lists = [pd.Series(row) for row in result["ET_CONTRACT"]]

    # 获取列名

    column = result["ET_CONTRACT"][0].keys()

    # 根据列名创建Dataframe

    df = pd.DataFrame(row_lists, columns=column)

    #print(df.head(10))

    # 删除指定分区 方式一

    # t =o.get_table("table_name")

    #t.delete_partition('ds='+ds, if_exists=True)

    # 删除指定分区 方式二

    o.execute_sql("altertable table_name drop if exists partition(ds ='{}')".format(ds))

    # 往指定分区插入数据

    o.write_table("table_name", df.values.tolist(), partition='ds={}'.format(ds),create_partition=True)

    注意:提交周期任务时添加时间参数

  • 相关阅读:
    怎么合并视频?快把这些方法收好
    【批量去除图片的背景】
    算法工程师跳槽至B站被原公司索赔200万,法院二审判决:无需赔偿
    2022年双十一数码产品排名,数码好物选购指南
    Docker学习笔记
    Springboot初始化自动生成数据库表结构
    自定义网页中被选中文本的样式 CSS selection
    Subcontracting分包订单中组件的修改无法记录在item修改记录中的说明
    testcontainers-java 新增对 TiDB 的支持
    优秀的程序员不是你的尽头,而是起点
  • 原文地址:https://blog.csdn.net/hutc_Alan/article/details/132908509