def getKafkaDStream(ssc : StreamingContext , topic: String , groupId:String ) ={
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs))
kafkaDStream
}
private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String,Object](
// kafka集群位置
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),
// kv反序列化器
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
// groupId
// offset提交 自动 手动
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
//自动提交的时间间隔
//ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
// offset重置 "latest" "earliest"
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
// .....
)
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参
是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下
在`KafkaUtils.createDirectStream`方法中,后续传递的参数的含义如下:
1. `ssc`:这是一个`StreamingContext`对象,用于指定Spark Streaming的上下文。
2. `LocationStrategies.PreferConsistent`:这是一个位置策略,用于指定Kafka消费者的位置策略。`PreferConsistent`表示优先选择分区分布均匀的消费者。
3. `ConsumerStrategies.Subscribe[String, String]`:这是一个消费者策略,用于指定Kafka消费者的订阅策略。`Subscribe[String, String]`表示按照指定的泛型主题字符串数组订阅消息,键和值的类型都为`String`。
4. `Array(topic)`:这是一个字符串数组,用于指定要订阅的Kafka主题。
5. `consumerConfigs`:这是一个`java.util.Properties`类型的对象,其中配置了一些Kafka消费者的属性。
总之,在`KafkaUtils.createDirectStream`方法中,这些参数组合被用于创建一个Kafka直连流(Direct Stream),该流可以直接从Kafka主题中消费消息,并将其转换为`InputDStream[ConsumerRecord[String, String]]`类型的DStream。
/**
* 生产者对象
*/
val producer : KafkaProducer[String,String] = createProducer()
/**
* 创建生产者对象
*/
def createProducer():KafkaProducer[String,String] = {
val producerConfigs: util.HashMap[String, AnyRef] = new util.HashMap[String,AnyRef]
//生产者配置类 ProducerConfig
//kafka集群位置
//producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
//producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils("kafka.bootstrap-servers"))
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))
//kv序列化器
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
//acks
producerConfigs.put(ProducerConfig.ACKS_CONFIG , "all")
//batch.size 16kb
//linger.ms 0
//retries
//幂等配置
producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG , "true")
val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](producerConfigs)
producer
}
/**
* 生产(按照默认的黏性分区策略)
*/
def send(topic : String , msg : String ):Unit = {
producer.send(new ProducerRecord[String,String](topic , msg ))
}
/**或者!
* 生产(按照key进行分区)
*/
def send(topic : String , key : String , msg : String ):Unit = {
producer.send(new ProducerRecord[String,String](topic , key , msg ))
}
/**
* 关闭生产者对象
*/
def close():Unit = {
if(producer != null ) producer.close()
}
/**
* 刷写 ,将缓冲区的数据刷写到磁盘
*
*/
def flush(): Unit ={
producer.flush()
}
单纯的使用返回的ConsumerRecord不支持序列化,没有实现序列化接口
因此需要转换成通用的jsonobject对象
//3. 处理数据
//3.1 转换数据结构
val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(
consumerRecord => {
//获取ConsumerRecord中的value,value就是日志数据
val log: String = consumerRecord.value()
//转换成Json对象
val jsonObj: JSONObject = JSON.parseObject(log)
//返回
jsonObj
}
)
jsonObjDStream.foreachRDD(
rdd => {
rdd.foreachPartition(
jsonObjIter => {
for (jsonObj <- jsonObjIter) {
//分流过程
//分流错误数据
val errObj: JSONObject = jsonObj.getJSONObject("err")
if(errObj != null){
//将错误数据发送到 DWD_ERROR_LOG_TOPIC
MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC , jsonObj.toJSONString )
}else{
}
}
}
}
else{
// 提取公共字段
val commonObj: JSONObject = jsonObj.getJSONObject("common")
val ar: String = commonObj.getString("ar")
val uid: String = commonObj.getString("uid")
val os: String = commonObj.getString("os")
val ch: String = commonObj.getString("ch")
val isNew: String = commonObj.getString("is_new")
val md: String = commonObj.getString("md")
val mid: String = commonObj.getString("mid")
val vc: String = commonObj.getString("vc")
val ba: String = commonObj.getString("ba")
//提取时间戳
val ts: Long = jsonObj.getLong("ts")
// 页面数据
val pageObj: JSONObject = jsonObj.getJSONObject("page")
if(pageObj != null ){
//提取page字段
val pageId: String = pageObj.getString("page_id")
val pageItem: String = pageObj.getString("item")
val pageItemType: String = pageObj.getString("item_type")
val duringTime: Long = pageObj.getLong("during_time")
val lastPageId: String = pageObj.getString("last_page_id")
val sourceType: String = pageObj.getString("source_type")
//封装成PageLog,这边还写了bean实体类去接收
var pageLog =
PageLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts)
//发送到DWD_PAGE_LOG_TOPIC
MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC , JSON.toJSONString(pageLog , new SerializeConfig(true)))//scala中bean没有set和get方法,这边是直接操作字段
}
//提取曝光数据
val displaysJsonArr: JSONArray = jsonObj.getJSONArray("displays")
if(displaysJsonArr != null && displaysJsonArr.size() > 0 ){
for(i <- 0 until displaysJsonArr.size()){
//循环拿到每个曝光
val displayObj: JSONObject = displaysJsonArr.getJSONObject(i)
//提取曝光字段
val displayType: String = displayObj.getString("display_type")
val displayItem: String = displayObj.getString("item")
val displayItemType: String = displayObj.getString("item_type")
val posId: String = displayObj.getString("pos_id")
val order: String = displayObj.getString("order")
//封装成PageDisplayLog
val pageDisplayLog =
PageDisplayLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,displayType,displayItem,displayItemType,order,posId,ts)
// 写到 DWD_PAGE_DISPLAY_TOPIC
MyKafkaUtils.send(DWD_PAGE_DISPLAY_TOPIC , JSON.toJSONString(pageDisplayLog , new SerializeConfig(true)))
}
}
//提取事件数据(课下完成)
val actionJsonArr: JSONArray = jsonObj.getJSONArray("actions")
if(actionJsonArr != null && actionJsonArr.size() > 0 ){
for(i <- 0 until actionJsonArr.size()){
val actionObj: JSONObject = actionJsonArr.getJSONObject(i)
//提取字段
val actionId: String = actionObj.getString("action_id")
val actionItem: String = actionObj.getString("item")
val actionItemType: String = actionObj.getString("item_type")
val actionTs: Long = actionObj.getLong("ts")
//封装PageActionLog
var pageActionLog =
PageActionLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,actionId,actionItem,actionItemType,actionTs,ts)
//写出到DWD_PAGE_ACTION_TOPIC
MyKafkaUtils.send(DWD_PAGE_ACTION_TOPIC , JSON.toJSONString(pageActionLog , new SerializeConfig(true)))
}
}
}
// 启动数据(课下完成)
val startJsonObj: JSONObject = jsonObj.getJSONObject("start")
if(startJsonObj != null ){
//提取字段
val entry: String = startJsonObj.getString("entry")
val loadingTime: Long = startJsonObj.getLong("loading_time")
val openAdId: String = startJsonObj.getString("open_ad_id")
val openAdMs: Long = startJsonObj.getLong("open_ad_ms")
val openAdSkipMs: Long = startJsonObj.getLong("open_ad_skip_ms")
//封装StartLog
var startLog =
StartLog(mid,uid,ar,ch,isNew,md,os,vc,ba,entry,openAdId,loadingTime,openAdMs,openAdSkipMs,ts)
//写出DWD_START_LOG_TOPIC
MyKafkaUtils.send(DWD_START_LOG_TOPIC , JSON.toJSONString(startLog ,new SerializeConfig(true)))
发送kafka的是自动提交,如果提交有误,会出现漏消费或者重复消费