• 大数据课程L9——网站流量项目的实时业务处理代码


    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

     ▲ 本章节目的

    ⚪ 掌握网站流量项目的SparkStreaming代码;

    ⚪ 掌握网站流量项目的HBaseUtil代码;

    ⚪ 掌握网站流量项目的MysqlUtil代码;

    ⚪ 掌握网站流量项目的LogBean代码;

    ⚪ 掌握网站流量项目的TongjiBean代码;

    一、SparkStreaming代码

    package cn.tedu.kafkasource

    import org.apache.kafka.clients.consumer.ConsumerRecord

    import org.apache.kafka.common.TopicPartition

    import org.apache.kafka.common.serialization.StringDeserializer

    import org.apache.spark.SparkConf

    import org.apache.spark.streaming.dstream.InputDStream

    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    import org.apache.spark.streaming.kafka010._

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import org.apache.spark.SparkContext

    import cn.tedu.pojo.LogBean

    import java.util.Calendar

    import cn.tedu.dao.HBaseUtil

    import cn.tedu.pojo.TongjiBean

    import cn.tedu.dao.MysqlUtil

    object SparkStreaming {

      def main(args: Array[String]): Unit = {

         val conf= new SparkConf().setMaster("local[3]").setAppName("test01")

                .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 

        val sc=new SparkContext(conf)   

        val ssc=new StreamingContext(sc, Seconds(5))   

        val kafkaParams: Map[String, Object] = Map[String, Object](

                "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",

                "key.deserializer" -> classOf[StringDeserializer],

                "value.deserializer" -> classOf[StringDeserializer],

                "group.id" -> "gp2"

            )

        val topics = Array("logdata")

        val kafkaSource=KafkaUtils.createDirectStream[String, String](

                ssc,

                PreferConsistent,

                Subscribe[String, String](topics, kafkaParams)

            ).map(x=>x.value())

        kafkaSource.foreachRDD{rdd=>

         //lines里存储了当前批次内的所有数据 

          val lines=rdd.toLocalIterator

          //遍历迭代器,对每条数据进行处理

          while(lines.hasNext){

            val line=lines.next()

            //第一步:清洗出所需要的业务字段。url,urlname,uvid,ssid,sscount,sstime,cip

            val info=line.split("\\|")

            val url=info(0)

            val urlname=info(1)

            val uvid=info(13)

            val ssid=info(14).split("_")(0)

            val sscount=info(14).split("_")(1)

            val sstime=info(14).split("_")(2)

  • 相关阅读:
    (02)Cartographer源码无死角解析-(27) 数据订阅、变换、排序、转发→总体复盘
    【Flowable】Flowable表结构(三)
    spark读取hive表字段,区分大小写问题
    释放搜索潜力:基于Milvus快速搭建语义检索系统(快速版),让信息尽在掌握
    你知道分库分表怎么无限扩容吗?
    java面试基础(三)
    Java EE --- spring 核心与设计思想
    数学公式与随机数
    mfc 浮动窗口
    java面试题
  • 原文地址:https://blog.csdn.net/u013955758/article/details/132818739