• 用idea工具scala 和 Java开发 spark案例:WordCount


    目录

    一 环境准备

    二 scala代码编写

    三 java 代码编写


    一 环境准备

            创建一个 maven 工程

            添加下列依赖

    1. <dependency>
    2. <groupId>org.apache.sparkgroupId>
    3. <artifactId>spark-core_2.12artifactId>
    4. <version>${spark.version}version>
    5. dependency>
    6. <dependency>
    7. <groupId>org.apache.sparkgroupId>
    8. <artifactId>spark-sql_2.12artifactId>
    9. <version>${spark.version}version>
    10. dependency>
    11. <dependency>
    12. <groupId>org.apache.sparkgroupId>
    13. <artifactId>spark-mllib_2.12artifactId>
    14. <version>${spark.version}version>
    15. dependency>
    16. <dependency>
    17. <groupId>org.apache.sparkgroupId>
    18. <artifactId>spark-streaming_2.12artifactId>
    19. <version>${spark.version}version>
    20. dependency>
    21. <dependency>
    22. <groupId>org.apache.sparkgroupId>
    23. <artifactId>spark-graphx_2.12artifactId>
    24. <version>${spark.version}version>
    25. dependency>
    26. <dependency>
    27. <groupId>org.apache.sparkgroupId>
    28. <artifactId>spark-hive_2.12artifactId>
    29. <version>${spark.version}version>
    30. dependency>
    31. <dependency>
    32. <groupId>mysqlgroupId>
    33. <artifactId>mysql-connector-javaartifactId>
    34. <version>${mysql.version}version>
    35. dependency>
    36. <dependency>
    37. <groupId>com.alibabagroupId>
    38. <artifactId>fastjsonartifactId>
    39. <version>1.2.62version>
    40. dependency>

            原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改

            

    二 scala代码编写

            首先准备好数据,即一个 txt 文本里面加一些单词,可以放在 hdfs 或本地或其它地方,读取的时候注意改代码,这里是读取 hdfs 上的 txt 文本,注意改成自己的地址

             新建一个 scala 的 object,编写代码:

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.sql.SparkSession
    3. import org.apache.spark.{SparkConf, SparkContext}
    4. object WordCountDemo {
    5. def main(args: Array[String]): Unit = {
    6. val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
    7. val sc : SparkContext = SparkContext.getOrCreate(conf)
    8. var spark : SparkSession = SparkSession.builder().config(conf).getOrCreate()
    9. // val rdd1: RDD[String] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt")
    10. // val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
    11. // val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
    12. // val result: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)
    13. val result2: RDD[(String, Int)] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
    14. //打印到 console
    15. // result2.glom().collect.foreach(x=>println(x.toList))
    16. //保存到 hdfs
    17. result2.saveAsTextFile("hdfs://101.200.63.3:9000/kb23/sparkoutput/wordcount")
    18. }
    19. }

            这里稍微解释一下代码中的一些函数:

            map:转换函数,数据集合中每个元素进行一次我们定义的方法

            flatMap: 与map类似,但是映射为0个或多个

            collect:以数组的形式返回数据集中的所有元素 

            glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

     

            云服务器的朋友可能有的报错

    1. 22/05/0305:48:53 WARN DFSClient: Failed to connect to /10.0.24.10:9866 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
    2. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]

            出现这种错误看字面意思就很容易明白,这是本地与 datanode 通信时,namenode 给的是 datanode 的内网 ip,所以本地找不到

            解决方法也很简单,设置一下让 namenode 传过来的是服务器名而不是 ip

            在 idea 中,resource 文件夹中添加文件 hdfs-site.xml

            hdfs-site.xml内容:

    1. <property>
    2. <name>dfs.client.use.datanode.hostnamename>
    3. <value>truevalue>
    4. <description>Whether datanodes should use datanode hostnames whenconnecting to other datanodes for data transfer.
    5. description>
    6. property>

    三 java 代码编写

            这里原数据存储在本地,文件名为 input.txt

    1. import org.apache.spark.SparkConf;
    2. import org.apache.spark.api.java.JavaPairRDD;
    3. import org.apache.spark.api.java.JavaRDD;
    4. import org.apache.spark.api.java.JavaSparkContext;
    5. import scala.Tuple2;
    6. import java.util.Arrays;
    7. import java.util.Map;
    8. public class WordCount {
    9. public static void main(String[] args) {
    10. // 创建SparkConf对象
    11. SparkConf conf = new SparkConf()
    12. .setAppName("WordCount")
    13. .setMaster("local");
    14. // 创建JavaSparkContext对象
    15. JavaSparkContext sc = new JavaSparkContext(conf);
    16. // 读取文本文件
    17. JavaRDD<String> lines = sc.textFile("input.txt");
    18. // 计算单词出现次数
    19. JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
    20. JavaRDD<String> filteredWords = words.filter(word -> !word.isEmpty());
    21. JavaPairRDD<String, Integer> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1))
    22. .reduceByKey((x, y) -> x + y);
    23. Map<String, Integer> wordCountsMap = wordCounts.collectAsMap();
    24. // 输出结果
    25. for (Map.Entry<String, Integer> entry : wordCountsMap.entrySet()) {
    26. System.out.println(entry.getKey() + ": " + entry.getValue());
    27. }
    28. // 关闭JavaSparkContext对象
    29. sc.close();
    30. }
    31. }

  • 相关阅读:
    基本数据类型和运算符(java)
    【Linux】gcc/g++ 和 gdb && git工具的基本使用方式
    mysql数据900W+从17s到300ms是怎么做到的?sql优化的魅力(荣耀典藏版)
    两个输入信号同时输入判断
    std::cout无法打印uint8_t类型的数据
    Win11怎么修改关机界面颜色?Win11修改关机界面颜色的方法
    「Python入门」Python多进程
    JVM-JVM中对象的结构
    【食品化学与营养】第二章 水的化学与营养 笔记
    【数据库】期末复习(计科版)
  • 原文地址:https://blog.csdn.net/jojo_oulaoula/article/details/133695169