• Using Delta Lake on EMR Serverless


    This article is a ready-to-use, fully automated set of scripts for submitting a Delta Lake job on EMR Serverless. It follows, end to end, the standards and conventions laid out in "Best Practices: How to Elegantly Submit an Amazon EMR Serverless Job?".

    1. Export Environment Variables

    Note: the values below are for illustration only; replace them with values from your own environment.

    export APP_NAME="emr-serverless-deltalake-test"
    export APP_S3_HOME="s3://$APP_NAME"
    export APP_LOCAL_HOME="/home/ec2-user/$APP_NAME"
    export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
    export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::1111111111111:role/EMR_SERVERLESS_ADMIN'
    
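    Optionally, before going further, confirm that the application ID and execution role are valid. A minimal check, assuming the AWS CLI is configured with sufficient permissions:

    aws emr-serverless get-application \
        --application-id $EMR_SERVERLESS_APP_ID \
        --query 'application.{name:name,state:state,releaseLabel:releaseLabel}'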

    2. Create a Dedicated Working Directory and S3 Bucket for the Job

    mkdir -p $APP_LOCAL_HOME
    aws s3 mb $APP_S3_HOME
    
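    Note that S3 bucket names are globally unique, and aws s3 mb fails if the bucket already exists. If you want this step to be idempotent, a minimal variant:

    # Create the bucket only if it does not already exist
    aws s3api head-bucket --bucket $APP_NAME 2>/dev/null || aws s3 mb $APP_S3_HOME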

    3. Prepare the Job Script

    cat << EOF > $APP_LOCAL_HOME/delta_table.py
    from pyspark.sql import SparkSession

    spark = ( SparkSession
            .builder
            .appName("Delta-Lake integration demo - create tables")
            .enableHiveSupport()
            .getOrCreate() )

    # Create a DataFrame with a few sample rows
    data = spark.createDataFrame(
        [("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
         ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
         ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
         ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")],
        ["id", "creation_date", "last_update_time"])

    spark.sql("""DROP TABLE IF EXISTS delta_table""")

    # Create a Delta Lake table backed by the S3 location
    spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string,
    last_update_time string)
    USING delta LOCATION
    's3://$APP_NAME/delta_table'""")

    # Append the DataFrame rows to the Delta table
    data.writeTo("delta_table").append()
    EOF
    aws s3 cp $APP_LOCAL_HOME/delta_table.py $APP_S3_HOME/delta_table.py
    
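    After the job completes, you may want to verify the write by reading the table back. Below is a minimal read-back script in the same heredoc style; delta_table_check.py is a name introduced here for illustration, and it is submitted the same way as the main script:

    cat << EOF > $APP_LOCAL_HOME/delta_table_check.py
    from pyspark.sql import SparkSession

    spark = ( SparkSession
            .builder
            .appName("Delta-Lake integration demo - read table")
            .enableHiveSupport()
            .getOrCreate() )

    # Read the Delta table registered in the catalog and print its rows
    spark.sql("SELECT * FROM delta_table").show()

    # Alternatively, read directly from the underlying S3 location
    spark.read.format("delta").load("s3://$APP_NAME/delta_table").show()
    EOF
    aws s3 cp $APP_LOCAL_HOME/delta_table_check.py $APP_S3_HOME/delta_table_check.py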

    4. Prepare the Job Description File

    cat << EOF > $APP_LOCAL_HOME/start-job-run.json
    {
        "name":"$APP_NAME",
        "applicationId":"$EMR_SERVERLESS_APP_ID",
        "executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN",
        "jobDriver":{
            "sparkSubmit":{
                "entryPoint":"s3://$APP_NAME/delta-test.py",
                "sparkSubmitParameters":"--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar,/usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
            }
       },
       "configurationOverrides":{
            "monitoringConfiguration":{
                "s3MonitoringConfiguration":{
                    "logUri":"$APP_S3_HOME/logs"
                }
            }
       }
    }
    EOF
    jq . $APP_LOCAL_HOME/start-job-run.json
    
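    The sparkSubmitParameters above load the Delta Lake jars bundled with the EMR Serverless release and route catalog calls to the AWS Glue Data Catalog. If you also need to size the driver and executors explicitly, standard Spark properties can be appended in the same string; the values below are illustrative only:

    --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.executor.instances=2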

    5. 提交 & 监控 作业

    export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \
        --no-paginate --no-cli-pager --output text \
        --name $APP_NAME \
        --application-id $EMR_SERVERLESS_APP_ID \
        --execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \
        --execution-timeout-minutes 0 \
        --cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \
        --query jobRunId) && \
    now=$(date +%s)sec && \
    while true; do
        jobStatus=$(aws emr-serverless get-job-run \
                        --no-paginate --no-cli-pager --output text \
                        --application-id $EMR_SERVERLESS_APP_ID \
                        --job-run-id $EMR_SERVERLESS_JOB_RUN_ID \
                        --query jobRun.state)
        if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; then
            for i in {0..5}; do
                echo -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"
                sleep 1
            done
        else
            echo -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"
            break
        fi
    done
    
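    If the job hangs or was misconfigured, it can be stopped from the CLI; the run should then transition through CANCELLING to CANCELLED:

    aws emr-serverless cancel-job-run \
        --application-id $EMR_SERVERLESS_APP_ID \
        --job-run-id $EMR_SERVERLESS_JOB_RUN_ID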

    6. Check for Errors

    JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
    rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
    aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
    gzip -d -r -f $JOB_LOG_HOME >& /dev/null
    grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME
    
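    Since grep only surfaces matching lines, it is often easier to read the driver output directly. Assuming the standard SPARK_DRIVER folder layout of EMR Serverless S3 monitoring logs (the files were already decompressed by gzip -d above):

    cat $JOB_LOG_HOME/SPARK_DRIVER/stdout
    cat $JOB_LOG_HOME/SPARK_DRIVER/stderr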
  • Original article: https://blog.csdn.net/bluishglc/article/details/133276221