JobPlus知识库 IT 大数据 文章
Spark Streaming带状态的算子UpdateStateByKey的操作Local模式(使用Scala语言)

带状态的算子UpdateStateByKey:

这个操作允许保持一些状态的信息,并且有新数据进来的时候持续更新状态。

使用这个操作必须:

1.定义一个有数据类型的状态。

2.定义状态更新的方法。

3.配置checkpoint目录用于存放状态,生产上最好配在HDFS上。

例子

需求:统计到目前为止累计出现的单词的个数(需要保持住以前的状态)。

项目结构

 

pom.xml

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  2. <modelVersion>4.0.0</modelVersion>

  3. <groupId>com.sid.spark</groupId>

  4. <artifactId>spark-train</artifactId>

  5. <version>1.0</version>

  6. <inceptionYear>2008</inceptionYear>

  7. <properties>

  8. <scala.version>2.11.8</scala.version>

  9. <kafka.version>0.9.0.0</kafka.version>

  10. <spark.version>2.2.0</spark.version>

  11. <hadoop.version>2.9.0</hadoop.version>

  12. <hbase.version>1.4.4</hbase.version>

  13. </properties>

  14. <repositories>

  15. <repository>

  16. <id>scala-tools.org</id>

  17. <name>Scala-Tools Maven2 Repository</name>

  18. <url>http://scala-tools.org/repo-releases</url>

  19. </repository>

  20. </repositories>

  21. <pluginRepositories>

  22. <pluginRepository>

  23. <id>scala-tools.org</id>

  24. <name>Scala-Tools Maven2 Repository</name>

  25. <url>http://scala-tools.org/repo-releases</url>

  26. </pluginRepository>

  27. </pluginRepositories>

  28. <dependencies>

  29. <dependency>

  30. <groupId>org.scala-lang</groupId>

  31. <artifactId>scala-library</artifactId>

  32. <version>${scala.version}</version>

  33. </dependency>

  34. <dependency>

  35. <groupId>org.apache.kafka</groupId>

  36. <artifactId>kafka_2.11</artifactId>

  37. <version>${kafka.version}</version>

  38. </dependency>

  39. <dependency>

  40. <groupId>org.apache.hadoop</groupId>

  41. <artifactId>hadoop-client</artifactId>

  42. <version>${hadoop.version}</version>

  43. </dependency>

  44. <!--<dependency>-->

  45. <!--<groupId>org.apache.hbase</groupId>-->

  46. <!--<artifactId>hbase-clinet</artifactId>-->

  47. <!--<version>${hbase.version}</version>-->

  48. <!--</dependency>-->

  49. <!--<dependency>-->

  50. <!--<groupId>org.apache.hbase</groupId>-->

  51. <!--<artifactId>hbase-server</artifactId>-->

  52. <!--<version>${hbase.version}</version>-->

  53. <!--</dependency>-->

  54. <dependency>

  55. <groupId>org.apache.spark</groupId>

  56. <artifactId>spark-streaming_2.11</artifactId>

  57. <version>${spark.version}</version>

  58. </dependency>

  59. <dependency>

  60. <groupId>net.jpountz.lz4</groupId>

  61. <artifactId>lz4</artifactId>

  62. <version>1.3.0</version>

  63. </dependency>

  64. </dependencies>

  65. <build>

  66. <sourceDirectory>src/main/scala</sourceDirectory>

  67. <testSourceDirectory>src/test/scala</testSourceDirectory>

  68. <plugins>

  69. <plugin>

  70. <groupId>org.scala-tools</groupId>

  71. <artifactId>maven-scala-plugin</artifactId>

  72. <executions>

  73. <execution>

  74. <goals>

  75. <goal>compile</goal>

  76. <goal>testCompile</goal>

  77. </goals>

  78. </execution>

  79. </executions>

  80. <configuration>

  81. <scalaVersion>${scala.version}</scalaVersion>

  82. <args>

  83. <arg>-target:jvm-1.5</arg>

  84. </args>

  85. </configuration>

  86. </plugin>

  87. <plugin>

  88. <groupId>org.apache.maven.plugins</groupId>

  89. <artifactId>maven-eclipse-plugin</artifactId>

  90. <configuration>

  91. <downloadSources>true</downloadSources>

  92. <buildcommands>

  93. <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>

  94. </buildcommands>

  95. <additionalProjectnatures>

  96. <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>

  97. </additionalProjectnatures>

  98. <classpathContainers>

  99. <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>

  100. <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>

  101. </classpathContainers>

  102. </configuration>

  103. </plugin>

  104. </plugins>

  105. </build>

  106. <reporting>

  107. <plugins>

  108. <plugin>

  109. <groupId>org.scala-tools</groupId>

  110. <artifactId>maven-scala-plugin</artifactId>

  111. <configuration>

  112. <scalaVersion>${scala.version}</scalaVersion>

  113. </configuration>

  114. </plugin>

  115. </plugins>

  116. </reporting>

  117. </project>

代码

  1. package com.sid.spark

  2. import org.apache.spark.SparkConf

  3. import org.apache.spark.streaming.{Seconds, StreamingContext}

  4. /**

  5.  * Created by jy02268879 on 2018/7/17.

  6.  *

  7.  * 使用Spark Streaming完成有状态的统计

  8.  * 使用带状态的算子UpdateStateByKey

  9.  *

  10.  */

  11. object UpdateStateByKey {

  12. def main(args: Array[String]): Unit = {

  13. val sparkConf = new SparkConf().setAppName("UpdateStateByKey")setMaster("local[2]")

  14. val ssc = new StreamingContext(sparkConf,Seconds(5))

  15. /**如果使用了带状态的算子必须要设置checkpoint,

  16.      * 这里是设置在HDFS的文件夹中

  17.      */

  18. ssc.checkpoint("hdfs://node1:9000/testdata/sparkstreaming/hdfswordcount/")

  19. val lines = ssc.socketTextStream("node1",6789)

  20. val result = lines.flatMap(_.split(" ")).map((_,1))

  21. val state = result.updateStateByKey[Int](updateFunction _)

  22. state.print()

  23. ssc.start()

  24. ssc.awaitTermination()

  25. }

  26. /**

  27.    * 把当前的数据去更新已有的数据

  28.    * currentValues: Seq[Int] 新有的状态

  29.    * preValue: Option[Int] 已有的状态

  30.    * */

  31. def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {

  32. val currentCount = currentValues.sum//每个单词出现了多少次

  33. val preCount = preValues.getOrElse(0)

  34. Some(currentCount+preCount)//返回

  35. }

  36. }

node1上启动nc

nc -lk 6789

IDEA启动项目

在nc中输入

a a a a 

在输入 a a


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
361人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序