JobPlus知识库 互联网 大数据 文章
广告流量实时统计

1.项目分析

项目地址:https://gitee.com/jenrey/adv

技术分析:

    SparkStreaming或者Strom

数据:

    广告流量点击数据

需求分析:

   1)【 实时】统计【每天】【各省】【热门】广告(分组求广告点击次数多的TopN)

   2)实时统计某个阶段广告投放趋势

数据调研:

timestamp:时间戳,用户点击广告的时间

province:省份,用户在哪个省份点击的广告

city:城市,用户在哪个城市点击的广告

userid:用户的唯一标识

advid:被点击的广告id

现在有数据源在kafka里面

2.黑名单过滤

[java] 

  1. import kafka.serializer.StringDecoder  
  2. import org.apache.spark.broadcast.Broadcast  
  3. import org.apache.spark.rdd.RDD  
  4. import org.apache.spark.streaming.dstream.DStream  
  5. import org.apache.spark.streaming.kafka.KafkaUtils  
  6. import org.apache.spark.streaming.{Seconds, StreamingContext}  
  7. import org.apache.spark.{SparkConf, SparkContext}  
  8.   
  9. /** 
  10.   * Create by jenrey on 2018/5/27 21:07 
  11.   */  
  12. object AdvApplicationTest {  
  13.   def main(args: Array[String]): Unit = {  
  14.     val conf = new SparkConf()  
  15.     conf.setAppName("AdvApplicationTest")  
  16.     conf.setMaster("local")  
  17.     conf.set("", "") //序列化  
  18.     val sc = new SparkContext(conf)  
  19.     val ssc = new StreamingContext(sc, Seconds(5))  
  20.   
  21.     /** 
  22.       * TODO:第一步:从kafka获取数据(direct 方式) 
  23.       */  
  24.     /* K: ClassTag, 
  25.        V: ClassTag, 
  26.        KD <: Decoder[K]: ClassTag, 
  27.        VD <: Decoder[V]: ClassTag] ( 
  28.        ssc: StreamingContext, 
  29.        kafkaParams: Map[String, String], 
  30.        topics: Set[String]*/  
  31.     val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")  
  32.     val topics = Set("aura")  
  33.     val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)  
  34.   
  35.     //TODO:如果【一个用户】【一天内】对【某个广告】点击的次数超过了【100次】,这样的用户属于黑名单用户,这样的数据就不统计了  
  36.     /** 
  37.       * TODO:第二步:进行黑名单过滤 
  38.       */  
  39.     val filterLogDStream: DStream[String] = blackListFileter(logDStream,ssc)  
  40.     /** 
  41.       * TODO:第三步:动态生成黑名单 
  42.       */  
  43.     /** 
  44.       * TODO:第四步:实时统计每天各省各城市广告点击量 
  45.       */  
  46.     /** 
  47.       * TODO:第五步:实时统计每天各省热门广告点击量 
  48.       */  
  49.     /** 
  50.       * TODO:第六步:实时统计每天每个广告在最近一小时的滑动窗口的点击趋势 
  51.       */  
  52.     ssc.start()  
  53.     ssc.awaitTermination()  
  54.     ssc.stop()  
  55.   }  
  56.   
  57.   /** 
  58.     * 对黑名单进行过滤的方法 
  59.     * 
  60.     * @param logDStream 从kafka读取数据 
  61.     * @return 进行黑名单过滤以后的数据 
  62.     */  
  63.   def blackListFileter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {  
  64.     //这个地方的黑名单,应该是从我们持久化的数据库里面读取的:有三个数据库是我们常用的(Redis,hbase,mysql)  
  65.     val blackList = List((1L, true), (2L, true), (3L, true))  
  66.     //把黑名单转化成RDD  
  67.     val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)  
  68.     //广播黑名单  
  69.     val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())  
  70.     //transform对传进来的DStream中的每一个RDD进行操作  
  71.     logDStream.transform(rdd => {  
  72.       //把传进来的数据切分,组成kv形式  
  73.       val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {  
  74.         val fields: Array[String] = line.split(",")  
  75.         (fields(3).toLong, line)  
  76.       })  
  77.       //注意广播出去后,需要使用.value来获取播放值  
  78.       val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)  
  79.       /** 
  80.         * List((22L, "qwe"), (2L, "asd"), (3L, "zxc")) 
  81.         * List((1L, true), (2L, true), (3L, true)) 
  82.         * leftOuterJoin 后的结果如下,此算子必须都是kv形式才行 
  83.         * (22,(qwe,None)) 
  84.         * (3,(zxc,Some(true))) 
  85.         * (2,(asd,Some(true))) 
  86.         */  
  87.       val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)  
  88.       //这个是返回值,返回进行黑名单过滤以后的数据  
  89.       resultRDD.filter(tuple=>{  
  90.         tuple._2._2.isEmpty  
  91.       }).map(_._2._1)  
  92.     })  
  93.   }  
  94. }  

