操作系统
Window7/Mac
IDE
IntelliJ IDEA Community Edition 14.1.6
下载地址
JDK 1.8.0_65
下载地址
Scala 2.11.7
下载地址
其它环境
Spark:1.4.1
下载地址
Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2
IDE项目创建
新建一个项目
New Project
使用Maven模型创建一个Scala项目
填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目
项目创建完成后,目录结构如下
4.为项目添加JDK以及Scala SDK
点击File->Project Structure,在SDKS和Global Libraries中为项目配置环境。
至此整个项目结构、项目环境都搭建好了
编写主函数
主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现
MyRouteBuild
MyRouteMain
这两个文件为模块文件,删除MyRouteBuild,重命名MyRouteMain为DirectKafkaWordCount。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下
package org.apache.spark.examples.streaming
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("...")
System.exit(1)
}
//StreamingExamples.setStreamingLogLevels()
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
将代码最上面的package org.apache.spark.examples.streaming,替换为DirectKafkaWordCount里的package部分即可。并覆盖DirectKafkaWordCount文件。
至此Spark处理代码已经编写完成。
修改pom.xml,为项目打包做准备
pom.xml中编写了整个项目的依赖关系,这个项目中我们需要导入一些Spark Streaming相关的包。
除此之外,如果需要把相关依赖打包到最终JAR包中,需要在pom.xml的bulid标签中写入以下配置:
pom.xml文件修改完成后,即可开始maven打包,操作如图:
点击右侧弹出窗口的Execute Maven Goal,在command line中输入clean package
Spark作业提交
在项目projectname/target目录下即可找到两个jar包,其中一个仅包含Scala代码,另一个包含所有依赖的包。
将jar包导到Spark服务器,运行Spark作业,运行操作如下
../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic
利用spark-submit把任务提交到Yarn集群,即可看到运行结果。