Word Count in Various Languages on Hadoop (2): Apache Spark



Continuing from yesterday's post, today is word count again, this time using Apache Spark (Scala, Python, and Java).





The code is on GitHub: https://github.com/kawamon/wordcount.git


Spark (Scala)




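Spark on the Cloudera Quickstart VM has a permission problem: starting spark-shell produces a permission error. Work around it by opening up the permissions on /user/spark: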




$ sudo -u hdfs hdfs dfs -chmod -R 777 /user/spark



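Also, if you shut down the Quickstart VM and start it again, spark-shell may fail to connect to the Spark Master. In that case, try restarting the Master and the Worker (and the History Server).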





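// read the input files from HDFS
val file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
// split lines into words, pair each word with 1, and sum the 1s per word into a single partition
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _, 1)
// write the (word, count) pairs; the directory name matches the hadoop fs -cat command below
counts.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/spark_scala.output")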



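Start spark-shell against the standalone Master, enter the code above, and then check the result:

$ spark-shell --master spark://quickstart.cloudera:7077
$ hadoop fs -cat spark_scala.output/part-00000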
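The second argument `1` passed to `reduceByKey` sets the number of result partitions, and `saveAsTextFile` writes one part file per partition, which is why the whole result lands in `part-00000`. The sketch below imitates that behaviour locally in plain Python, without Spark. It is a simplification under stated assumptions: `reduce_by_key` is a hypothetical helper, `zlib.crc32` stands in for the real hash (Spark's HashPartitioner uses the key's hashCode, PySpark uses `portable_hash`), values are merged globally before partitioning (Spark combines within each input partition first, then shuffles, with the same end result), and the sample words are assumed to match the counts in the PySpark output later in this post.

```python
import zlib
from collections import defaultdict


def reduce_by_key(pairs, func, num_partitions):
    """Hypothetical local stand-in for RDD.reduceByKey(func, numPartitions)."""
    # Step 1: merge all values that share a key with func.
    combined = {}
    for key, value in pairs:
        combined[key] = func(combined[key], value) if key in combined else value

    # Step 2: route each key to partition hash(key) mod num_partitions.
    # zlib.crc32 is an assumed deterministic stand-in for Spark's real hash.
    partitions = defaultdict(list)
    for key, value in combined.items():
        partitions[zlib.crc32(key.encode("utf-8")) % num_partitions].append((key, value))

    # One list per partition, including partitions that received no keys.
    return [partitions[i] for i in range(num_partitions)]


# Assumed sample input, chosen to match the counts in the PySpark output.
words = "Hello World Bye World Hello Hadoop Goodbye Hadoop".split(" ")
pairs = [(word, 1) for word in words]

one = reduce_by_key(pairs, lambda a, b: a + b, 1)   # like reduceByKey(_ + _, 1)
four = reduce_by_key(pairs, lambda a, b: a + b, 4)  # like reduceByKey(_ + _, 4)
print(len(one), sorted(one[0]))  # a single partition holds every word
print(len(four))                 # four partitions, i.e. part-00000 to part-00003
```

Raising the partition count spreads the same counts over more part files, which helps parallelism on large inputs; with this tiny input, one partition keeps the output in a single file that is easy to inspect.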







Spark (Python)







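# read the input files from HDFS
file = sc.textFile("hdfs://quickstart.cloudera/user/cloudera/input")
# split lines into words, pair each word with 1, and sum the 1s per word into a single partition
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b, 1)
# write the (word, count) pairs; the directory name matches the hdfs dfs -cat command below
counts.saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/pyspark.output")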





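Start pyspark with MASTER pointing at the standalone Master, enter the code above, and then check the result:

$ MASTER=spark://quickstart.cloudera:7077 pyspark
$ hdfs dfs -cat pyspark.output/part-00000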
(u'World', 2)
(u'Bye', 1)
(u'Hello', 2)
(u'Goodbye', 1)
(u'Hadoop', 2)
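To see what each stage of the PySpark pipeline produces, here is a minimal local sketch in plain Python that mirrors the `flatMap`, `map`, and `reduceByKey` steps with ordinary lists and a dict. The two input lines are an assumption (the post does not show its input files); they are simply one input that yields the counts above.

```python
# Assumed input: two lines that would yield the counts shown above
# (the post does not show its actual input files).
lines = ["Hello World Bye World", "Hello Hadoop Goodbye Hadoop"]

# flatMap(lambda line: line.split(" ")): every line becomes several words
words = [word for line in lines for word in line.split(" ")]

# map(lambda word: (word, 1)): pair every word with a count of 1
pairs = [(word, 1) for word in words]

# reduceByKey(lambda a, b: a + b): add up the 1s for each distinct word
counts = {}
for word, n in pairs:
    counts[word] = counts[word] + n if word in counts else n

# Sorted here for readability; Spark's output order follows partition hashing.
for word, n in sorted(counts.items()):
    print((word, n))
```

The counts match the PySpark output; only the order differs, because Spark does not sort by key unless asked to (for example with `sortByKey`).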



Spark (Java)









package com.example.sparkwordcount;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {
    // args[0] = input directory, args[1] = output directory (see the spark-submit command)
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

    // split each document into words
    // (Spark 1.x API: FlatMapFunction.call returns an Iterable; in Spark 2.x it returns an Iterator)
    JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );

    // count the occurrence of each word
    JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }, 1      //number of reducers = 1
    );

    // write the (word, count) pairs to the output directory
    counts.saveAsTextFile(args[1]);
    sc.stop();
  }
}










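Package the project into a jar, then submit it with spark-submit, passing the input directory (args[0]) and the output directory (args[1]):

$ spark-submit --class com.example.sparkwordcount.JavaWordCount target/sparkwordcount-0.0.1-SNAPSHOT.jar hdfs://quickstart.cloudera/user/cloudera/input hdfs://quickstart.cloudera/user/cloudera/javaoutput
$ hadoop fs -cat javaoutput/part-00000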
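The Scala and Java versions save each pair via Tuple2.toString, e.g. `(World,2)`, while PySpark on Python 2 saves `str(tuple)`, e.g. `(u'World', 2)`. To confirm that all three implementations agree, a small checker has to read both formats. The sketch below is a heuristic under that assumption; `parse_line` and `read_counts` are hypothetical helpers, and the sample strings are made-up excerpts, not actual output from this post.

```python
import ast


def parse_line(line):
    """Parse one saved (word, count) line from either output format."""
    line = line.strip()
    try:
        value = ast.literal_eval(line)  # PySpark on Python 2: (u'World', 2)
    except (ValueError, SyntaxError):
        value = None
    if isinstance(value, tuple) and len(value) == 2 and isinstance(value[0], str):
        return value[0], int(value[1])
    # Scala/Java Tuple2.toString: (World,2); split on the last comma
    word, _, count = line[1:-1].rpartition(",")
    return word, int(count)


def read_counts(text):
    """Turn the text of a part-00000 file into a {word: count} dict."""
    return dict(parse_line(line) for line in text.splitlines() if line.strip())


# Hypothetical excerpts in the two formats
scala_text = "(Hello,2)\n(World,2)\n(Bye,1)\n"
python_text = "(u'World', 2)\n(u'Bye', 1)\n(u'Hello', 2)\n"
print(read_counts(scala_text) == read_counts(python_text))
```

To use it, redirect each `cat` command above into a local file (for instance `hadoop fs -cat javaoutput/part-00000 > java.txt`, where the file name is arbitrary) and compare the dicts returned by `read_counts`. Splitting on the last comma keeps words that themselves contain commas intact.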













