AWS Glue PySpark + Athena: Basic Study Notes


    PySpark Study Notes 🍎


    1. AWS Architecture

    ① AWS Glue: the working platform, covering script authoring, managing job run status, and scheduling (mainly: connection/database configuration, ETL and data-transformation scripts, scheduling)

    ② Amazon S3 data lake (warehouse): data storage

    ③ Athena: a workbench for running SQL queries directly (each query incurs a cost)

    ④ QuickSight: report and dashboard presentation


    2. Granting QuickSight Access

    Click the user avatar -- Manage QuickSight -- Invite users -- by email -- select the shared folder -- grant the invited user access to the specified folder


    3. What Is SaaS

    SaaS verticals such as e-commerce, healthcare, real estate, and logistics commonly use HubSpot.

    From a product standpoint, HubSpot has three products, Marketing, CRM, and Sales, which together cover the full inbound-marketing workflow.

    Marketing is the core, providing SEO, social media, web page building and optimization, site grading, and similar tools; CRM provides data visualization and automatically tracks customer behavior; Sales is the tool connecting salespeople with customers.


    4. Creating temporary views:
    # valid until the SparkSession that created it terminates
    df.createOrReplaceTempView("tempViewName")
    
    # valid until the Spark application terminates
    df.createOrReplaceGlobalTempView("tempViewName")
    
    5. Dropping temporary views:
    spark.catalog.dropTempView("tempViewName")
    spark.catalog.dropGlobalTempView("tempViewName")
    

    6. Basic Demo
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.functions import *
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [database = "hello", table_name = "hello_public_t_product", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    
    
    import datetime
    from datetime import datetime, timedelta
    from pyspark.sql.types import StringType
    
    today_str = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m-%d")
    today_int = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y%m%d")
    
    
    yesterday_str = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y-%m-%d")
    yesterday_int = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y%m%d")
    
    # catalog database and table to read
    # read data from storage
    contacts = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "hive_t_test", transformation_ctx = "datasource0")
    
    print('start')
    df = contacts.toDF().select("contact_id","dt")
    df.show(5)
    print('end')
    

    7. Dynamic partition overwrite mode (Spark 2.3.0+)

    Spark config: spark.sql.sources.partitionOverwriteMode = STATIC  # this parameter defaults to STATIC

    This setting applies when inserting data: in STATIC mode all other partitions of the target are deleted before the insert, while in DYNAMIC mode other partitions are left alone and only the partitions that actually receive data are overwritten (see the sketch below).

    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
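
    A minimal sketch of the difference, assuming a DataFrame df partitioned by a dt column (the S3 path below is a placeholder, not from the original post):
    # STATIC (default): an overwrite first drops every existing partition of the target
    # DYNAMIC: only the partitions that receive new rows are replaced
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write.mode("overwrite").partitionBy("dt").parquet("s3://your-bucket/your-table/")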

    8. Notes on joins:

    Watch out for duplicate join-key columns: joining on a list of column names (on=['id']) keeps a single copy of the key, while joining on an expression (df1.id == df2.id) keeps both copies regardless of whether the join is left, right, inner, or outer (see the sketch below).
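
    A small sketch of this behaviour, using made-up DataFrames:
    a = spark.createDataFrame([(1, "x")], ["id", "a_val"])
    b = spark.createDataFrame([(1, "y")], ["id", "b_val"])
    
    # joining on a list of column names keeps a single "id" column
    a.join(b, on=["id"], how="left").printSchema()
    
    # joining on an expression keeps both "id" columns; here we drop b's copy afterwards
    a.join(b, a.id == b.id, how="left").drop(b.id).printSchema()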


    🍌 Coding Tips, Part 1

    1. Filter and deduplicate
    contact_df = contact_user_bind.toDF().filter(col('status')=='match').select('user_id','contact_id').dropDuplicates()
    # dropDuplicates() removes duplicate rows; by default it compares whole rows, but you can pass a list of columns to deduplicate on
    

    2. Type conversion with withColumn / select / selectExpr / sql (see the sketch below)
    https://blog.csdn.net/nanfeizhenkuangou/article/details/121802837
    
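    A brief sketch of the two most common forms, with illustrative column names:
    from pyspark.sql.functions import col
    
    # withColumn + cast()
    df = df.withColumn("num", col("num").cast("int"))
    # selectExpr with SQL CAST syntax
    df = df.selectExpr("user_id", "cast(price as decimal(10,2)) as price")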

    3. Add a column + subtract a time interval
    product_export_df = product_export_df.withColumn('export_time_us', product_export_df.create_time - expr('INTERVAL 4 HOURS'))
    

    4. Add a column + when() condition
    store_order_detail_df = store_order_detail_df.withColumn("servicebytrendsi", when((col("fulfillment_service") =='trendsi')|(col("fulfillment_service") =='trendsi-fashion-dropshipping'), 1).otherwise(0))
    

    5. Joining tables (🌟)
    store_order_merge  = store_order_df.join(store_order_detail_df,store_order_df.id == store_order_detail_df.store_order_id,how='left')
    #----------------
    order_detail_total.join(sku_df.select('sku_id','color'),['sku_id'])
    

    6. String concatenation (concatenating columns)
    user_df = user_df.withColumn('user_name',concat(user_df.first_name,lit(" "),user_df.last_name))
    

    7. Setting a date format
    user_df = user_df.withColumn('sign_up_date', date_format(user_df.create_time-expr('INTERVAL 4 HOURS'),"MM/dd/yyyy"))
    #------------------------
    this_month  = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m")
    

    8. Drop rows containing nulls
    store_df = store_df.dropna(subset=['user_id'])
    

    9. Filter rows whose column value is in a given set
    order_df = order_df.filter(col('order_type').isin(1,3))
    

    10. Combining conditions on multiple columns with &
    order_df = order_df.filter((~col('user_id').isin(327,166))&(col('pay_time_us')>'2021-04-01'))
    

    11.when…then
    order_df_detail = order_df_detail.withColumn('weight',when((col('weight')<=300)|(col('weight').isNull()),300).otherwise(col('weight')))
    

    12. Null checks + single vs double quotes
    # isNotNull() / isNull() for null checks; in Scala Spark code a leading single quote ('col) refers to the column itself, while double quotes ("...") denote a string literal
    

    13. Multiple conditional branches

    write_dataframe = write_dataframe.withColumn('status', when(col('deal_id').isNull(), lit("miss")).when(col('user_type').isNull(), lit("extra")).otherwise('match'))


    14. Date functions

    – timedelta (time interval): miyaBirthdayNextday = miyaBirthday + datetime.timedelta(days=1)

    – datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

    – strftime (format a date): print(datetime.date.today().strftime('%d/%m/%Y'))

    – date_format (format a date): contacts = contacts.withColumn('date', date_format(col('create_time'), "yyyyMMdd"))


    15. Regex string replacement (see the sketch below)

    Reference: REGEXP_REPLACE usage (CSDN blog by _JohnnyChu)
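
    A minimal sketch (the DataFrame and column names are illustrative):
    from pyspark.sql.functions import regexp_replace
    
    # strip every non-digit character from a phone-number column
    df = df.withColumn("phone_clean", regexp_replace("phone", "[^0-9]", ""))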


    16. Set difference with subtract()
    actual = result_mapping.select("user_id").subtract(result_mapping1.select('user_id')).dropDuplicates()
    

    17. union / unionAll (appends rows; use distinct() to deduplicate)

    write_dataframe = result_mapping1.union(result_mapping2).union(result_mapping3)


    18. Window functions ✨

    Note: import Window with from pyspark.sql.window import Window

    write_dataframe = write_dataframe.withColumn("rank", row_number().over(Window.partitionBy("contact_id").orderBy(desc("spu_sale_amount"))))


    19. Timestamp format conversion

    create_time >= to_timestamp('2022-01-03 00:00:00', 'yyyy-mm-dd hh24:mi:ss')


    20. explode: splitting an array into rows

    columns = ["Seqno", "Name"]
    data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

    df = spark.createDataFrame(data=data, schema=columns)
    df.withColumn("split_name", explode(split(col("Name"), " "))).show()


    🍑 Coding Tips, Part 2

    21. Type casting in SQL (CAST + CONCAT)
    SELECT CONCAT(CAST(round((3/21)*100,3) AS CHAR),'%') as aa from act_canal; 
    

    22. Get the week number from a date
    SELECT weekofyear('2022-06-13');
    

    23. Spark SQL date format conversion
     SELECT DATE_FORMAT(NOW(),'yyyy-MM');
     -----mysql-----
     DATE_FORMAT(SYSDATE(),'%Y-%m-%d %H:%i:%s')
     -----postgres-----
     to_timestamp('2022-01-03 00:00:00', 'yyyy-mm-dd hh24:mi:ss')
     
    SELECT TO_CHAR(CURRENT_DATE, 'yyyy-mm-dd');
    

    24. Getting the week of year / week of month from a date
    SELECT weekofyear('2022-06-13')-23;
    #----------------------------------
    df_student.withColumn("week_of_month", date_format(col("birthday"), "W")).show()
    

    25. groupBy on multiple columns
    store_order_merge.groupBy(['user_id','year_month','year_last_month']).agg(sum(col('price')*col('num')).alias("total_pay_amount"))
    

    26. Joining on multiple conditions
    write_dataframe = write_dataframe.join(trendsi_sales_china,on=['user_id','year_last_month','year_month'],how='outer')
    #-----
    df1.join(df2, on=[df1['age'] == df2['age'], df1['sex'] == df2['sex']], how='left_outer')
    

    27. Merging DataFrames (unionByName)
    df1.unionByName(df2).show()
    

    28. Subtracting months from a date
    store_order_df = store_order_df.withColumn('last_month_pay_time', col('pay_time') - expr('INTERVAL 1 MONTHS'))
    

    29. union + unionAll + unionByName (see the sketch below)
    df1.unionByName(df2, allowMissingColumns=True).show()  # columns may differ; missing ones are filled with null
    df1.unionByName(df2).show()  # merges by column name (both DataFrames must have the same set of columns)
    
    union: merges two DataFrames by column position, not by name; the result uses the first DataFrame's column names (a.union(b) follows a's column order). Unlike SQL UNION it does not remove duplicates, so call distinct() if you need deduplication.
    unionAll: same as union (in PySpark the two behave identically)
    unionByName: merges by column name instead of by position
    
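    A small demo of the positional vs by-name behaviour (toy data, not from the original post):
    df1 = spark.createDataFrame([("1", "a")], ["id", "name"])
    df2 = spark.createDataFrame([("b", "2")], ["name", "id"])
    
    df1.union(df2).show()              # positional: the second row comes out as id='b', name='2'
    df1.unionByName(df2).show()        # by name:    the second row comes out as id='2', name='b'
    df1.union(df2).distinct().show()   # union keeps duplicates; add distinct() if you need deduplication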

    30. Number of days in the current month + current day of the month
    from dateutil.relativedelta import relativedelta
    
    start_day_of_next_month = (datetime.now() + relativedelta(months=1)).replace(day=1)
    start_day_of_this_month = (datetime.now()).replace(day=1)
    
    this_month_days = (start_day_of_next_month - start_day_of_this_month).days
    this_month_days_now = (datetime.now() - start_day_of_this_month).days
    

    31. Count, mean, standard deviation, min, and max
    df = spark.createDataFrame(
        [("Bob", 13, 40.3, 150.5), ("Alice", 12, 37.8, 142.3), ("Tom", 11, 44.1, 142.2)],
        ["name", "age", "weight", "height"],
    )
    df.describe(['age']).show()
    

    32. Removing duplicate rows
    df.dropDuplicates().show()
    

    33. Intersection of two DataFrames
    df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
    df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
    df1.intersectAll(df2).sort("C1", "C2").show()
    

    34. Iterating with mapInPandas (pandas UDF)
    from pyspark.sql.functions import pandas_udf
    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
    def filter_func(iterator):
        for pdf in iterator:
            yield pdf[pdf.id == 1]
    df.mapInPandas(filter_func, df.schema).show()  
    

    35. Merging multiple DataFrames (union in a loop)
    # all DataFrames must have the same columns, otherwise the union fails
    from functools import reduce
    buff = []
    for pdfs in [d1, d2,d3]:
          buff.append(pdfs)
    mergeDF = reduce(lambda x,y: x.union(y), buff)
    mergeDF.show()
    

    36. Getting dates 7 / 14 / 30 days back
    days7_str = (datetime.now() - timedelta(hours=176, minutes=0)).strftime("%Y-%m-%d")
    days7_int = (datetime.now() - timedelta(hours=176, minutes=0)).strftime("%Y%m%d")
    
    days14_str = (datetime.now() - timedelta(hours=344, minutes=0)).strftime("%Y-%m-%d")
    days14_int = (datetime.now() - timedelta(hours=344, minutes=0)).strftime("%Y%m%d")
    
    days30_str = (datetime.now() - timedelta(hours=728, minutes=0)).strftime("%Y-%m-%d")
    days30_int = (datetime.now() - timedelta(hours=728, minutes=0)).strftime("%Y%m%d")
    

    37. PySpark window function over()
    # define the partitioning and ordering
    window = Window.partitionBy(['spu_id']).orderBy(['spu_co'])
    # rank within each partition, keep rank = 1, then drop the rank column
    sku_df = sku_df.withColumn('rank', rank().over(window)).filter("rank= '1'").drop('rank')
    

    38.concat+weekofyear()
    t_order_df = t_order_df.withColumn('year_week',concat(concat(year(col('pay_time')),lit('-')),weekofyear(col('pay_time'))))
    

    39. Type conversion with cast() + year-week format
    china_order_df_week = china_order_df.filter(col('year_month_week')== concat(lit(this_month),lit('-'),lit(weekofyear(current_date()).cast(StringType())),lit("W")))
    

    🍉 Coding Tips, Part 3

    40. Handling array and map columns in PySpark
    from pyspark.sql.functions import explode_outer
    """ with array """
    df.select(df.name,explode_outer(df.knownLanguages)).show()
    """ with map """
    df.select(df.name,explode_outer(df.properties)).show()
    

    41. Null checks
    col('field').isNotNull() ------------- col('field').isNull()
    

    42.Get String length
    from pyspark.sql.functions import length
    df_books.where(length(col("book_name")) >= 20).show()
    

    43. Row to column (pivot) ✨
    from pyspark.sql import functions as F
    keys = ['is_jit', 'after_reason_type', 'supplier_id']
    column = 'after_reason'
    column_value = 'value'
    column_value_list = ['']
    df.groupby(keys).pivot(column, column_value_list).agg(F.first(column_value, ignorenulls=True)).fillna('*')
    

    44. Reading specific partitions ✨
    # dw_order_cut
    partitionstate = "dt >='2021-01-01'"
    #partitionstate = "dt >= '"+days30_int+"'"
    partitionstate1 = "dt = '"+yesterday_int+"'"
    partitionstate2 = "time_us_hour = '0'"
    partitionstate3 = "dt = '"+cday_int+"'"
    
    source_t_sku = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "hive_t_sku", push_down_predicate = partitionstate1)
    

    45. Dates 7 days / 30 days back
    days7_str = (datetime.now() - timedelta(hours=((24*7)+8), minutes=0)).strftime("%Y-%m-%d")
    days7_int = (datetime.now() - timedelta(hours=((24*7)+8), minutes=0)).strftime("%Y%m%d")
    
    day30_str = (datetime.now() - timedelta(hours=((24*30)+8), minutes=0)).strftime("%Y-%m-%d")
    day30_int = (datetime.now() - timedelta(hours=((24*30)+8), minutes=0)).strftime("%Y%m%d")
    

    46. to_string() (pandas API on Spark)
    import pyspark.pandas as ps
    df = ps.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]}, columns=['col1', 'col2'])
    print(df.to_string())
       col1  col2
    0     1     4
    1     2     5
    2     3     6
    

    47. Substring: substr() ✨ substring()
    b=a.withColumn("Sub_Name",a.Name.substr(1,3)).show()
    # Reference: https://zhuanlan.zhihu.com/p/34901943
    df.withColumn('year', substring('date', 1, 4))\
      .withColumn('month', substring('date', 5, 2))\
      .withColumn('day', substring('date', 7, 2))
    

    48. UDF: converting yyyyMMdd strings to dates ✨
    from datetime import datetime
    from pyspark.sql.functions import col, udf, date_format, to_timestamp
    from pyspark.sql.types import DateType
    
    
    rdd = sc.parallelize(['20161231', '20140102', '20151201', '20161124'])
    df1 = sqlContext.createDataFrame(rdd, ['old_col'])
    
    # UDF to convert string to date
    func =  udf (lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
    
    df = df1.withColumn('new_col', date_format(func(col('old_col')), 'MM-dd-yyyy'))
    
    df.show()
    
    write_dataframe = write_dataframe.withColumn('date', to_timestamp('dt', 'yyyyMMdd'))
    

    49.dayofweek()
    # dayofweek: Sunday through Saturday map to 1 through 7 (Sunday is 1, Monday is 2, ...)
    # To get Monday-first numbering you have to convert it yourself; demo below (Scala, using when/otherwise)
    // add a days_of_week column: day index within the week, with Monday as day 1
    .withColumn("days_of_week", when(dayofweek(from_unixtime(col("unix_time"), "yyyy-MM-dd")) === 1, 7)
    .otherwise(dayofweek(from_unixtime(col("unix_time"), "yyyy-MM-dd")) -1)
    .cast(LongType))
    

    50. concat: string concatenation
    user_tag.withColumn('iso_week_agg',concat(col('iso_year'),lit('-'),col('iso_week')))
    # related function: concat_ws()
    

    51. countDistinct: counting distinct values
    user_tag.groupby(['iso_week_agg']).agg(countDistinct(col('user_id')).alias('user_reg_number'))
    

    52. Handling weeks that span a year boundary (ISO year-week)
    # load the source data
    user_tag = source_user_tag.toDF().select('user_id','create_time')
    user_tag.show(20)
    # get source + etl + year-week: handle weeks that cross a year boundary
    iso_weekday = when(dayofweek('create_time') != 1, dayofweek('create_time')-1).otherwise(7)
    week_from_prev_year = (month('create_time') == 1) & (weekofyear('create_time') > 9)
    week_from_next_year = (month('create_time') == 12) & (weekofyear('create_time') == 1)
    iso_year = when(week_from_prev_year, year('create_time') - 1) \
                .when(week_from_next_year, year('create_time') + 1) \
                .otherwise(year('create_time'))
            
    user_tag = user_tag.withColumn('iso_year', iso_year)
    user_tag = user_tag.withColumn('iso_week', lpad(weekofyear('create_time'), 3, "W0"))
    user_tag = user_tag.withColumn('iso_week_agg',concat(col('iso_year'),lit('-'),col('iso_week')))
    user_tag.show(20)
    

    53. collect_list vs collect_set (see the sketch below)

    Reference: pyspark collect_list and collect_set tutorial | Happy Coding Lab (chilunhuang.github.io)

    collect_list: aggregates the grouped values into an array per groupBy key; collect_set does the same but deduplicates.
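
    A quick sketch of the difference (toy data):
    from pyspark.sql.functions import collect_list, collect_set
    
    orders = spark.createDataFrame(
        [("u1", "shoes"), ("u1", "shoes"), ("u1", "hat")], ["user_id", "item"])
    orders.groupBy("user_id").agg(
        collect_list("item").alias("all_items"),       # keeps duplicates: [shoes, shoes, hat]
        collect_set("item").alias("distinct_items")    # deduplicated, order not guaranteed
    ).show(truncate=False)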


    54. lpad & rpad
    #### Add both leading and trailing padding
    # pad the column to a given length with a given padding character
    df_states = df_states.withColumn('states_Name_new', lpad(df_states.state_name,20, '#'))
    df_states = df_states.withColumn('states_Name_new', rpad(df_states.states_Name_new,24, '#'))
    
    df_states.show()
    

    55. replace (see the sketch below)

    Reference: pyspark.sql.DataFrame.replace usage and examples (vimsky.com)
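
    A short sketch of DataFrame.replace, assuming illustrative column names:
    # replace the literal string 'N/A' with null in the listed columns
    df = df.replace("N/A", None, subset=["color", "size"])
    # replace several values at once with a dict
    df = df.replace({"CN": "China", "US": "United States"}, subset=["country"])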


    56. ✨ Cohort analysis by day (retention)
    # cohort analysis, daily granularity
    
    # 1. find each customer's first purchase date
    t_order = source_t_order.toDF().select('user_id','create_time')
    t_order = t_order.withColumn('create_time',date_format(col('create_time'),"yyyy-MM-dd")).filter(col('create_time')>'2022-07-01')
    t_order.show(20)
    
    from pyspark.sql.window import Window
    # with min(create_time)
    t_order = t_order.withColumn('FirstPurchaseDate',min('create_time').over(Window.partitionBy('user_id').orderBy(col('create_time'))))
    t_order.show(20)
    
    # 2. compute the days between each order and the first purchase
    t_order =  t_order.withColumn('ReturnDays',datediff('create_time','FirstPurchaseDate'))
    t_order.show(20)
    
    # 3. count customers with the same first-purchase date and return-day offset
    t_order = t_order.groupby(['FirstPurchaseDate','ReturnDays']).agg(count('user_id').alias('count_user_id'))
    t_order.show(20)
    
    # 4. pivot into the cohort matrix
    t_order = t_order.groupby('FirstPurchaseDate').pivot('ReturnDays').agg(first('count_user_id')).orderBy('FirstPurchaseDate')
    t_order.show(20)
    

    57. Subtracting two dates
    # get the partition for a given day offset (Athena)
    dt = date_format(date_add('day', -8, current_timestamp), '%Y%m%d')
    # Athena + Glue + MySQL
    date_diff(create_time,delivery_time)<=7 or date_diff('day',create_time,delivery_time)<=7
    
    #pgsql
    date_part('day',create_time-delivery_time)<=7
    
    # pgsql date arithmetic
    to_char(delivery_time + INTERVAL '7 day'-INTERVAL '4 hour','yyyy-mm-dd')='2022-07-18'
    
    # Athena date arithmetic
    SELECT date_add('day', 1, timestamp '2022-11-30 10:30:00') AS new_date;
    SELECT date_format(date_parse('20230721', '%Y%m%d') - INTERVAL '20' DAY, '%Y%m%d') AS new_date;
    
    
    

    58. Substring with substr
    b=a.withColumn("Sub_Name",a.Name.substr(1,3)).show()  # take 3 characters starting at position 1
    

    59. over() window: rank() / dense_rank() / row_number()
    # over(partition  by  order  by) 
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F
    
    windowSpec  = Window.partitionBy("iso_week_agg").orderBy(F.asc("pay_iso_week_agg"))
    order_df = order_df.withColumn("dense_rank", F.dense_rank().over(windowSpec))
    order_df.show(50)
    

    🍇 Coding Tips, Part 4

    60. Add a column of nulls
    test = test.withColumn('order_id',lit(None))
    

    61. Writing a DataFrame to a database over JDBC
    t_sku_predict_7days.write.format('jdbc').options(url='jdbc:mysql://xxxxxx.rds.amazonaws.com:3306/DW',driver='com.mysql.jdbc.Driver',dbtable='lkb_test_01',user='xxxxxx',password='xxxxxx').mode('overwrite').save()
    
    

    62. Common PySpark functions: ceil, ... (see the sketch below)
    https://blog.csdn.net/weixin_39998521/article/details/110598705
    
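    A brief sketch of a few of them (toy data):
    from pyspark.sql.functions import ceil, floor, round as spark_round
    
    df = spark.createDataFrame([(3.14159,)], ["x"])
    df.select(
        ceil("x").alias("ceil_x"),            # 4
        floor("x").alias("floor_x"),          # 3
        spark_round("x", 2).alias("round_x")  # 3.14
    ).show()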

    63.get_json_object
    #transform sku data
    sku_df = source_t_sku.toDF().filter(col('deleted')==0).select(col('id').alias('sku_id'),'sku_code',col('spu_id').alias('spu_id'),'cogs_price','image',col('dimension_values').alias('property'),'barcode','third_scan_code')
    sku_df = sku_df.withColumn('color', get_json_object(col("property"),"$.Color").alias("color"))
    

    64. pgsql date format conversion
    to_char(date,varchar) select to_char(current_date - 1,'YYYY-MM-dd')
    to_date('2012-05-01 23:59:59','yyyy-mm-dd hh24:mi:ss') char ,char
    to_timestamp(char,char) 
    select cast(to_char(current_date - 1,'YYYY-MM-dd') AS DATE)
    
    cloudquery
    select
    pay_time,spu_id,sum(num)
    from
    lkb_test_skc1 where spu_id=27490 and  date_format(pay_time,'%Y-%m-%d')<date'2022-09-08' and date_format(pay_time,'%Y-%m-%d') >date'2021-04-01' group by pay_time,spu_id
    

    65. Splitting a column with split()
    sku_df = sku_df.withColumn("spu_co", split(col("barcode"), "-").getItem(0))
    

    66. PySpark / Spark SQL demo (read from S3 and update the Athena catalog metadata)
    # This approach is deprecated; an AWS Glue crawler can do the same thing directly
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.functions import *
    import pymysql
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [database = "hello", table_name = "hello_public_t_product", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    
    import datetime
    from datetime import datetime, timedelta
    
    today_str = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m-%d")
    today_int = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y%m%d")
    
    yesterday_str = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y-%m-%d")
    yesterday_int = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y%m%d")
    
    
    ''' --- reading a partitioned table ---
    # read parquet files
    PATH = "s3://xxxxxxx/xxxxx/xxxxxx/"
    df = spark.read.parquet(PATH +"dt="+yesterday_int+"/*")
    df = df.withColumn('dt',lit(yesterday_int))
    # show the DataFrame
    df.show(20)
    
    print('begin')
    
    
    DataSink5 = glueContext.getSink(
        path = PATH, 
        connection_type = "s3", 
        updateBehavior = "UPDATE_IN_DATABASE", 
        partitionKeys = ["dt"], 
        enableUpdateCatalog = True, 
        transformation_ctx = "DataSink5")
    
    DataSink5.setCatalogInfo(
        catalogDatabase = "ods",
        catalogTableName = "ods_t_sku")
    
    from awsglue.dynamicframe import DynamicFrame
    
    NewDynamicFrame = DynamicFrame.fromDF(df, glueContext, "nested")
    DataSink5.setFormat("glueparquet")
    DataSink5.writeFrame(NewDynamicFrame)
    
    print('end')
    '''
    
    
    
    # --- non-partitioned ----
    # read parquet files
    PATH = "s3://xxxxxx/xxx/xx/xxxxxxx/"
    df = spark.read.parquet(PATH +"*")
    
    # show the DataFrame
    df.show(20)
    
    print('begin')
    
    
    DataSink5 = glueContext.getSink(
        path = PATH, 
        connection_type = "s3", 
        updateBehavior = "UPDATE_IN_DATABASE", 
        enableUpdateCatalog = True, 
        transformation_ctx = "DataSink5")
    
    DataSink5.setCatalogInfo(
        catalogDatabase = "ods",
        catalogTableName = "ods_t_carton")
    
    from awsglue.dynamicframe import DynamicFrame
    
    NewDynamicFrame = DynamicFrame.fromDF(df, glueContext, "nested")
    DataSink5.setFormat("glueparquet")
    DataSink5.writeFrame(NewDynamicFrame)
    
    print('end')
    
    
    # query = '''
    #     SELECT *
    #     FROM hello.xxxxxx
    #     LIMIT 5;
    # '''
    # 
    # result = spark.sql(query)
    # result.show()
    

    67. Get a column's data type
    # get the datatype of the birthday column
    df_student.select("birthday").dtypes
    

    68. Changing a column's type with astype() / checking dtypes
    df = df.withColumn("kills",df.kills.astype("int"))
    df.select("kills").dtypes
    

    69. lag & lead (see the sketch below)
    https://zhuanlan.zhihu.com/p/202969159
    
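    A minimal sketch reusing the order_df naming from earlier tips (column names are illustrative):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import lag, lead
    
    w = Window.partitionBy("user_id").orderBy("pay_time")
    order_df = order_df.withColumn("prev_pay_time", lag("pay_time", 1).over(w)) \
                       .withColumn("next_pay_time", lead("pay_time", 1).over(w))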

    70. Cumulative sum over a column (see the sketch below)
    https://blog.csdn.net/XnCSD/article/details/90676259
    
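    A minimal sketch using a running window (column names are illustrative):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import sum as spark_sum
    
    w = (Window.partitionBy("user_id")
               .orderBy("pay_time")
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    order_df = order_df.withColumn("cumulative_amount", spark_sum("amount").over(w))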

    71. Python notification script (email)
    import sys
    import smtplib
    import datetime
    from email.mime.text import MIMEText
    
    # mail settings
    mail_host = "smtp.163.com" # SMTP server address
    mail_port = 465            # SMTP server port
    mail_user = "xxxxxxxx@163.com"
    mail_pass = "xxxxxxxxxx"
    mail_to = ["xxxxxxxx@qq.com","xxxxxxx@qq.com"] # multiple recipients as a list
    
    # mail content
    now = datetime.datetime.now()
    now = now + datetime.timedelta(hours=8) # shift to China time (UTC+8)
    print(now)
    
    if now.hour >= 0 and now.hour < 2:
        msg = MIMEText("xxxxxxxx") # mail body
    elif now.hour >= 10 and now.hour < 12:
        msg = MIMEText("xxxxxxx") # mail body
    else:
        msg = MIMEText("xxxxxxx") # mail body
    msg['From'] = mail_user
    msg['To'] = ",".join(mail_to) # join multiple recipient addresses with commas
    msg['Subject'] = "随便"
    
    # send the mail
    smtp = smtplib.SMTP_SSL(mail_host, mail_port)
    smtp.login(mail_user, mail_pass)
    smtp.sendmail(mail_user, mail_to, msg.as_string())
    smtp.quit()
    print('success')
    
    

    72. dropna: drop rows containing nulls (see the sketch below)
    https://blog.csdn.net/qq_39954916/article/details/120584754
    
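    A short sketch of the common arguments (column names are illustrative):
    # drop rows where any column is null
    df = df.dropna(how="any")
    # drop rows only when both of the listed columns are null
    df = df.dropna(how="all", subset=["user_id", "order_id"])
    # keep rows that have at least 2 non-null values
    df = df.dropna(thresh=2)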

    73. QuickSight: string to date 🪐
    parseDate(concat(substring({dt},1,4),'-',substring({dt},5,2),'-',substring({dt},7,2)),'yyyy-MM-dd')
    

    74. Parsing URLs in PySpark
    t_traffic_detail = t_traffic_detail.withColumn('utm_campaign' ,expr("parse_url(url,'QUERY', 'utm_campaign')"))
    

    75. Reading part of a table over JDBC with a query predicate (more efficient) ✨
    print('start')
    #store_product = glueContext.create_dynamic_frame.from_catalog(database = "hello", table_name = "trendsi_public_t_store_product", transformation_ctx = "datasource0")
    
    query= "(select * from t_test where update_time >= (date(now()) - interval  '1 day' +interval '4 hour') and update_time <(date(now()) +interval '4 hour')) as testresult"
    store_product = spark.read.format("jdbc").option("url", "jdbc:postgresql:xxxxxxxxxx").option("driver", "org.postgresql.Driver").option("dbtable", query).option("user", "xxxxxx").option("password", "xxxxxxxxxx").load()
    print('success')
    

    76.Athena-explode(unnest)🪐
    -- 1. self-contained example for quick testing
    WITH dataset AS (
      SELECT
        'engineering' as department,
        ARRAY['Sharon', 'John', 'Bob', 'Sally'] as users
    )
    SELECT department, names FROM dataset
    CROSS JOIN UNNEST(users) as t(names)
    -- 2. example against a real table
    select user_id,source from 
    (select user_id,split(custom_source,';') cs from hive_dw_user_new_tag where length(custom_source)>12 limit 20)
    cross join unnest(cs) as t(source)
    

    77. RDD intersection
    x = sc.parallelize(['A','A','B'])
    y = sc.parallelize(['A','C','D'])
    z = x.intersection(y)
    print(x.collect())
    print(y.collect())
    print(z.collect())
    

    78. Extracting individual elements from an array column
    # add image columns
    wms_abnormal = source_wms_abnormal.toDF().select('id','image')
    wms_abnormal = wms_abnormal.withColumn('image', regexp_replace('image', '([\[\]"]+)', ''))
    wms_abnormal = wms_abnormal.withColumn("split_image",split(col("image"),","))
    wms_abnormal = wms_abnormal.withColumn('image1', col('split_image').getItem(0)) 
    wms_abnormal = wms_abnormal.withColumn('image2',  col('split_image').getItem(1))
    wms_abnormal = wms_abnormal.withColumn('image3',  col('split_image').getItem(2))
    wms_abnormal = wms_abnormal.drop('split_image')
    

    79. Collapsing duplicate records into one row
    from pyspark.sql.functions import collect_list
    
    shopify_store_url = left_join.groupby('user_id').agg(collect_list('domain').alias("shopify_store_url"))
    
    shopify_store_url = shopify_store_url.withColumn("shopify_store_url",concat_ws(",",col("shopify_store_url")))
    

    🧆 Coding Tips, Part 5

    80.Glue read S3 to DataFrame🪐
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StructField, StringType, IntegerType
    from pyspark.sql.functions import *
    
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [database = "prod-bi", table_name = "biods_t_user", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    
    
    import datetime
    from datetime import datetime, timedelta
    from pyspark.sql.types import StringType
    
    
    recover_day =datetime.now()- timedelta(hours=4+24*0, minutes=0)
    
    today_str = (recover_day).strftime("%Y-%m-%d")
    today_int = (recover_day).strftime("%Y%m%d")
    
    yesterday_str = (recover_day - timedelta(hours=24, minutes=0)).strftime("%Y-%m-%d")
    yesterday_int = (recover_day - timedelta(hours=24, minutes=0)).strftime("%Y%m%d")
    
    print('start')
    
    from pyspark.sql import SparkSession
    
    df = spark.read.options(header='True', inferSchema='True', delimiter='&') \
      .csv("s3://xxxxxx/xx/xxx-emp")
    
    df.printSchema()
    df.show(20)
    

    81.Cast to DecimalType
    order_df = order_df.withColumn('us_delivery_cost',when((col('package_weight').isNull())&(col('logistics_company_id').isin(1132,25)),lit(10.95).cast(DecimalType(10,2))).otherwise(col('us_delivery_cost')))
    
    

    82. Athena URL functions ✨
    select url_extract_parameter(url,'curPage') from traffic.xxxxxx
    

    83. Athena: extracting array elements ✨
    select case  when cardinality(tmp)=3 then concat(tmp[1],tmp[2],tmp[3]) 
                 when cardinality(tmp)=2 then concat(tmp[1],tmp[2]) 
                 when cardinality(tmp)=1 then tmp[1] end category123  from
    (select split(event_value, '/') tmp from traffic.xxxxx where  event_target='page' and event_type='imp' and page_code='product_listing_page'   limit 20) a
    

    84. Logical negation (~) in PySpark
    order_df = order_df.filter((~col('user_id').isin(327,166))&(col('pay_time')>'2021-04-01')&(col('pay_time')<today_str))
    
    

    85. Creating a small DataFrame on the fly
    from pyspark.sql.functions import last_day
    
    df = spark.createDataFrame([('1997-02-10',)], ['d'])
    df.select(last_day(df.d).alias('date')).show()
    

    86. Limiting the QuickSight dataset time range (via SQL)
    select distinct * from report_inventory_kpi where  (dt >= date_format(add_months(current_timestamp,-2),'yyyyMMdd'))
    

    87. About weekofyear in PySpark
    weekofyear follows the ISO-8601 definition: weeks start on Monday and week 1 is the first week of the year with more than three days, and the function takes only the column. If you need a different week definition (for example Sunday-first), derive it yourself from dayofweek (see tips 49 and 52).
    from pyspark.sql.functions import weekofyear
    
    df.select(weekofyear('date'))
    
    

    88. Athena: ISO year-week
    select concat(date_format(date_add('day', -1 ,date '2023-01-03'), '%x-%v'),'W')
    

    Continuously updated… 🎈

  • Original post: https://blog.csdn.net/llAl_lAll/article/details/132790225