① AWS Glue: the working platform, covering script development plus managing job run state and scheduling (mainly: database connections, ETL and data transformation scripts, scheduling)
② Amazon S3 data lake (data warehouse): data storage
③ Athena: workbench for writing SQL queries directly against the data (query charges apply)
④ QuickSight: report and dashboard presentation
Sharing in QuickSight: click the user avatar -> Manage QuickSight -> Invite users (by email) -> select the shared folder -> grant the chosen users access to that folder
SaaS companies in e-commerce, healthcare, real estate, and logistics generally use HubSpot.
From a product standpoint, HubSpot has three products (Marketing, CRM, and Sales) that together cover the full inbound marketing workflow.
Marketing is the core, providing SEO, social media, web page building and optimization, site grading, and similar tools; CRM visualizes the data and automatically tracks customer behavior; Sales is the tool connecting sales reps with customers.
# valid until the SparkSession that created it is stopped
df.createOrReplaceTempView("tempViewName")
# valid for the lifetime of the whole Spark application
df.createOrReplaceGlobalTempView("tempViewName")
spark.catalog.dropTempView("tempViewName")
spark.catalog.dropGlobalTempView("tempViewName")
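A minimal sketch of how the two view types are queried afterwards (assuming an existing SparkSession spark); global temp views live under the reserved global_temp database:
spark.sql("SELECT * FROM tempViewName").show()               # session-scoped temp view
spark.sql("SELECT * FROM global_temp.tempViewName").show()   # global temp view needs the global_temp prefix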
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "hello", table_name = "hello_public_t_product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
import datetime
from datetime import datetime, timedelta
from pyspark.sql.types import StringType
today_str = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m-%d")
today_int = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y%m%d")
yesterday_str = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y%m%d")
# catalog database and table name to read from
# read data from storage
contacts = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "hive_t_test", transformation_ctx = "datasource0")
print('start')
df = contacts.toDF().select("contact_id","dt")
df.show(5)
print('end')
Spark config: spark.sql.sources.partitionOverwriteMode (default STATIC)
This setting controls insert-overwrite behavior on partitioned tables: in STATIC mode Spark deletes the other partitions before writing, while in DYNAMIC mode it only overwrites the partitions that actually receive new data and leaves the rest untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
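A small sketch of the effect, assuming an existing partitioned table my_db.my_partitioned_table (hypothetical name) whose schema matches df:
# with the default STATIC mode this overwrite would also drop partitions that df does not contain
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
# in dynamic mode only the partitions actually present in df are rewritten
df.write.mode("overwrite").insertInto("my_db.my_partitioned_table")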
Duplicate join columns: whether the join key appears once or twice in the result depends on how the on argument is given, not on the join type; joining on a list of column names (on=['user_id']) keeps a single copy of the key, while joining on an expression (df1.id == df2.id) keeps both columns for left, right, inner, and outer joins alike. See the sketch below.
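A small sketch of that difference (hypothetical DataFrames a and b):
a = spark.createDataFrame([("u1", "x")], ["id", "va"])
b = spark.createDataFrame([("u1", "y")], ["id", "vb"])
a.join(b, on=["id"], how="left").columns       # ['id', 'va', 'vb'] -- single id column
a.join(b, a.id == b.id, how="left").columns    # ['id', 'va', 'id', 'vb'] -- both id columns kept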
contact_df = contact_user_bind.toDF().filter(col('status')=='match').select('user_id','contact_id').dropDuplicates()
-- dropDuplicates() deduplicates by whole row or by a subset of columns (one row of each duplicate group is kept); pass a column list to dedupe on specific columns
https://blog.csdn.net/nanfeizhenkuangou/article/details/121802837
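A minimal sketch of subset-based deduplication (hypothetical data):
df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "a")], ["user_id", "tag"])
df.dropDuplicates().show()              # drops fully identical rows only
df.dropDuplicates(["user_id"]).show()   # keeps one row per user_id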
product_export_df = product_export_df.withColumn('export_time_us', product_export_df.create_time - expr('INTERVAL 4 HOURS'))
store_order_detail_df = store_order_detail_df.withColumn("servicebytrendsi", when((col("fulfillment_service") =='trendsi')|(col("fulfillment_service") =='trendsi-fashion-dropshipping'), 1).otherwise(0))
store_order_merge = store_order_df.join(store_order_detail_df,store_order_df.id == store_order_detail_df.store_order_id,how='left')
#----------------
order_detail_total.join(sku_df.select('sku_id','color'),['sku_id'])
user_df = user_df.withColumn('user_name',concat(user_df.first_name,lit(" "),user_df.last_name))
user_df = user_df.withColumn('sign_up_date', date_format(user_df.create_time-expr('INTERVAL 4 HOURS'),"MM/dd/yyyy"))
#------------------------
this_month = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m")
store_df = store_df.dropna(subset=['user_id'])
order_df = order_df.filter(col('order_type').isin(1,3))
order_df = order_df.filter((~col('user_id').isin(327,166))&(col('pay_time_us')>'2021-04-01'))
order_df_detail = order_df_detail.withColumn('weight',when((col('weight')<=300)|(col('weight').isNull()),300).otherwise(col('weight')))
isNotNull() / isNull(); quoting convention in the line below: single quotes wrap column names and plain values, double quotes wrap string literals passed to lit()
write_dataframe = write_dataframe.withColumn('status', when(col('deal_id').isNull(), lit("miss")).when(col('user_type').isNull(), lit("extra")).otherwise('match'))
– timedelta (time interval): miyaBirthdayNextday = miyaBirthday + datetime.timedelta(days=1)
– datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)
– strftime (date formatting): print(datetime.date.today().strftime('%d/%m/%Y'))
– date_format (date formatting): contacts = contacts.withColumn('date', date_format(col('create_time'), "yyyyMMdd"))
Reference: REGEXP_REPLACE usage (JohnnyChu's blog on CSDN, regexp_replace)
actual = result_mapping.select("user_id").subtract(result_mapping1.select('user_id')).dropDuplicates()
write_dataframe = result_mapping1.union(result_mapping2).union(result_mapping3)
Note the required import:
from pyspark.sql.window import Window
write_dataframe = write_dataframe.withColumn("rank", row_number().over(Window.partitionBy("contact_id").orderBy(desc("spu_sale_amount"))))
create_time >= to_timestamp('2022-01-03 00:00:00', 'yyyy-mm-dd hh24:mi:ss')
columns = ["Seqno", "Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]
df = spark.createDataFrame(data=data, schema=columns)
df.withColumn("split_name", explode(split(col("Name"), " "))).show()
SELECT CONCAT(CAST(round((3/21)*100,3) AS CHAR),'%') as aa from act_canal;
SELECT weekofyear('2022-06-13');
SELECT DATE_FORMAT(NOW(),'yyyy-MM');
-----mysql-----
DATE_FORMAT(SYSDATE(),'%Y-%m-%d %H:%i:%s')
-----postgres-----
to_timestamp('2022-01-03 00:00:00', 'yyyy-mm-dd hh24:mi:ss')
SELECT TO_CHAR(CURRENT_DATE, 'yyyy-mm-dd');
SELECT weekofyear('2022-06-13')-23;
#----------------------------------
df_student.withColumn("week_of_month", date_format(col("birthday"), "W")).show()
store_order_merge.groupBy(['user_id','year_month','year_last_month']).agg(sum(col('price')*col('num')).alias("total_pay_amount"))
write_dataframe = write_dataframe.join(trendsi_sales_china,on=['user_id','year_last_month','year_month'],how='outer')
#-----
df1.join(df2, on=[df1['age'] == df2['age'], df1['sex'] == df2['sex']], how='left_outer')
df1.unionByName(df2).show()
store_order_df = store_order_df.withColumn('last_month_pay_time', col('pay_time') - expr('INTERVAL 1 MONTHS'))
df1.unionByName(df2, allowMissingColumns=True).show()  # schemas may differ; missing columns are filled with null
df1.unionByName(df2).show()  # merges by column name (both DataFrames must have the same columns)
union: concatenates two DataFrames by column position, not by name; the result takes the column names of the left DataFrame (a.union(b) uses a's column order), and duplicates are NOT removed (call distinct() afterwards if needed)
unionAll: same as union (the two behave identically in PySpark)
unionByName: merges by column name instead of by position
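A small sketch of the position-vs-name difference (hypothetical DataFrames):
d1 = spark.createDataFrame([("u1", "a")], ["id", "name"])
d2 = spark.createDataFrame([("b", "u2")], ["name", "id"])
d1.union(d2).show()        # merged by position: second row lands as id='b', name='u2'
d1.unionByName(d2).show()  # merged by name: second row lands as id='u2', name='b'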
from dateutil.relativedelta import relativedelta
start_day_of_next_month = (datetime.now() + relativedelta(months=1)).replace(day=1)
start_day_of_this_month = (datetime.now()).replace(day=1)
this_month_days=(start_day_of_next_month-start_day_of_this_month).days+1
this_month_days_now=(datetime.now() -start_day_of_this_month).days
df = spark.createDataFrame(
[("Bob", 13, 40.3, 150.5), ("Alice", 12, 37.8, 142.3), ("Tom", 11, 44.1, 142.2)],
["name", "age", "weight", "height"],
)
df.describe(['age']).show()
df.dropDuplicates().show()
df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
df1.intersectAll(df2).sort("C1", "C2").show()
from pyspark.sql.functions import pandas_udf
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, df.schema).show()
# the DataFrames must share the same schema, otherwise the union fails
from functools import reduce
buff = []
for pdfs in [d1, d2, d3]:
    buff.append(pdfs)
mergeDF = reduce(lambda x, y: x.union(y), buff)
mergeDF.show()
days7_str = (datetime.now() - timedelta(hours=176, minutes=0)).strftime("%Y-%m-%d")
days7_int = (datetime.now() - timedelta(hours=176, minutes=0)).strftime("%Y%m%d")
days14_str = (datetime.now() - timedelta(hours=344, minutes=0)).strftime("%Y-%m-%d")
days14_int = (datetime.now() - timedelta(hours=344, minutes=0)).strftime("%Y%m%d")
days30_str = (datetime.now() - timedelta(hours=728, minutes=0)).strftime("%Y-%m-%d")
days30_int = (datetime.now() - timedelta(hours=728, minutes=0)).strftime("%Y%m%d")
# define the partition and ordering window
window = Window.partitionBy(['spu_id']).orderBy(['spu_co'])
# rank within each window, keep only rank 1, then drop the helper column
sku_df = sku_df.withColumn('rank', rank().over(window)).filter("rank = 1").drop('rank')
t_order_df = t_order_df.withColumn('year_week',concat(concat(year(col('pay_time')),lit('-')),weekofyear(col('pay_time'))))
china_order_df_week = china_order_df.filter(col('year_month_week')== concat(lit(this_month),lit('-'),lit(weekofyear(current_date()).cast(StringType())),lit("W")))
from pyspark.sql.functions import explode_outer
""" with array """
df.select(df.name,explode_outer(df.knownLanguages)).show()
""" with map """
df.select(df.name,explode_outer(df.properties)).show()
column.isNotNull() ------------- column.isNull()
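Quick usage sketch (hypothetical column name):
df.filter(col("user_id").isNotNull()).show()   # keep rows where user_id is not null
df.filter(col("user_id").isNull()).show()      # keep rows where user_id is null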
from pyspark.sql.functions import length
df_books.where(length(col("book_name")) >= 20).show()
from pyspark.sql import functions as F
keys = ['is_jit', 'after_reason_type', 'supplier_id']
column = 'after_reason'
column_value = 'value'
column_value_list = ['']
df.groupby(keys).pivot(column, column_value_list).agg(F.first(column_value, ignorenulls=True)).fillna('*')
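A self-contained pivot sketch along the same lines (hypothetical data and pivot values):
from pyspark.sql import functions as F
pv = spark.createDataFrame(
    [("s1", "size_issue", "too small"), ("s1", "color_issue", "faded"), ("s2", "size_issue", "too big")],
    ["supplier_id", "after_reason_type", "after_reason"])
pv.groupBy("supplier_id") \
  .pivot("after_reason_type", ["size_issue", "color_issue"]) \
  .agg(F.first("after_reason", ignorenulls=True)) \
  .fillna("*") \
  .show()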
# dw_order_cut
partitionstate = "dt >='2021-01-01'"
#partitionstate = "dt >= '"+days30_int+"'"
partitionstate1 = "dt = '"+yesterday_int+"'"
partitionstate2 = "time_us_hour = '0'"
partitionstate3 = "dt = '"+cday_int+"'"
source_t_sku = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "hive_t_sku", push_down_predicate = partitionstate1)
days7_str = (datetime.now() - timedelta(hours=((24*7)+8), minutes=0)).strftime("%Y-%m-%d")
days7_int = (datetime.now() - timedelta(hours=((24*7)+8), minutes=0)).strftime("%Y%m%d")
day30_str = (datetime.now() - timedelta(hours=((24*30)+8), minutes=0)).strftime("%Y-%m-%d")
day30_int = (datetime.now() - timedelta(hours=((24*30)+8), minutes=0)).strftime("%Y%m%d")
import pyspark.pandas as ps  # pandas API on Spark (assumed source of the ps alias)
df = ps.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]}, columns=['col1', 'col2'])
print(df.to_string())
col1 col2
0 1 4
1 2 5
2 3 6
b=a.withColumn("Sub_Name",a.Name.substr(1,3)).show()
Reference: https://zhuanlan.zhihu.com/p/34901943
df.withColumn('year', substring('date', 1,4))\
.withColumn('month', substring('date', 5,2))\
.withColumn('day', substring('date', 7,2))
from datetime import datetime
from pyspark.sql.functions import col, udf, date_format
from pyspark.sql.types import DateType
rdd = sc.parallelize(['20161231', '20140102', '20151201', '20161124'])
df1 = sqlContext.createDataFrame(rdd, ['old_col'])
# UDF to convert string to date
func = udf (lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
df = df1.withColumn('new_col', date_format(func(col('old_col')), 'MM-dd-yyyy'))
df.show()
write_dataframe = write_dataframe.withColumn('date', to_timestamp('dt', 'yyyyMMdd'))
# dayofweek: Sunday through Saturday maps to 1 through 7 (Sunday = 1, Monday = 2, ...)
# To get a Monday-first week index you have to convert it yourself; a Scala demo using when/otherwise follows, with a PySpark sketch after it.
// add days_of_week: day index within the week, with Monday treated as day 1
.withColumn("days_of_week", when(dayofweek(from_unixtime(col("unix_time"), "yyyy-MM-dd")) === 1, 7)
.otherwise(dayofweek(from_unixtime(col("unix_time"), "yyyy-MM-dd")) -1)
.cast(LongType))
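A PySpark equivalent of the same Monday-first conversion (a sketch, assuming a timestamp column create_time):
from pyspark.sql.functions import dayofweek, when, col
df = df.withColumn(
    "days_of_week",
    when(dayofweek(col("create_time")) == 1, 7)        # Sunday (1) becomes 7
    .otherwise(dayofweek(col("create_time")) - 1))     # Monday..Saturday become 1..6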
user_tag.withColumn('iso_week_agg',concat(col('iso_year'),lit('-'),col('iso_week')))
Similar function: concat_ws()
user_tag.groupby(['iso_week_agg']).agg(countDistinct(col('user_id')).alias('user_reg_number'))
# get the source data
user_tag = source_user_tag.toDF().select('user_id','create_time')
user_tag.show(20)
# get source + etl + year-week (handle weeks spanning a year boundary)
iso_weekday = when(dayofweek('create_time') != 1, dayofweek('create_time')-1).otherwise(7)
week_from_prev_year = (month('create_time') == 1) & (weekofyear('create_time') > 9)
week_from_next_year = (month('create_time') == 12) & (weekofyear('create_time') == 1)
iso_year = when(week_from_prev_year, year('create_time') - 1) \
.when(week_from_next_year, year('create_time') + 1) \
.otherwise(year('create_time'))
user_tag = user_tag.withColumn('iso_year', iso_year)
user_tag = user_tag.withColumn('iso_week', lpad(weekofyear('create_time'), 3, "W0"))
user_tag = user_tag.withColumn('iso_week_agg',concat(col('iso_year'),lit('-'),col('iso_week')))
user_tag.show(20)
Reference: pyspark 系列 - collect_list 與 collect_set 實例教學 | Happy Coding Lab (chilunhuang.github.io)
collect_list: aggregates the values of each groupBy key into an array; collect_set does the same but removes duplicates.
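A small sketch of the difference (hypothetical data):
from pyspark.sql.functions import collect_list, collect_set
cl = spark.createDataFrame([("u1", "a"), ("u1", "a"), ("u1", "b")], ["user_id", "tag"])
cl.groupBy("user_id").agg(
    collect_list("tag").alias("tags_list"),   # all values per user_id, duplicates kept
    collect_set("tag").alias("tags_set")      # duplicates removed, order not guaranteed
).show()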
#### Add both leading and trailing padding
# lpad/rpad: pad the column to a fixed length with the given fill character
df_states = df_states.withColumn('states_Name_new', lpad(df_states.state_name,20, '#'))
df_states = df_states.withColumn('states_Name_new', rpad(df_states.states_Name_new,24, '#'))
df_states.show()
Reference: Python pyspark.sql.DataFrame.replace usage and examples (vimsky.com)
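A quick sketch of DataFrame.replace (hypothetical values):
df = spark.createDataFrame([("u1", "CN"), ("u2", "US")], ["user_id", "country"])
df.replace("CN", "China", subset=["country"]).show()         # replace a single value in one column
df.replace({"CN": "China", "US": "United States"}).show()    # dict form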
# Cohort analysis -- by day
# 1. find each customer's first purchase date
t_order = source_t_order.toDF().select('user_id','create_time')
t_order = t_order.withColumn('create_time',date_format(col('create_time'),"yyyy-MM-dd")).filter(col('create_time')>'2022-07-01')
t_order.show(20)
from pyspark.sql.window import Window
# with min(create_time)
t_order = t_order.withColumn('FirstPurchaseDate',min('create_time').over(Window.partitionBy('user_id').orderBy(col('create_time'))))
t_order.show(20)
# 2. compute the return interval (days between each order and the first purchase)
t_order = t_order.withColumn('ReturnDays',datediff('create_time','FirstPurchaseDate'))
t_order.show(20)
# 3. count customers sharing the same first-purchase date and return interval
t_order = t_order.groupby(['FirstPurchaseDate','ReturnDays']).agg(count('user_id').alias('count_user_id'))
t_order.show(20)
# 4. pivot into the cohort matrix
t_order = t_order.groupby('FirstPurchaseDate').pivot('ReturnDays').agg(first('count_user_id')).orderBy('FirstPurchaseDate')
t_order.show(20)
# compute the dt partition for N days ago
dt = date_format(date_add('day', -8, current_timestamp), '%Y%m%d')
# Athena + Glue + MySQL
date_diff(create_time,delivery_time)<=7 or date_diff('day',create_time,delivery_time)<=7
#pgsql
date_part('day',create_time-delivery_time)<=7
# pgsql date arithmetic
to_char(delivery_time + INTERVAL '7 day'-INTERVAL '4 hour','yyyy-mm-dd')='2022-07-18'
# Athena date arithmetic
SELECT date_add('day', 1, timestamp '2022-11-30 10:30:00') AS new_date;
SELECT date_format(date_parse('20230721', '%Y%m%d') - INTERVAL '20' DAY, '%Y%m%d') AS new_date;
b=a.withColumn("Sub_Name",a.Name.substr(1,3)).show(): substr(1,3) takes 3 characters starting at the first character
# over(partition by order by)
from pyspark.sql.window import Window
import pyspark.sql.functions as F
windowSpec = Window.partitionBy("iso_week_agg").orderBy(F.asc("pay_iso_week_agg"))
order_df = order_df.withColumn("dense_rank", F.dense_rank().over(windowSpec))
order_df.show(50)
test = test.withColumn('order_id',lit(None))
t_sku_predict_7days.write.format('jdbc').options(url='jdbc:mysql://xxxxxx.rds.amazonaws.com:3306/DW',driver='com.mysql.jdbc.Driver',dbtable='lkb_test_01',user='xxxxxx',password='xxxxxx').mode('overwrite').save()
https://blog.csdn.net/weixin_39998521/article/details/110598705
#transform sku data
sku_df = source_t_sku.toDF().filter(col('deleted')==0).select(col('id').alias('sku_id'),'sku_code',col('spu_id').alias('spu_id'),'cogs_price','image',col('dimension_values').alias('property'),'barcode','third_scan_code')
sku_df = sku_df.withColumn('color', get_json_object(col("property"),"$.Color").alias("color"))
to_char(date, varchar): select to_char(current_date - 1, 'YYYY-MM-dd')
to_date(char, char): to_date('2012-05-01 23:59:59', 'yyyy-mm-dd hh24:mi:ss')
to_timestamp(char, char)
select cast(to_char(current_date - 1, 'YYYY-MM-dd') AS DATE)
cloudquery
select
pay_time,spu_id,sum(num)
from
lkb_test_skc1 where spu_id=27490 and date_format(pay_time,'%Y-%m-%d')<date'2022-09-08' and date_format(pay_time,'%Y-%m-%d') >date'2021-04-01' group by pay_time,spu_id
sku_df = sku_df.withColumn("spu_co", split(col("barcode"), "-").getItem(0))
# This approach is deprecated: the AWS Glue crawler can do this directly
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
import pymysql
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "hello", table_name = "hello_public_t_product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
import datetime
from datetime import datetime, timedelta
today_str = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y-%m-%d")
today_int = (datetime.now() - timedelta(hours=4, minutes=0)).strftime("%Y%m%d")
yesterday_str = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (datetime.now() - timedelta(hours=28, minutes=0)).strftime("%Y%m%d")
''' --- read a partitioned table ---
# read the parquet files
PATH = "s3://xxxxxxx/xxxxx/xxxxxx/"
df = spark.read.parquet(PATH +"dt="+yesterday_int+"/*")
df = df.withColumn('dt',lit(yesterday_int))
# show the DataFrame
df.show(20)
print('begin')
DataSink5 = glueContext.getSink(
path = PATH,
connection_type = "s3",
updateBehavior = "UPDATE_IN_DATABASE",
partitionKeys = ["dt"],
enableUpdateCatalog = True,
transformation_ctx = "DataSink5")
DataSink5.setCatalogInfo(
catalogDatabase = "ods",
catalogTableName = "ods_t_sku")
from awsglue.dynamicframe import DynamicFrame
NewDynamicFrame = DynamicFrame.fromDF(df, glueContext, "nested")
DataSink5.setFormat("glueparquet")
DataSink5.writeFrame(NewDynamicFrame)
print('end')
'''
# --- unpartitioned ----
# read the parquet files
PATH = "s3://xxxxxx/xxx/xx/xxxxxxx/"
df = spark.read.parquet(PATH +"*")
# show the DataFrame
df.show(20)
print('begin')
DataSink5 = glueContext.getSink(
path = PATH,
connection_type = "s3",
updateBehavior = "UPDATE_IN_DATABASE",
enableUpdateCatalog = True,
transformation_ctx = "DataSink5")
DataSink5.setCatalogInfo(
catalogDatabase = "ods",
catalogTableName = "ods_t_carton")
from awsglue.dynamicframe import DynamicFrame
NewDynamicFrame = DynamicFrame.fromDF(df, glueContext, "nested")
DataSink5.setFormat("glueparquet")
DataSink5.writeFrame(NewDynamicFrame)
print('end')
# query = '''
# SELECT *
# FROM hello.xxxxxx
# LIMIT 5;
# '''
#
# result = spark.sql(query)
# result.show()
#Get datatype of birthday
df_student.select("birthday").dtypes
df = df.withColumn("kills",df.kills.astype("int"))
df.select("kills").dtypes
https://zhuanlan.zhihu.com/p/202969159
https://blog.csdn.net/XnCSD/article/details/90676259
import sys
import smtplib
import datetime
from email.mime.text import MIMEText
# mail settings
mail_host = "smtp.163.com"  # SMTP server address
mail_port = 465  # SMTP SSL port
mail_user = "xxxxxxxx@163.com"
mail_pass = "xxxxxxxxxx"
mail_to = ["xxxxxxxx@qq.com","xxxxxxx@qq.com"]  # multiple recipients as a list
# mail content
now = datetime.datetime.now()
now = now + datetime.timedelta(hours=8)  # add 8 hours to get China time
print(now)
if now.hour >= 0 and now.hour < 2:
    msg = MIMEText("xxxxxxxx")  # message body
elif now.hour >= 10 and now.hour < 12:
    msg = MIMEText("xxxxxxx")  # message body
else:
    msg = MIMEText("xxxxxxx")  # message body
# message headers
msg['From'] = mail_user
msg['To'] = ",".join(mail_to)  # join multiple recipient addresses with commas
msg['Subject'] = "随便"
# send the mail
smtp = smtplib.SMTP_SSL(mail_host, mail_port)
smtp.login(mail_user, mail_pass)
smtp.sendmail(mail_user, mail_to, msg.as_string())
smtp.quit()
print('success')
https://blog.csdn.net/qq_39954916/article/details/120584754
parseDate(concat(substring({dt},1,4),'-',substring({dt},5,2),'-',substring({dt},7,2)),'yyyy-MM-dd')
t_traffic_detail = t_traffic_detail.withColumn('utm_campaign' ,expr("parse_url(url,'QUERY', 'utm_campaign')"))
print('start')
#store_product = glueContext.create_dynamic_frame.from_catalog(database = "hello", table_name = "trendsi_public_t_store_product", transformation_ctx = "datasource0")
query= "(select * from t_test where update_time >= (date(now()) - interval '1 day' +interval '4 hour') and update_time <(date(now()) +interval '4 hour')) as testresult"
store_product = spark.read.format("jdbc").option("url", "jdbc:postgresql:xxxxxxxxxx").option("driver", "org.postgresql.Driver").option("dbtable", query).option("user", "xxxxxx").option("password", "xxxxxxxxxx").load()
print('success')
-- 1. ready to run as-is for testing
WITH dataset AS (
SELECT
'engineering' as department,
ARRAY['Sharon', 'John', 'Bob', 'Sally'] as users
)
SELECT department, names FROM dataset
CROSS JOIN UNNEST(users) as t(names)
-- 2. worked example
select user_id,source from
(select user_id,split(custom_source,';') cs from hive_dw_user_new_tag where length(custom_source)>12 limit 20)
cross join unnest(cs) as t(source)
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['A','C','D'])
z = x.intersection(y)
print(x.collect())
print(y.collect())
print(z.collect())
# add image columns
wms_abnormal = source_wms_abnormal.toDF().select('id','image')
wms_abnormal = wms_abnormal.withColumn('image', regexp_replace('image', '([\[\]"]+)', ''))
wms_abnormal = wms_abnormal.withColumn("split_image",split(col("image"),","))
wms_abnormal = wms_abnormal.withColumn('image1', col('split_image').getItem(0))
wms_abnormal = wms_abnormal.withColumn('image2', col('split_image').getItem(1))
wms_abnormal = wms_abnormal.withColumn('image3', col('split_image').getItem(2))
wms_abnormal = wms_abnormal.drop('split_image')
from pyspark.sql.functions import collect_list
shopify_store_url = left_join.groupby('user_id').agg(collect_list('domain').alias("shopify_store_url"))
shopify_store_url = shopify_store_url.withColumn("shopify_store_url",concat_ws(",",col("shopify_store_url")))
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import *
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "prod-bi", table_name = "biods_t_user", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
import datetime
from datetime import datetime, timedelta
from pyspark.sql.types import StringType
recover_day =datetime.now()- timedelta(hours=4+24*0, minutes=0)
today_str = (recover_day).strftime("%Y-%m-%d")
today_int = (recover_day).strftime("%Y%m%d")
yesterday_str = (recover_day - timedelta(hours=24, minutes=0)).strftime("%Y-%m-%d")
yesterday_int = (recover_day - timedelta(hours=24, minutes=0)).strftime("%Y%m%d")
print('start')
from pyspark.sql import SparkSession
df = spark.read.options(header='True', inferSchema='True', delimiter='&') \
.csv("s3://xxxxxx/xx/xxx-emp")
df.printSchema()
df.show(20)
order_df = order_df.withColumn('us_delivery_cost',when((col('package_weight').isNull())&(col('logistics_company_id').isin(1132,25)),lit(10.95).cast(DecimalType(10,2))).otherwise(col('us_delivery_cost')))
select url_extract_parameter(url,'curPage') from traffic.xxxxxx
select case when cardinality(tmp)=3 then concat(tmp[1],tmp[2],tmp[3])
when cardinality(tmp)=2 then concat(tmp[1],tmp[2])
when cardinality(tmp)=1 then tmp[1] end category123 from
(select split(event_value, '/') tmp from traffic.xxxxx where event_target='page' and event_type='imp' and page_code='product_listing_page' limit 20) a
order_df = order_df.filter((~col('user_id').isin(327,166))&(col('pay_time')>'2021-04-01')&(col('pay_time')<today_str))
from pyspark.sql.functions import last_day
df = spark.createDataFrame([('1997-02-10',)], ['d'])
df.select(last_day(df.d).alias('date')).show()
select distinct * from report_inventory_kpi where (dt >= date_format(add_months(current_timestamp,-2),'yyyyMMdd'))
In PySpark 3.1.0 and later, weekofyear reportedly defaults to a Sunday-to-Saturday week (previously Monday-to-Sunday); to make Monday the first day of the week, set firstDayOfWeek=2
from pyspark.sql.functions import weekofyear
df.select(weekofyear('date', firstDayOfWeek=2))  # explicitly pass firstDayOfWeek=2
select concat(date_format(date_add('day', -1 ,date '2023-01-03'), '%x-%v'),'W')
Continuously being updated… 🎈