JobPlus知识库 IT 大数据 文章
Spark Streaming接入Socket的数据Local模式(使用Scala语言)

Spark Streaming接入Socket的数据模拟一个wordcount的功能,结果打印到控制台,使用Local模式,使用Scala语言。

项目目录

pom.xml

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  2. <modelVersion>4.0.0</modelVersion>

  3. <groupId>com.sid.spark</groupId>

  4. <artifactId>spark-train</artifactId>

  5. <version>1.0</version>

  6. <inceptionYear>2008</inceptionYear>

  7. <properties>

  8. <scala.version>2.11.8</scala.version>

  9. <kafka.version>0.9.0.0</kafka.version>

  10. <spark.version>2.2.0</spark.version>

  11. <hadoop.version>2.9.0</hadoop.version>

  12. <hbase.version>1.4.4</hbase.version>

  13. </properties>

  14. <repositories>

  15. <repository>

  16. <id>scala-tools.org</id>

  17. <name>Scala-Tools Maven2 Repository</name>

  18. <url>http://scala-tools.org/repo-releases</url>

  19. </repository>

  20. </repositories>

  21. <pluginRepositories>

  22. <pluginRepository>

  23. <id>scala-tools.org</id>

  24. <name>Scala-Tools Maven2 Repository</name>

  25. <url>http://scala-tools.org/repo-releases</url>

  26. </pluginRepository>

  27. </pluginRepositories>

  28. <dependencies>

  29. <dependency>

  30. <groupId>org.scala-lang</groupId>

  31. <artifactId>scala-library</artifactId>

  32. <version>${scala.version}</version>

  33. </dependency>

  34. <dependency>

  35. <groupId>org.apache.kafka</groupId>

  36. <artifactId>kafka_2.11</artifactId>

  37. <version>${kafka.version}</version>

  38. </dependency>

  39. <dependency>

  40. <groupId>org.apache.hadoop</groupId>

  41. <artifactId>hadoop-client</artifactId>

  42. <version>${hadoop.version}</version>

  43. </dependency>

  44. <!--<dependency>-->

  45. <!--<groupId>org.apache.hbase</groupId>-->

  46. <!--<artifactId>hbase-clinet</artifactId>-->

  47. <!--<version>${hbase.version}</version>-->

  48. <!--</dependency>-->

  49. <!--<dependency>-->

  50. <!--<groupId>org.apache.hbase</groupId>-->

  51. <!--<artifactId>hbase-server</artifactId>-->

  52. <!--<version>${hbase.version}</version>-->

  53. <!--</dependency>-->

  54. <dependency>

  55. <groupId>org.apache.spark</groupId>

  56. <artifactId>spark-streaming_2.11</artifactId>

  57. <version>${spark.version}</version>

  58. </dependency>

  59. <dependency>

  60. <groupId>net.jpountz.lz4</groupId>

  61. <artifactId>lz4</artifactId>

  62. <version>1.3.0</version>

  63. </dependency>

  64. </dependencies>

  65. <build>

  66. <sourceDirectory>src/main/scala</sourceDirectory>

  67. <testSourceDirectory>src/test/scala</testSourceDirectory>

  68. <plugins>

  69. <plugin>

  70. <groupId>org.scala-tools</groupId>

  71. <artifactId>maven-scala-plugin</artifactId>

  72. <executions>

  73. <execution>

  74. <goals>

  75. <goal>compile</goal>

  76. <goal>testCompile</goal>

  77. </goals>

  78. </execution>

  79. </executions>

  80. <configuration>

  81. <scalaVersion>${scala.version}</scalaVersion>

  82. <args>

  83. <arg>-target:jvm-1.5</arg>

  84. </args>

  85. </configuration>

  86. </plugin>

  87. <plugin>

  88. <groupId>org.apache.maven.plugins</groupId>

  89. <artifactId>maven-eclipse-plugin</artifactId>

  90. <configuration>

  91. <downloadSources>true</downloadSources>

  92. <buildcommands>

  93. <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>

  94. </buildcommands>

  95. <additionalProjectnatures>

  96. <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>

  97. </additionalProjectnatures>

  98. <classpathContainers>

  99. <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>

  100. <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>

  101. </classpathContainers>

  102. </configuration>

  103. </plugin>

  104. </plugins>

  105. </build>

  106. <reporting>

  107. <plugins>

  108. <plugin>

  109. <groupId>org.scala-tools</groupId>

  110. <artifactId>maven-scala-plugin</artifactId>

  111. <configuration>

  112. <scalaVersion>${scala.version}</scalaVersion>

  113. </configuration>

  114. </plugin>

  115. </plugins>

  116. </reporting>

  117. </project>

代码

  1. package com.sid.spark

  2. import org.apache.spark.SparkConf

  3. import org.apache.spark.streaming.{Seconds, StreamingContext}

  4. /**

  5.  * Created by jy02268879 on 2018/7/16.

  6.  *

  7.  * Spark Streaming 处理 Socket数据

  8.  *

  9.  * 测试用nc -lk 6789

  10.  */

  11. object SocketWordCount {

  12. def  main(args: Array[String]): Unit = {

  13. /**

  14.        * 当运行Spark Streaming应用程序的时候如果使用的Local模式,

  15.        * 不要使用local或者local[1]作为master的URL。

  16.        * 因为这种写法意味着仅仅只有一个线程能被使用,

  17.        * 如果使用基于Receiver的input DStream(如果用的HDFS上面的文件就可以用local[1]或local),

  18.        * Receiver就已经占用了线程了,

  19.        * 主流程就处理不了数据了。

  20.        * 所以要用local[n],n>Receiver的数量。

  21.        * */

  22. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount")

  23. //构建StreamingContext

  24. /**

  25.        * 创建StreamingContext需要两个参数:sparkConf和batch interval

  26.        * */

  27. val ssc = new StreamingContext(sparkConf,Seconds(5))

  28. val lines = ssc.socketTextStream("node1",6789)

  29. val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

  30. result.print()

  31. ssc.start()

  32. ssc.awaitTermination()

  33. }

  34. }

在node1的控制台启动

nc -lk 6789

IDEA点击run启动程序

在控制台nc监控下输入a a a b b c d d

查看控制台输出


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
259人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序