1. Project Analysis
Project repository: https://gitee.com/jenrey/adv
Technology:
Spark Streaming (or Storm)
Data:
Ad click-stream data
Requirements:
1) [Real-time] count of each day's [top] ads [per province] (group by province and take the Top N ads by click count)
2) Real-time statistics of the ad-delivery trend over a given time window
Data schema:
timestamp: the time at which the user clicked the ad
province: the province in which the user clicked the ad
city: the city in which the user clicked the ad
userid: the unique identifier of the user
advid: the id of the clicked ad
The source data already sits in a Kafka topic.
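Before diving into the code, here is what a single record is assumed to look like. The comma delimiter and field order are inferred from the parsing code further below (split(",") with userid at index 3 and advid at index 4, and an epoch-millisecond timestamp at index 0); the sample values themselves are invented.
[scala]
// Minimal sketch of the assumed record format: one comma-separated line per Kafka message,
// fields in the order timestamp,province,city,userid,advid (sample values are made up).
object RecordFormatExample {
  case class AdClick(timestamp: Long, province: String, city: String, userid: Long, advid: Long)

  def parse(line: String): AdClick = {
    val f = line.split(",")
    AdClick(f(0).toLong, f(1), f(2), f(3).toLong, f(4).toLong)
  }

  def main(args: Array[String]): Unit = {
    // e.g. a click by user 10001 on ad 7
    println(parse("1527433620000,shandong,jinan,10001,7"))
  }
}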
2. Blacklist Filtering
[scala]
import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by jenrey on 2018/5/27 21:07
  */
object AdvApplicationTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AdvApplicationTest")
    conf.setMaster("local[2]")
    // Use Kryo serialization
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    /**
      * TODO: Step 1: read the data from Kafka (direct approach)
      */
    /* Signature of createDirectStream:
       K: ClassTag,
       V: ClassTag,
       KD <: Decoder[K]: ClassTag,
       VD <: Decoder[V]: ClassTag] (
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String] */
    val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
    val topics = Set("aura")
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    // TODO: if [one user] clicks [one ad] more than [100 times] [within one day],
    // the user counts as a blacklisted user and his records are excluded from the statistics.
    /**
      * TODO: Step 2: filter out blacklisted users
      */
    val filterLogDStream: DStream[String] = blackListFilter(logDStream, ssc)
    /**
      * TODO: Step 3: dynamically generate the blacklist
      */
    /**
      * TODO: Step 4: real-time count of daily ad clicks per province and city
      */
    /**
      * TODO: Step 5: real-time count of daily top ads per province
      */
    /**
      * TODO: Step 6: real-time click trend of each ad over a one-hour sliding window
      */
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
    * Filters blacklisted users out of the stream.
    *
    * @param logDStream the data read from Kafka
    * @return the data remaining after blacklist filtering
    */
  def blackListFilter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
    // In a real job the blacklist should be read from a persisted store; Redis, HBase and MySQL are the usual choices.
    val blackList = List((1L, true), (2L, true), (3L, true))
    // Turn the blacklist into an RDD
    val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
    // Broadcast the blacklist
    val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
    // transform operates on every RDD of the incoming DStream
    logDStream.transform(rdd => {
      // Split each incoming line and turn it into a (userid, line) pair
      val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
        val fields: Array[String] = line.split(",")
        (fields(3).toLong, line)
      })
      // After broadcasting, use .value to read the broadcast value back
      val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      /**
        * List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
        * List((1L, true), (2L, true), (3L, true))
        * leftOuterJoin (both sides must be key-value pairs) produces:
        * (22,(qwe,None))
        * (3,(zxc,Some(true)))
        * (2,(asd,Some(true)))
        */
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      // Return value: keep only the records whose user is not on the blacklist
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._2._1)
    })
  }
}
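The hard-coded blackList above is only a placeholder; as the comment notes, a real job would load it from Redis, HBase or MySQL. The sketch below shows one possible way to do that with plain JDBC. It is not part of the project's code: it assumes the aura.black_list table that the next section writes to (a single userid column) and the same localhost connection settings used there.
[scala]
// Hedged sketch: load the blacklist from MySQL instead of hard-coding it.
// Assumes an aura.black_list table with a userid column (see the next section).
import java.sql.DriverManager
import scala.collection.mutable.ListBuffer

object BlackListLoader {
  def loadFromMysql(): List[(Long, Boolean)] = {
    val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "aura", "aura")
    try {
      val rs = connection.createStatement().executeQuery("select userid from black_list")
      val buffer = ListBuffer[(Long, Boolean)]()
      while (rs.next()) {
        // keep the same (userid, flag) shape as the hard-coded list
        buffer += ((rs.getLong("userid"), true))
      }
      buffer.toList
    } finally {
      connection.close()
    }
  }
}
With such a helper, the hard-coded "val blackList = List(...)" line in blackListFilter could simply become "val blackList = BlackListLoader.loadFromMysql()".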
3. Dynamically Generating the Blacklist
[scala]
import java.util.{Date, Properties}

import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import utils.{ConnectionPool, DateUtils}

/**
  * Created by jenrey on 2018/5/27 21:07
  */
object AdvApplicationTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AdvApplicationTest")
    conf.setMaster("local[2]")
    // Use Kryo serialization
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    /**
      * TODO: Step 1: read the data from Kafka (direct approach)
      */
    /* Signature of createDirectStream:
       K: ClassTag,
       V: ClassTag,
       KD <: Decoder[K]: ClassTag,
       VD <: Decoder[V]: ClassTag] (
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String] */
    val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
    val topics = Set("aura")
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    // TODO: if [one user] clicks [one ad] more than [100 times] [within one day],
    // the user counts as a blacklisted user and his records are excluded from the statistics.
    /**
      * TODO: Step 2: filter out blacklisted users
      */
    val filterLogDStream: DStream[String] = blackListFilter(logDStream, ssc)
    /**
      * TODO: Step 3: dynamically generate the blacklist in real time
      */
    dynamicGenerationBlacklists(filterLogDStream, spark)
    /**
      * TODO: Step 4: real-time count of daily ad clicks per province and city
      */
    /**
      * TODO: Step 5: real-time count of daily top ads per province
      */
    /**
      * TODO: Step 6: real-time click trend of each ad over a one-hour sliding window
      */
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
    * TODO: filters blacklisted users out of the stream
    *
    * @param logDStream the data read from Kafka
    * @return the data remaining after blacklist filtering
    */
  def blackListFilter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
    // In a real job the blacklist should be read from a persisted store; Redis, HBase and MySQL are the usual choices.
    val blackList = List((1L, true), (2L, true), (3L, true))
    // Turn the blacklist into an RDD
    val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
    // Broadcast the blacklist
    val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
    // transform operates on every RDD of the incoming DStream
    logDStream.transform(rdd => {
      // Split each incoming line and turn it into a (userid, line) pair
      val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
        val fields: Array[String] = line.split(",")
        (fields(3).toLong, line)
      })
      // After broadcasting, use .value to read the broadcast value back
      val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      /**
        * List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
        * List((1L, true), (2L, true), (3L, true))
        * leftOuterJoin (both sides must be key-value pairs) produces:
        * (22,(qwe,None))
        * (3,(zxc,Some(true)))
        * (2,(asd,Some(true)))
        */
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      // Return value: keep only the records whose user is not on the blacklist
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._2._1)
    })
  }

  /**
    * TODO: dynamically generate the blacklist
    *
    * @param filterLogDStream the data remaining after blacklist filtering
    *                         If [one user] clicks [one ad] more than [100 times] [within one day], the user is blacklisted.
    *                         Three possible approaches: 1) updateStateByKey 2) reduceByKey plus HBase 3) MySQL
    */
  def dynamicGenerationBlacklists(filterLogDStream: DStream[String], spark: SparkSession): Unit = {
    val date_userid_advid_ds: DStream[(String, Long)] = filterLogDStream.map(line => {
      val fields: Array[String] = line.split(",")
      val time = new Date(fields(0).toLong)
      val date: String = DateUtils.formatDateKey(time)
      val userid: String = fields(3)
      val advid: String = fields(4)
      (date + "_" + userid + "_" + advid, 1L)
    }).reduceByKey(_ + _)

    date_userid_advid_ds.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // ConnectionPool is a pre-built utility class that hands out MySQL connections
        val connection = ConnectionPool.getConnection()
        val statement = connection.createStatement()
        partition.foreach {
          case (date_userid_advid, count) => {
            val fields = date_userid_advid.split("_")
            val date = fields(0)
            val userid = fields(1).toLong
            val advid = fields(2).toLong
            // date is a string, so it has to be quoted in the SQL statement
            val sql = s"insert into aura.tmp_advclick_count values('$date',$userid,$advid,$count)"
            statement.execute(sql)
          }
        }
        ConnectionPool.returnConnection(connection)
      })
    })

    /**
      * Generate the blacklist
      */
    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/aura")
      .option("user", "aura")
      .option("password", "aura")
      .option("dbtable", "tmp_advclick_count")
      .load()
    df.createOrReplaceTempView("tmp_advclick_count")
    val sql =
      """
        select
          userid
        from
        (
          select
            date, userid, advid, sum(click_count) c_count
          from
            tmp_advclick_count
          group by date, userid, advid
        ) t
        where
          t.c_count > 100
      """
    val blacklistdf = spark.sql(sql).distinct()
    val properties = new Properties()
    properties.put("user", "aura")
    properties.put("password", "aura")
    blacklistdf.write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/aura", "black_list", properties)
  }
}
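The imports utils.ConnectionPool and utils.DateUtils refer to helper classes that the post never shows. The sketch below is only a guess at their minimal shape, for readers who want to run the example without cloning the repo: a naive MySQL connection pool and a formatter that produces a yyyyMMdd-style day key. The actual implementations live in the Gitee project.
[scala]
// Hedged sketch of the two helper classes assumed by the code above; the real ones are in the repo.
package utils

import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Date

object ConnectionPool {
  Class.forName("com.mysql.jdbc.Driver")
  // a deliberately simple pool: reuse returned connections, create new ones on demand
  private val pool = new java.util.LinkedList[Connection]()

  def getConnection(): Connection = synchronized {
    if (pool.isEmpty)
      DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "aura", "aura")
    else
      pool.poll()
  }

  def returnConnection(connection: Connection): Unit = synchronized {
    pool.push(connection)
  }
}

object DateUtils {
  // Day key used as the first part of the date_userid_advid composite key (format is an assumption)
  def formatDateKey(date: Date): String = new SimpleDateFormat("yyyyMMdd").format(date)
}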