RDD
就是 Resillient Distributed Dataset
,即弹性分布式数据集。
RDD
在抽象上来讲是一种抽象的分布式的数据集合、简单的理解成一种数据结构。它是被分区的,每个分区分布在集群中的不同的节点上。从而可以让数据进行并行的计算。
它是Spark
框架上的通用货币。所有算子都是基于 RDD 来执行的,不同的场景会有不同的 rdd 实现类,但是都可以进行互相转换。RDD
执行过程中会形成 DAG
图,然后形成 lineage
保证容错性等。
从物理的角度来看 RDD
存储的是 block
和 node
之间的映射。
从逻辑的角度来看是一个 hdfs 文件,在抽象上是一种元素集合集合。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让 RDD
中的数据可以被并行操作(分布式数据集)
它主要特点就是弹性和容错性。
RDD
的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark
会自动将RDD
数据写入磁盘RDD
可以自动从节点失败中恢复过来。即如果某个节点上的RDD
partition
,因为节点故障,导致数据丢了,那么RDD
会自动通过自己的数据来源重新计算该partition
。这一切对使用者都是透明的。HDFS
或者HIVE
表来创建分区之间的依赖关系——宽、窄依赖;
区分方法:
RDD
的partition
与父RDD
的partition
是否是1对多的关系shuffle
就是宽依赖,否则就是窄依赖窄依赖:Narrow Dependency
父RDD
和子RDD
是一对一的依赖关系,如map
,filter
宽依赖:Shuffle Dependency
本质就是shuffle
。如reduceByKey
,groupyByKey
,父RDD
一个分区数据给了子RDD
的多个分区
Shuffle
描述的是一个过程,表现多对多的依赖关系,是Map
和Reduce
两个阶段的纽带,是对数据重新分区的过程,将经过mapTask
后,key
值相同的数据重新划分到同一个partition
中。
Shuffle
实现分为HashShuffleManager
和SortShuffleManager
,也可以自定义。
shuffle简介:在 DAG 阶段以shuffle为界,划分 stage,
上游 stage做 map task,每个maptask将计算结果数据分成多份,每一份对应到下游stage 的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;
下游stage 做reduce task,每个reduce task通过网络拉取上游 stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。
Shuffle版本也随着spark不断进步和优化:
从2.0开始,把 Sort Based Shuffle
和 Tungsten-Sort
全部统一到 Sort Based Shuffle
中,Hash Based Shuffle
退出历史舞台。
目前spark2.1,直接把SortBased Shuffle
的writer分为三种:BypassMergeSortShuffleWriter
,SortShuffleWriter
和 UnsafeShuffle Writer
。
BypassMergeSortShuffle Writer:
Hash Shuffle 中的HashShuffle Writer
实现基本一致,唯一的区别在于,map端的多个输出文件会被汇总为一个文件。所有分区的数据会合并为同一个文件,会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机 access某个partition 的所有数据。
SortShuffleWriter
:会对分区内进行排序或者全局排序。
处理步骤:使用 PartitionedAppendOnlyMap
或者 PartitionedPairBuffer
在内存中进行排序,排序的K是(partitionld, hash (key))这样一个元组。如果超过内存limit,spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionld 时排厅,如宋 partona相同,再根据hash (key)进行比较排序。如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件和当前内存中的数据结构中的数据进行merge sort,实现全局排序。
最终读取的时候,从整个全局merge后的读取迭代器中读取的数据,就是按照partitionld 从小到大排序的数据,误取过在中使用丹仅州刀区力权,并且记录每个分区文件的起始写入位置,把这些位置数据写入索引文件中。
UnsafeShuffleWriter
:优化部分是在shuffle write进行序列化写入过程中,直接对二进制进行排序,减少了内存消耗,最终只是 partition 级别的排序。
但是这种需要一定条件:对单条记录、shuffle数量有限制,而且不能带有聚合函数。排序实现:利用一个LongArray存储分区 ID、pageNumber、offset in page,并对这个数组排序。每次插入一条 record 到 page 中,就把 partionld + pageNumber + offset in page,作可以迭代器 PackedRecordPointer
为一个元素插入到 LongArray中。要想反向获得 record,
定义的数据结构就是[24 bit partition number][13 bit memory page number][27 bit offset inpage]然后到根据该指针可以拿到真实的record。
Partitioner
是shuffle
过程中key
重分区时的策略,即计算key
决定k-v
属于哪个分区,Transformation
是宽依赖的算子时,父RDD
和子RDD
之间会进行shuffle
操作,shuffle
涉及到网络开销,由于父RDD
和子RDD
中的partition
是多对多关系,所以容易造成partition
中数据分配不均匀,导致数据的倾斜。
从概念上讲,分区器(Partitioner
)定义了如何分布数据,决定一个RDD
可以分成多少个分区,每个分区的数据量有多大,从而决定了每个Task将处理哪些数据。
一般来说分区器是针对key-value
值的RDD
,并通过对key的运算来划分分区。非key-value
形式的RDD
无法根据数据特征来进行分区,也就没有设置分区器,此时Spark会把数据均匀的分配到执行节点上。
1、HashPatitioner
分区:对于给定的key
值,计算其 hashcode
除以分区数取余,最后这个值就是分区的id
。可能会出现数据不均匀,因为同一 key
值都在同一分区。
2、RangerPartitoner
分区:将一定范围的数映射到某一分区内。
3、自定义分区器
第一步:确定边界,从所有的RDD当中随机抽取样本并进行排序,依次来确定分区的rangerBounds(边界)(这里因为RDD如果很大的话,没法按照计算总量,所以需要用到蓄水池抽样)。
第二步:计算key值在rangerBounds所处的范围,得出分区id。
额外:蓄水池抽样算法,不知总量的随机抽样。找个 leetcode敲下理解。
leetcode382.Linked List Random Node(算法随机采样)
Init : a reservoir with the size: k //k is the sample number
fori=k+1 to N
M=random(1, i);
if(M<k)
SWAP the Mth value and ith value
end for
总的来说,spark分为三大类算子:
Transformation
转换算子:一个RDD 转换生成一个新的 RDD 的操作。 一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,不触发提交作业,需要等到有 Action
操作的时候才会真正触发运算。通过谱系图(lineage)这个DAG图进行操作,进行恢复。函数 | 作用 |
---|---|
map()(常用) | 将函数应用于RDD中每个元素,将返回值构成新的RDD |
flatMap()(常用) | 将函数引用于RDD中每个元素,将返回的迭代器的所有内容取出重新构成新的RDD |
filter()(常用) | filter()的参数为布尔函数,返回满足该布尔函数的元素构成新的RDD |
distinct() | 去重 |
sample(withReplacement,[seed]) | 对RDD采样,以及是否替换 |
union() | 生成一个包含两个RDD中所有元素的RDD,不去重。类似并集 |
intersection() | 将两个RDD共同的元素构成新的RDD,去重。类似交集 |
substract() | 在左边RDD中移除右边RDD中的内容,类似左连接 |
cartesian() | 与另一个RDD笛卡尔积 |
Action
行动算子:这类算子会触发 SparkContext
提交 Job 作业; Action
算子会触发 Spark
提交作业(Job
),对RDD
进行实际计算,并将求得的结果返回到驱动程序中或者写入到外部存储系统中。函数 | 作用 |
---|---|
collect()(常用) | 返回RDD中全部元素 |
count()(常用) | 返回RDD中元素个数 |
countByValue()(常用) | 返回各元素在RDD中出现的次数,返回类型为元组的集合 |
take(num)(常用) | 返回RDD中num个元素 |
top(num) | 返回RDD中最前面的num个元素 |
takeOrder(num)(ordering) | 从RDD中按照提供的顺序返回最前面的num个元素 |
takeSample(withReplacement,num,[seed]) | 从RDD中返回任意些元素 |
reduce(func)(常用) | 并行整合RDD中所有数据,类似sum |
fold(zero)(func) | 和reduce()一样,但是需要提供初始值 |
aggregate(zeroValue)(seq0p, comb0p) | 和reduce()相似,但是通常返回不同类型的函数 |
foreach(func)(常用) | 遍历RDD中每个元素使用传入的函数 |
Controller
控制操作:Spark
中控制算子也是懒执行的,当我们每次调用行动操作时,都会重算RDD
的所有依赖,如果多次行动操作使用同一个RDD
,就会导致大量的重复运算。为避免这种现象,可以对数据进行持久化,也就是存储该RDD
,保存在各自的分区中。控制算子有三种,cache
,persist
(RDD 持久化原理)checkpoint
,以上算子都可以将RDD
持久化,持久化的单位是partition
。
cache
和persist
都用于将一个RDD进行缓存,这样在以后使用的过程中就不需要重复计算 。cache 默认缓存级别是MEMORY_ONLY
checkpoint
算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
级别 | 使用的空间 | CPU时间 | 是否在内存 | 是否在磁盘 | 备注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果数据在内存放不下,溢写到磁盘上 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 如果数据在内存放不下,溢写到磁盘上,在内存中放序列化后的数据 |
DISK_ONLY | 低 | 高 | 否 | 是 |
👉输入:定义RDD
,在 Spark
程序运行中,数据从外部数据空间(例如, HDFS
、 Scala
集合或数据)输入到 Spark
,数据就进入了 Spark
运行时数据空间,会转化为 Spark
数据块,形成RDD
。
👉运行:在 Spark 数据输入形成 RDD 后,进行相应的操作,通过行动(Action
)算子,触发 Spark
提交作业,提交作业。如果数据需要复用,可以通过 Cache
算子,将数据缓存到内存。
👉输出:程序运行结束数据会输出 Spark
运行时空间,存储到分布式存储中(如saveAsTextFile
输出到 HDFS
)或Scala
数据或集合中( collect
输出到 Scala
集合,count
返回 Scala Int
型数据)。
Local
模式: 是用单机的多线程或者多进程模拟Spark分布式计算,通常用来 单机调试,验证开发出来的应用程序逻辑上有没有问题。该方式下,在程序执行过程中,只会生成一个SparkSubmit进程。这个SparkSubmit进程又当爹、又当妈,既是客户提交任务的Client进程、又是Spark的driver程序、还充当着Spark执行Task的Executor角色。其中N代表可以使用N个线程,每个线程拥有一个core。如果不指定N,则默认是1个线程(该线程有1个core)。如果是local[*],则代表 Run Spark locally with as many worker threads as logical cores on your machine.
👉Standalone
模式: 独立模式,自带完整的模式 , 该方式适用master和worker进程来模拟集群形式,不需要启动hadoop。在架构上和 MapReduce1比较,具有一致性,都是由Master、worker组成(只是名称不一样),资源抽象为粗粒式的slot,多少slot多少task。
👉Spark on YARN
:因为现在企业用到 hadoop
是基于YARN
的,为了融合Spark
,该方式是使用YARN来做集群管理,为Spark
应用分配资源 , 进行统一资源管理。有两种方式, YARN-client
(用于交互,client
当中运行sparkContext
进程进行任务分发监控),YARN-cluster
任务的分发和监控放在 MRAPPmaster
当中。
👉 Spark on mesos
:YARN
和 mesos
都是统一资源管理和调度系统。mesos
支持粗粒式和细粒式调度,前者节省了资源调度时间的开销,后者是不存在资源的浪费,但是资源调度延迟较大。
👉Driver:这是监督 Spark
作业或程序端到端执行的主程序。 它与集群的资源管理器进行资源的协商,并将程序编排成尽可能小的数据本地并行编程单元。
👉Executors:在任何 Spark
任务中,可以有一个或多个 executor
,即执行由 drive
委派的较小任务的进程。 executor
处理数据,最好是本地节点的,并将结果存储在内存和 / 或磁盘中。
👉Master:Apache Spark
已经在主 / 从架构中实现,因此 master
指的是执行驱动程序的集群节点。
👉Slave(已经改名为 Worker
):在分布式集群模式下,slave
是指运行执行程序的节点,因此在群集中可以有多个从机(而且大部分情况都是这样)。
👉Job:这是对任何一组数据执行的操作的集合。 典型的 word count job 涉及从任意来源读取文本文件,然后分离 (splitting
) 并聚合 (aggregating
) 这些字。
👉DAG:Spark
引擎中的任何 Spark
工作都由 DAG
的操作代表。 DAG
按顺序表示 Spark
操作的逻辑执行。 在发生故障的情况下由 DAG 重新计算可能的血统(lineage)。
👉Task:一个 job
可以拆分成更小的单位,以被称为 task
的孤立任务进行操作。 每个 task
由 executor
在一个数据分区上执行。
👉Stages:Spark
作业可以按逻辑划分为多个 stage
,每个stage
代表一组具有相同的洗牌(shuffle
)依赖关系的任务,即发生数据洗牌 (shuffle
) 的任务。 在洗牌映射(shuffle map
)阶段,任务结果被输入到下一个阶段;在结果阶段,task 计算 action
,开始对 Spark job
的赋值,例如take()
,foreach()
和 collect()
。
(* 注:shuffle
是划分 DAG
中 stage
的标识, 同时影响 Spark
执行速度的关键步骤。RDD
的 Transformation
函数中, 又分为窄依赖 (narrow dependency
) 和宽依赖 (wide dependency
) 的操作. 窄依赖跟宽依赖的区别是是否发生 shuffle
(洗牌) 操作. 宽依赖会发生shuffle
操作. 窄依赖是子RDD
的各个分片 (partition
) 不依赖于其他分片, 能够独立计算得到结果, 宽依赖指子 RDD
的各个分片会依赖于父 RDD
的多个分片, 所以会造成父RDD
的各个分片在集群中重新分片)
一个 Spark job 怎么执行的?
不管 Spark job 的大小,Spark job 都需要 SparkContext 来执行。 在之前使用 REPL 的例子中,人们会注意到使用了一个名为 sc 的环境变量,这就是在 REPL 环境中如何访问一个 SparkContext。
👉 SparkContext 创建一个由不同 transformation 组成的 operator graph,一旦在某个 transformation 之上执行 action,这个 group 会被提交给 DAGScheduler。 根据 RDD 的性质或由窄变换(narrow transformation)或宽变换(wide transformation)(需要 shuffle 操作的变换)产生的结果。DAGScheduler 会创建 stage。
👉DAGScheduler 以这样的方式拆分 DAG:每个 stage 都由在通用洗牌边界(common shuffle boundaries,实在不知道怎么翻译合适)下的相同洗牌依赖(shuffle dependency)组成。 此外,stage 可以是洗牌映射(shuffle map)阶段,在这种情况下,其任务结果将是另一个阶段的输入;也可以是结果阶段(result stage),在这种情况下,其任务直接计算启动作业的 action,例如 count()。
👉 然后 stage 被 DAGScheduler 作为任务集(TaskSets)提交给 TaskScheduler。 TaskScheduler 通过集群管理器(YARN,Mesos 和 Spark standalone)调度 TaskSet 并监视其执行情况。 如果任何任务失败,则重新运行,最后将结果发送到 DAGScheduler。 如果结果输出文件丢失,那么 DAGScheduler 将重新提交这些阶段给 TaskScheduler 以重新运行。
👉然后在满足资源和数据局部性约束的指定执行器(executor)(slave(worker)节点上运行的 JVM)上安排 task。 每个 executor 还可以分配多个 task。
1、启动:用户程序启动SparkContext
,是程序的总入口,初始化过程中启动DAGScheduler
作业调度和 TaskScheduler
任务调度。
2、生成作业:DAGScheduler
:根据shuffleDependency
将作业划分为不同的stage
,根据 RDD之间的依赖关系,宽依赖和窄依赖,划分原则就是遇见窄依赖就放进当前stage
,遇到宽依赖则断开。(相当于shuffle
是前后的stage
分界线)每一个stage
里面都会划分一个taskset
,也就是数据集,而DAGSchedule
的下一个任务就是将这个TaskSet
传给TaskSchedule
(在最后一个 stage
划分结束,就会触发作业的提交)。
3、提交任务集: TaskScheduler
:分配 Task
到哪一个executor
上去执行,SchedulerBackend
配合TaskScheduler
完成具体任务的资源分配。
4、任务执行:Executor
:实际任务的运行最终都 Execter
类来执行,对每个任务创建一个TaskRunner
类,交给线程池去实现。
spark-submit
提交代码,执行 new SparkContext()
,在 SparkContext 里构造 DAGScheduler
和 TaskScheduler
。stage的划分依据就是看是否产生了shuflle(即宽依赖),遇到一个shuffle操作就划分为前后两个stage.
两个方法:利用“血缘(Lineage)容错”和检查点(checkpoint)机制。
应用场景:当 spark 应用程序特别复杂,从初始的 RDD 开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用 checkpoint 功能。
原因:对于特别复杂的 Spark 应用,会出现某个反复使用的 RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。
Checkpoint 首先会调用 SparkContext
的 setCheckPointDIR()
方法,设置一个容错的文件系统的目录,比如说 HDFS
;然后对 RDD 调用checkpoint()
方法。之后在RDD
所处的job
运行结束之后,会启动一个单独的job
,来将checkpoint
过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
检查点机制是我们在 spark streaming
中用来保障容错性的主要机制,它可以使spark streaming
阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:
Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。
reduceByKey:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在本地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于,在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保证 reduce 端能够更快的进行结果计算。
groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列 (Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。
所以在进行大量数据的 reduce 操作时候建议使用 reduceByKey。不仅可以提高速度,还可以防止使用 groupByKey 造成的内存溢出问题。
spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。
调用 cache() 和 persist() 方法即可。cache() 和 persist() 的区别在于,cache() 是 persist() 的一种简化方式,cache() 的底层就是调用 persist() 的无参版本 persist(MEMORY_ONLY),将数据持久化到内存中。
如果需要从内存中清除缓存,可以使用 unpersist() 方法。RDD 持久化是可以手动选择不同的策略的。在调用 persist() 时传入对应的 StorageLevel 即可。
最主要的区别在于持久化只是将数据保存在 BlockManager 中,但是 RDD 的 lineage(血缘关系,依赖关系) 是不变的。但是 checkpoint 执行完之后,rdd 已经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。
持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是 checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低
Transformation
操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Acion 操作的时候才会真正触发运算,这也就是懒加载.
MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;
Spark:Spark 采用 RDD 计算模型,简单容易上手。
MR:只提供 map 和 reduce 两个操作,表达能力欠缺;
Spark:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;
MR:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,这些 job 之间的管理以来需要开发者自己进行管理;
Spark:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行;
MR:中间结果存放在 hdfs 中;
Spark:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs;
MR:只有等到所有的 map task 执行完毕后才能执行 reduce task;
Spark:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。
MR:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;
Spark:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。
Spark 中和 join 相关的算子有这几个:join
、fullOuterJoin
、leftOuterJoin
、rightOuterJoin
join
join函数会输出两个RDD中key相同的所有项,并将它们的value联结起来,它联结的key要求在两个表中都存在,类似于SQL中的INNER JOIN。但它不满足交换律,a.join(b)与b.join(a)的结果不完全相同,值插入的顺序与调用关系有关。
leftOuterJoin
leftOuterJoin会保留对象的所有key,而用None填充在参数RDD other中缺失的值,因此调用顺序会使结果完全不同。如下面展示的结果,
rightOuterJoin
rightOuterJoin与leftOuterJoin基本一致,区别在于它的结果保留的是参数other这个RDD中所有的key。
fullOuterJoin
fullOuterJoin会保留两个RDD中所有的key,因此所有的值列都有可能出现缺失的情况,所有的值列都会转为Some对象。
Hadoop 底层使用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能力比较欠缺,而且在 MR 过程中会重复的读写 hdfs,造成大量的磁盘 io 读写操作,所以适合高时延环境下批处理计算的应用;
Spark 是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,数据分析更加快速,所以适合低时延环境下计算的应用;
spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。
但是 spark 也有劣势,由于 spark 基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的轻局昂下,可能会出现各种各样的问题,比如 OOM 内存溢出等情况,导致 spark 程序可能无法运行起来,而 mapreduce 虽然运行缓慢,但是至少可以慢慢运行完。
Hadoop/MapReduce 和 Spark 最适合的都是做离线型的数据分析,但 Hadoop 特别适合是单次分析的数据量 “很大” 的情景,而 Spark 则适用于数据量不是很大的情景。
Dataset转POJO
SparkSession spark = CloudUtils.getSparkSession();
// 查询原始数据
Dataset<Row> student = spark.sql("select * from `event`.`student`");
// 生成schema
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("major", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
// 转换查询结果为POJO List
List<Student> students = spark.createDataFrame(student.toJavaRDD(), schema)
.as(Encoders.bean(Student.class))
.collectAsList();
System.out.println(students);
Dataset中的日期类型为timestamp和java中的Date类型不兼容,和Timestamp类型相互兼容。
为了解决上述问题,我们可以先将Dataset转为JSON,然后将JSON转为POJO,代码
// 查出数据并转为json集合
List<String> jsonList = spark.sql("select * from `event`.`user`")
.toJSON()
.collectAsList();
// 将json转为pojo,这里使用的是FastJSON
List<User> users = jsonList.stream()
.map(jsonString -> JSON.parseObject(jsonString, User.class))
.collect(Collectors.toList());
System.out.println(users);
POJO转Dataset
// 获取users列表
List<User> users = createUsers();
// 使用createDataFrame转为dataset
Dataset<Row> ds = spark.createDataFrame(users, User.class);
// 将驼峰式列名改为下划线式列名,camelToUnderline方法网上搜索
String[] columns = ds.columns();
String[] newColumns = Arrays.stream(columns)
.map(column -> camelToUnderline(column))
.toArray(String[]::new);
// 转为新的df(重命名后的)
ds.toDF(newColumns);
ds.show();