Spark Streaming Reading Data from HDFS in Local Mode (in Scala)

Spark Streaming reads data from HDFS to simulate a word count, printing the results to the console. The job runs in Local mode and is written in Scala.

Project directory

pom.xml

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  2. <modelVersion>4.0.0</modelVersion>

  3. <groupId>com.sid.spark</groupId>

  4. <artifactId>spark-train</artifactId>

  5. <version>1.0</version>

  6. <inceptionYear>2008</inceptionYear>

  7. <properties>

  8. <scala.version>2.11.8</scala.version>

  9. <kafka.version>0.9.0.0</kafka.version>

  10. <spark.version>2.2.0</spark.version>

  11. <hadoop.version>2.9.0</hadoop.version>

  12. <hbase.version>1.4.4</hbase.version>

  13. </properties>

  14. <repositories>

  15. <repository>

  16. <id>scala-tools.org</id>

  17. <name>Scala-Tools Maven2 Repository</name>

  18. <url>http://scala-tools.org/repo-releases</url>

  19. </repository>

  20. </repositories>

  21. <pluginRepositories>

  22. <pluginRepository>

  23. <id>scala-tools.org</id>

  24. <name>Scala-Tools Maven2 Repository</name>

  25. <url>http://scala-tools.org/repo-releases</url>

  26. </pluginRepository>

  27. </pluginRepositories>

  28. <dependencies>

  29. <dependency>

  30. <groupId>org.scala-lang</groupId>

  31. <artifactId>scala-library</artifactId>

  32. <version>${scala.version}</version>

  33. </dependency>

  34. <dependency>

  35. <groupId>org.apache.kafka</groupId>

  36. <artifactId>kafka_2.11</artifactId>

  37. <version>${kafka.version}</version>

  38. </dependency>

  39. <dependency>

  40. <groupId>org.apache.hadoop</groupId>

  41. <artifactId>hadoop-client</artifactId>

  42. <version>${hadoop.version}</version>

  43. </dependency>

  44. <!--<dependency>-->

  45. <!--<groupId>org.apache.hbase</groupId>-->

  46. <!--<artifactId>hbase-clinet</artifactId>-->

  47. <!--<version>${hbase.version}</version>-->

  48. <!--</dependency>-->

  49. <!--<dependency>-->

  50. <!--<groupId>org.apache.hbase</groupId>-->

  51. <!--<artifactId>hbase-server</artifactId>-->

  52. <!--<version>${hbase.version}</version>-->

  53. <!--</dependency>-->

  54. <dependency>

  55. <groupId>org.apache.spark</groupId>

  56. <artifactId>spark-streaming_2.11</artifactId>

  57. <version>${spark.version}</version>

  58. </dependency>

  59. <dependency>

  60. <groupId>net.jpountz.lz4</groupId>

  61. <artifactId>lz4</artifactId>

  62. <version>1.3.0</version>

  63. </dependency>

  64. </dependencies>

  65. <build>

  66. <sourceDirectory>src/main/scala</sourceDirectory>

  67. <testSourceDirectory>src/test/scala</testSourceDirectory>

  68. <plugins>

  69. <plugin>

  70. <groupId>org.scala-tools</groupId>

  71. <artifactId>maven-scala-plugin</artifactId>

  72. <executions>

  73. <execution>

  74. <goals>

  75. <goal>compile</goal>

  76. <goal>testCompile</goal>

  77. </goals>

  78. </execution>

  79. </executions>

  80. <configuration>

  81. <scalaVersion>${scala.version}</scalaVersion>

  82. <args>

  83. <arg>-target:jvm-1.5</arg>

  84. </args>

  85. </configuration>

  86. </plugin>

  87. <plugin>

  88. <groupId>org.apache.maven.plugins</groupId>

  89. <artifactId>maven-eclipse-plugin</artifactId>

  90. <configuration>

  91. <downloadSources>true</downloadSources>

  92. <buildcommands>

  93. <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>

  94. </buildcommands>

  95. <additionalProjectnatures>

  96. <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>

  97. </additionalProjectnatures>

  98. <classpathContainers>

  99. <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>

  100. <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>

  101. </classpathContainers>

  102. </configuration>

  103. </plugin>

  104. </plugins>

  105. </build>

  106. <reporting>

  107. <plugins>

  108. <plugin>

  109. <groupId>org.scala-tools</groupId>

  110. <artifactId>maven-scala-plugin</artifactId>

  111. <configuration>

  112. <scalaVersion>${scala.version}</scalaVersion>

  113. </configuration>

  114. </plugin>

  115. </plugins>

  116. </reporting>

  117. </project>

Code

package com.sid.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by jy02268879 on 2018/7/16.
 *
 * Spark Streaming word count over HDFS file data.
 */
object HDFSWordCount {

  def main(args: Array[String]): Unit = {

    // File streams do not use a receiver, so a single local thread is enough here.
    val sparkConf = new SparkConf().setMaster("local").setAppName("HDFSWordCount")

    // Batch interval of 5 seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /**
     * Create an input stream that monitors a Hadoop-compatible filesystem
     * for new files and reads them as text files (using key as LongWritable, value
     * as Text and input format as TextInputFormat). Files must be written to the
     * monitored directory by "moving" them from another location within the same
     * file system. File names starting with . are ignored.
     *
     * @param directory HDFS directory to monitor for new files
     */
    val lines = ssc.textFileStream("hdfs://node1:9000/testdata/sparkstreaming/hdfswordcount/")

    // Split each line into words, pair each word with 1, and sum the counts per word.
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
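For reference, textFileStream is a convenience wrapper around the more general fileStream API. A minimal sketch of the equivalent explicit call, using the same directory as above, looks like this:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Read new files as (LongWritable, Text) records via TextInputFormat,
// then keep only the line text -- this is what textFileStream does internally.
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://node1:9000/testdata/sparkstreaming/hdfswordcount/"
).map(_._2.toString)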

 

Start Hadoop

(Run this only on the NameNode, i.e. node1.)

cd /app/hadoop/hadoop-2.9.0/sbin

./start-all.sh
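Optionally, run jps on node1 to confirm the daemons started; it should list HDFS processes such as NameNode (the exact set depends on your cluster layout).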

Create the directory on HDFS

cd /app/hadoop/hadoop-2.9.0/bin

hdfs dfs -mkdir -p /testdata/sparkstreaming/hdfswordcount
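Optionally confirm the directory was created:

hdfs dfs -ls /testdata/sparkstreaming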

Create test data

cd /app/spark/test_data/monitor_file

vi test.log
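For illustration, test.log might contain a few space-separated words (hypothetical content; any plain-text file will do):

hello spark
hello hadoop
hello spark streaming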

Click Run in IDEA to start the program

 

Put the file to be counted onto HDFS

cd /app/spark/test_data/monitor_file

hdfs dfs -put test.log /testdata/sparkstreaming/hdfswordcount

Check the IDEA console output
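Assuming the hypothetical test.log content shown earlier, the batch that picks up the file should print something like the following (the timestamp will differ on your run):

-------------------------------------------
Time: 1531728000000 ms
-------------------------------------------
(hello,3)
(spark,2)
(hadoop,1)
(streaming,1)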