3.动态生成黑名单

[java] 

  1. import java.util.{Date, Properties}  
  2.   
  3. import kafka.serializer.StringDecoder  
  4. import org.apache.spark.broadcast.Broadcast  
  5. import org.apache.spark.rdd.RDD  
  6. import org.apache.spark.sql._  
  7. import org.apache.spark.streaming.dstream.DStream  
  8. import org.apache.spark.streaming.kafka.KafkaUtils  
  9. import org.apache.spark.streaming.{Seconds, StreamingContext}  
  10. import org.apache.spark.{SparkConf, SparkContext}  
  11. import utils.{ConnectionPool, DateUtils}  
  12.   
  13. /** 
  14.   * Create by jenrey on 2018/5/27 21:07 
  15.   */  
  16. object AdvApplicationTest {  
  17.   def main(args: Array[String]): Unit = {  
  18.     val conf = new SparkConf()  
  19.     conf.setAppName("AdvApplicationTest")  
  20.     conf.setMaster("local")  
  21.     conf.set("", "") //序列化  
  22.     val sc = new SparkContext(conf)  
  23.     val ssc = new StreamingContext(sc, Seconds(5))  
  24.     val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()  
  25.     /** 
  26.       * TODO:第一步:从kafka获取数据(direct 方式) 
  27.       */  
  28.     /* K: ClassTag, 
  29.        V: ClassTag, 
  30.        KD <: Decoder[K]: ClassTag, 
  31.        VD <: Decoder[V]: ClassTag] ( 
  32.        ssc: StreamingContext, 
  33.        kafkaParams: Map[String, String], 
  34.        topics: Set[String]*/  
  35.     val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")  
  36.     val topics = Set("aura")  
  37.     val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)  
  38.   
  39.     //TODO:如果【一个用户】【一天内】对【某个广告】点击的次数超过了【100次】,这样的用户属于黑名单用户,这样的数据就不统计了  
  40.     /** 
  41.       * TODO:第二步:进行黑名单过滤 
  42.       */  
  43.     val filterLogDStream: DStream[String] = blackListFileter(logDStream, ssc)  
  44.   
  45.     /** 
  46.       * TODO:第三步:动态生成黑名单  实时生成黑名单 
  47.       */  
  48.     DynamicGenerationBlacklists(filterLogDStream,spark)  
  49.     /** 
  50.       * TODO:第四步:实时统计每天各省各城市广告点击量 
  51.       */  
  52.     /** 
  53.       * TODO:第五步:实时统计每天各省热门广告点击量 
  54.       */  
  55.     /** 
  56.       * TODO:第六步:实时统计每天每个广告在最近一小时的滑动窗口的点击趋势 
  57.       */  
  58.     ssc.start()  
  59.     ssc.awaitTermination()  
  60.     ssc.stop()  
  61.   }  
  62.   
  63.   /** 
  64.     * TODO:对黑名单进行过滤的方法 
  65.     * 
  66.     * @param logDStream 从kafka读取数据 
  67.     * @return 进行黑名单过滤以后的数据 
  68.     */  
  69.   def blackListFileter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {  
  70.     //这个地方的黑名单,应该是从我们持久化的数据库里面读取的:有三个数据库是我们常用的(Redis,hbase,mysql)  
  71.     val blackList = List((1L, true), (2L, true), (3L, true))  
  72.     //把黑名单转化成RDD  
  73.     val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)  
  74.     //广播黑名单  
  75.     val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())  
  76.     //transform对传进来的DStream中的每一个RDD进行操作  
  77.     logDStream.transform(rdd => {  
  78.       //把传进来的数据切分,组成kv形式  
  79.       val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {  
  80.         val fields: Array[String] = line.split(",")  
  81.         (fields(3).toLong, line)  
  82.       })  
  83.       //注意广播出去后,需要使用.value来获取播放值  
  84.       val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)  
  85.       /** 
  86.         * List((22L, "qwe"), (2L, "asd"), (3L, "zxc")) 
  87.         * List((1L, true), (2L, true), (3L, true)) 
  88.         * leftOuterJoin 后的结果如下,此算子必须都是kv形式才行 
  89.         * (22,(qwe,None)) 
  90.         * (3,(zxc,Some(true))) 
  91.         * (2,(asd,Some(true))) 
  92.         */  
  93.       val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)  
  94.       //这个是返回值,返回进行黑名单过滤以后的数据  
  95.       resultRDD.filter(tuple => {  
  96.         tuple._2._2.isEmpty  
  97.       }).map(_._2._1)  
  98.     })  
  99.   }  
  100.   
  101.   /** 
  102.     * TODO:动态生成黑名单 
  103.     * 
  104.     * @param filterLogDStream 黑名单过滤完了以后的数据 
  105.     *                         如果【一个用户】【一天内】对【某个广告】点击的次数超过了【100次】,这样的用户属于黑名单用户 
  106.     *                         有三种方式:1)使用UpdateStateByKey 2)reduceByKey 存入HBase 3)Mysql的方式 
  107.     */  
  108.   def DynamicGenerationBlacklists(filterLogDStream: DStream[String], spark: SparkSession): Unit = {  
  109.     val date_userid_advid_ds: DStream[(String, Long)] = filterLogDStream.map(line => {  
  110.       val fields: Array[String] = line.split(",")  
  111.       val time = new Date(fields(0).toLong)  
  112.       val date: String = DateUtils.formatDateKey(time)  
  113.       val userid: String = fields(3)  
  114.       val advid: String = fields(4)  
  115.       (date + "_" + userid + "_" + advid, 1L)  
  116.     }).reduceByKey(_ + _)  
  117.   
  118.     date_userid_advid_ds.foreachRDD(rdd => {  
  119.       rdd.foreachPartition(partition => {  
  120.         //下面是写好的工具类,连接Mysql  
  121.         val connection = ConnectionPool.getConnection()  
  122.         val statement = connection.createStatement()  
  123.         partition.foreach {  
  124.           case (date_userid_advid, count) => {  
  125.             val fields = date_userid_advid.split("_")  
  126.             val date = fields(0)  
  127.             val userid = fields(1).toLong  
  128.             val advid = fields(2).toLong  
  129.             val sql = s"insert into aura.tmp_advclick_count values($date,$userid,$advid,$count)";  
  130.             statement.execute(sql);  
  131.           }  
  132.         }  
  133.         ConnectionPool.returnConnection(connection)  
  134.       })  
  135.     })  
  136.   
  137.     /** 
  138.       * 生成黑名单 
  139.       */  
  140.     val df: DataFrame = spark.read.format("jdbc")  
  141.       .option("url", "jdbc:mysql://localhost:3306/aura")  
  142.       .option("user", "aura")  
  143.       .option("password", "aura")  
  144.       .option("dbtable", "tmp_advclick_count")  
  145.       .load()  
  146.     df.createOrReplaceTempView("tmp_advclick_count")  
  147.     val sql =  
  148.       """  
  149.          select  
  150.               userid  
  151.          from  
  152.          (  
  153.         select  
  154.               date,userid,advid,sum(click_count) c_count  
  155.           from  
  156.               tmp_advclick_count  
  157.         group by date,userid,advid  
  158.         ) t  
  159.             where  
  160.             t.c_count>100  
  161.       """  
  162.     val blacklistdf= spark.sql(sql).distinct()  
  163.     val properties = new Properties()  
  164.     properties.put("user","aura")  
  165.     properties.put("password","aura")  
  166.     blacklistdf.write.mode(SaveMode.Append)  
  167.       .jdbc("jdbc:mysql://localhost:3306/aura","black_list",properties)  
  168.   
  169.   }  
  170. }  

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
191人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序