diff --git a/commons/Utils.py b/commons/Utils.py
new file mode 100644
index 00000000..2038566d
--- /dev/null
+++ b/commons/Utils.py
@@ -0,0 +1,5 @@
+import re
+
+class Utils():
+
+    COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')
diff --git a/rdd/WordCount.py b/rdd/WordCount.py
index ac6e5e58..c95a04f1 100644
--- a/rdd/WordCount.py
+++ b/rdd/WordCount.py
@@ -2,9 +2,10 @@
 from pyspark import SparkContext
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "word count")
-    lines = sc.textFile("in/word_count.text")
-    words = lines.flatMap(lambda line: line.split(" "))
-    wordCounts = words.countByValue()
-    for word, count in wordCounts.items():
-        print(word, count)
\ No newline at end of file
+    sc = SparkContext("local", "word count")
+    sc.setLogLevel("ERROR")
+    lines = sc.textFile("in/word_count.text")
+    words = lines.flatMap(lambda line: line.split(" "))
+    wordCounts = words.countByValue()
+    for word, count in wordCounts.items():
+        print(word, count)
diff --git a/rdd/airports/AirportsByLatitudeProblem.py b/rdd/airports/AirportsByLatitudeProblem.py
new file mode 100644
index 00000000..306f4a7b
--- /dev/null
+++ b/rdd/airports/AirportsByLatitudeProblem.py
@@ -0,0 +1,17 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+
+    '''
+    Create a Spark program to read the airport data from in/airports.text, find all the airports whose latitude is greater than 40.
+    Then output the airport's name and the airport's latitude to out/airports_by_latitude.text.
+
+    Each row of the input file contains the following columns:
+    Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
+    ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format
+
+    Sample output:
+    "St Anthony", 51.391944
+    "Tofino", 49.082222
+    ...
+    '''
diff --git a/rdd/airports/AirportsByLatitudeProblem.scala b/rdd/airports/AirportsByLatitudeProblem.scala
deleted file mode 100644
index 8af9b1cd..00000000
--- a/rdd/airports/AirportsByLatitudeProblem.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.sparkTutorial.rdd.airports
-
-object AirportsByLatitudeProblem {
-
-  def main(args: Array[String]) {
-
-    /* Create a Spark program to read the airport data from in/airports.text, find all the airports whose latitude are bigger than 40.
-       Then output the airport's name and the airport's latitude to out/airports_by_latitude.text.
-
-       Each row of the input file contains the following columns:
-       Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
-       ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format
-
-       Sample output:
-       "St Anthony", 51.391944
-       "Tofino", 49.082222
-       ...
-     */
-  }
-}
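Note on commons/Utils.py above: the COMMA_DELIMITER regex splits a CSV row on commas only when they fall outside quoted fields, which is why the airport solutions below can index columns directly. A minimal sketch of how it behaves (the sample row here is invented for illustration, not taken from in/airports.text):

    import re

    # Same pattern as commons/Utils.py: a comma is a delimiter only if the text after it
    # contains balanced quotes, i.e. the comma is not inside a quoted field.
    COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')

    line = '7,"Example Intl","Springfield, USA","United States","EXA","KEXA",41.5,-90.1'
    print(COMMA_DELIMITER.split(line)[2])   # '"Springfield, USA"' - the comma inside quotes is not split on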
diff --git a/rdd/airports/AirportsByLatitudeSolution.py b/rdd/airports/AirportsByLatitudeSolution.py
new file mode 100644
index 00000000..2ce476d5
--- /dev/null
+++ b/rdd/airports/AirportsByLatitudeSolution.py
@@ -0,0 +1,17 @@
+from pyspark import SparkContext
+from commons.Utils import Utils
+
+def splitComma(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    return "{}, {}".format(splits[1], splits[6])
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "airports")
+
+    airports = sc.textFile("in/airports.text")
+
+    airportsByLatitude = airports.filter(lambda line: float(Utils.COMMA_DELIMITER.split(line)[6]) > 40)
+
+    airportsNameAndLatitudes = airportsByLatitude.map(splitComma)
+
+    airportsNameAndLatitudes.saveAsTextFile("out/airports_by_latitude.text")
\ No newline at end of file
diff --git a/rdd/airports/AirportsByLatitudeSolution.scala b/rdd/airports/AirportsByLatitudeSolution.scala
deleted file mode 100644
index 13faf3c9..00000000
--- a/rdd/airports/AirportsByLatitudeSolution.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.sparkTutorial.rdd.airports
-
-import com.sparkTutorial.commons.Utils
-import org.apache.spark.{SparkConf, SparkContext}
-
-object AirportsByLatitudeSolution {
-
-  def main(args: Array[String]) {
-
-    val conf = new SparkConf().setAppName("airports").setMaster("local[2]")
-    val sc = new SparkContext(conf)
-
-    val airports = sc.textFile("in/airports.text")
-    val airportsInUSA = airports.filter(line => line.split(Utils.COMMA_DELIMITER)(6).toFloat > 40)
-
-    val airportsNameAndCityNames = airportsInUSA.map(line => {
-      val splits = line.split(Utils.COMMA_DELIMITER)
-      splits(1) + ", " + splits(6)
-    })
-
-    airportsNameAndCityNames.saveAsTextFile("out/airports_by_latitude.text")
-  }
-}
diff --git a/rdd/airports/AirportsInUsaProblem.py b/rdd/airports/AirportsInUsaProblem.py
new file mode 100644
index 00000000..4e76489c
--- /dev/null
+++ b/rdd/airports/AirportsInUsaProblem.py
@@ -0,0 +1,17 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+
+    '''
+    Create a Spark program to read the airport data from in/airports.text, find all the airports which are located in the United States
+    and output the airport's name and the city's name to out/airports_in_usa.text.
+
+    Each row of the input file contains the following columns:
+    Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
+    ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format
+
+    Sample output:
+    "Putnam County Airport", "Greencastle"
+    "Dowagiac Municipal Airport", "Dowagiac"
+    ...
+    '''
diff --git a/rdd/airports/AirportsInUsaProblem.scala b/rdd/airports/AirportsInUsaProblem.scala
deleted file mode 100644
index 89dd6f5f..00000000
--- a/rdd/airports/AirportsInUsaProblem.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.sparkTutorial.rdd.airports
-
-object AirportsInUsaProblem {
-  def main(args: Array[String]) {
-
-    /* Create a Spark program to read the airport data from in/airports.text, find all the airports which are located in United States
-       and output the airport's name and the city's name to out/airports_in_usa.text.
-
-       Each row of the input file contains the following columns:
-       Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
-       ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format
-
-       Sample output:
-       "Putnam County Airport", "Greencastle"
-       "Dowagiac Municipal Airport", "Dowagiac"
-       ...
-     */
-  }
-}
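One caveat on AirportsByLatitudeSolution.py above: it calls float() on column 6 of every row, so a single malformed or empty line would fail the whole job. A hedged, defensive variant of that filter (assuming the same in/airports.text layout) could look like this:

    from commons.Utils import Utils

    def hasHighLatitude(line: str) -> bool:
        # Column 6 is the latitude; guard the conversion so one bad row does not abort the job.
        splits = Utils.COMMA_DELIMITER.split(line)
        try:
            return float(splits[6]) > 40
        except (IndexError, ValueError):
            return False

    # usage: airports.filter(hasHighLatitude)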
diff --git a/rdd/airports/AirportsInUsaSolution.py b/rdd/airports/AirportsInUsaSolution.py
new file mode 100644
index 00000000..96ec733b
--- /dev/null
+++ b/rdd/airports/AirportsInUsaSolution.py
@@ -0,0 +1,15 @@
+from pyspark import SparkContext
+from commons.Utils import Utils
+
+def splitComma(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    return "{}, {}".format(splits[1], splits[2])
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "airports")
+
+    airports = sc.textFile("in/airports.text")
+    airportsInUSA = airports.filter(lambda line: Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")
+
+    airportsNameAndCityNames = airportsInUSA.map(splitComma)
+    airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
diff --git a/rdd/airports/AirportsInUsaSolution.scala b/rdd/airports/AirportsInUsaSolution.scala
deleted file mode 100644
index 04621547..00000000
--- a/rdd/airports/AirportsInUsaSolution.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.sparkTutorial.rdd.airports
-
-import com.sparkTutorial.commons.Utils
-import org.apache.spark.{SparkConf, SparkContext}
-
-object AirportsInUsaSolution {
-
-  def main(args: Array[String]) {
-
-    val conf = new SparkConf().setAppName("airports").setMaster("local[2]")
-    val sc = new SparkContext(conf)
-
-    val airports = sc.textFile("in/airports.text")
-    val airportsInUSA = airports.filter(line => line.split(Utils.COMMA_DELIMITER)(3) == "\"United States\"")
-
-    val airportsNameAndCityNames = airportsInUSA.map(line => {
-      val splits = line.split(Utils.COMMA_DELIMITER)
-      splits(1) + ", " + splits(2)
-    })
-    airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
-  }
-}
diff --git a/rdd/collect/CollectExample.py b/rdd/collect/CollectExample.py
new file mode 100644
index 00000000..fe01119d
--- /dev/null
+++ b/rdd/collect/CollectExample.py
@@ -0,0 +1,10 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "collect")
+    sc.setLogLevel("ERROR")
+    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
+    wordRdd = sc.parallelize(inputWords)
+    words = wordRdd.collect()
+    for word in words:
+        print(word)
\ No newline at end of file
diff --git a/rdd/collect/CollectExample.scala b/rdd/collect/CollectExample.scala
deleted file mode 100644
index 1a450778..00000000
--- a/rdd/collect/CollectExample.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.sparkTutorial.rdd.collect
-
-import org.apache.log4j.Level
-import org.apache.log4j.Logger
-import org.apache.spark.{SparkConf, SparkContext}
-
-object CollectExample {
-  def main(args: Array[String]) {
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val conf = new SparkConf().setAppName("collect").setMaster("local[*]")
-    val sc = new SparkContext(conf)
-
-    val inputWords = List("spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop")
-    val wordRdd = sc.parallelize(inputWords)
-
-    val words = wordRdd.collect()
-
-    for (word <- words) println(word)
-  }
-}
diff --git a/rdd/count/CountExample.py b/rdd/count/CountExample.py
new file mode 100644
index 00000000..93bbf45e
--- /dev/null
+++ b/rdd/count/CountExample.py
@@ -0,0 +1,12 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "count")
+    sc.setLogLevel("ERROR")
+    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
+    wordRdd = sc.parallelize(inputWords)
+    print("Count: {}".format(wordRdd.count()))
+    wordCountByValue = wordRdd.countByValue()
+    print("CountByValue: ")
+    for word, count in wordCountByValue.items():
+        print("{} : {}".format(word, count))
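count() and countByValue() in CountExample.py above are actions that pull their result back to the driver, so countByValue() only suits RDDs with a modest number of distinct values. A sketch of the same counting kept distributed with reduceByKey (reusing wordRdd from the example above), which scales to large key sets:

    wordPairs = wordRdd.map(lambda word: (word, 1))
    wordCounts = wordPairs.reduceByKey(lambda x, y: x + y)   # counting stays distributed
    for word, count in wordCounts.collect():
        print("{} : {}".format(word, count))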
diff --git a/rdd/count/CountExample.scala b/rdd/count/CountExample.scala
deleted file mode 100644
index 1993183b..00000000
--- a/rdd/count/CountExample.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.sparkTutorial.rdd.count
-
-import org.apache.log4j.Level
-import org.apache.log4j.Logger
-import org.apache.spark.{SparkConf, SparkContext}
-
-object CountExample {
-
-  def main(args: Array[String]) {
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val conf = new SparkConf().setAppName("count").setMaster("local[*]")
-    val sc = new SparkContext(conf)
-
-    val inputWords = List("spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop")
-    val wordRdd = sc.parallelize(inputWords)
-    println("Count: " + wordRdd.count())
-
-    val wordCountByValue = wordRdd.countByValue()
-    println("CountByValue:")
-
-    for ((word, count) <- wordCountByValue) println(word + " : " + count)
-  }
-}
diff --git a/rdd/nasaApacheWebLogs/SameHostsProblem.py b/rdd/nasaApacheWebLogs/SameHostsProblem.py
new file mode 100644
index 00000000..cf728235
--- /dev/null
+++ b/rdd/nasaApacheWebLogs/SameHostsProblem.py
@@ -0,0 +1,20 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+
+    '''
+    "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's Apache servers for July 1st, 1995.
+    "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995.
+    Create a Spark program to generate a new RDD which contains the hosts which were accessed on BOTH days.
+    Save the resulting RDD to "out/nasa_logs_same_hosts.csv" file.
+
+    Example output:
+    vagrant.vf.mmc.com
+    www-a1.proxy.aol.com
+    .....
+
+    Keep in mind that the original log files contain the following header line:
+    host logname time method url response bytes
+
+    Make sure the header lines are removed in the resulting RDD.
+    '''
diff --git a/rdd/nasaApacheWebLogs/SameHostsProblem.scala b/rdd/nasaApacheWebLogs/SameHostsProblem.scala
deleted file mode 100644
index d55a8b82..00000000
--- a/rdd/nasaApacheWebLogs/SameHostsProblem.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.sparkTutorial.rdd.nasaApacheWebLogs
-
-object SameHostsProblem {
-
-  def main(args: Array[String]) {
-
-    /* "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's apache server for July 1st, 1995.
-       "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995
-       Create a Spark program to generate a new RDD which contains the hosts which are accessed on BOTH days.
-       Save the resulting RDD to "out/nasa_logs_same_hosts.csv" file.
-
-       Example output:
-       vagrant.vf.mmc.com
-       www-a1.proxy.aol.com
-       .....
-
-       Keep in mind, that the original log files contains the following header lines.
-       host logname time method url response bytes
-
-       Make sure the head lines are removed in the resulting RDD.
-     */
-  }
-}
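For the same-hosts exercise, note that intersection() (used in the solution below) both deduplicates and triggers a shuffle, and that the literal header value "host" appears in both files, so it survives the intersection and must be filtered out afterwards. An equally workable variant, sketched here with the julyFirstLogs/augustFirstLogs RDDs defined in that solution, drops the header before intersecting:

    julyFirstHosts = julyFirstLogs.map(lambda line: line.split("\t")[0]).filter(lambda host: host != "host")
    augustFirstHosts = augustFirstLogs.map(lambda line: line.split("\t")[0]).filter(lambda host: host != "host")
    sameHosts = julyFirstHosts.intersection(augustFirstHosts)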
diff --git a/rdd/nasaApacheWebLogs/SameHostsSolution.py b/rdd/nasaApacheWebLogs/SameHostsSolution.py
new file mode 100644
index 00000000..7081dce9
--- /dev/null
+++ b/rdd/nasaApacheWebLogs/SameHostsSolution.py
@@ -0,0 +1,15 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "sameHosts")
+
+    julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
+    augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")
+
+    julyFirstHosts = julyFirstLogs.map(lambda line: line.split("\t")[0])
+    augustFirstHosts = augustFirstLogs.map(lambda line: line.split("\t")[0])
+
+    intersection = julyFirstHosts.intersection(augustFirstHosts)
+
+    cleanedHostIntersection = intersection.filter(lambda host: host != "host")
+    cleanedHostIntersection.saveAsTextFile("out/nasa_logs_same_hosts.csv")
diff --git a/rdd/nasaApacheWebLogs/SameHostsSolution.scala b/rdd/nasaApacheWebLogs/SameHostsSolution.scala
deleted file mode 100644
index 168f9454..00000000
--- a/rdd/nasaApacheWebLogs/SameHostsSolution.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.sparkTutorial.rdd.nasaApacheWebLogs
-
-import org.apache.spark.{SparkConf, SparkContext}
-
-object SameHostsSolution {
-
-  def main(args: Array[String]) {
-
-    val conf = new SparkConf().setAppName("sameHosts").setMaster("local[1]")
-    val sc = new SparkContext(conf)
-
-    val julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
-    val augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")
-
-    val julyFirstHosts = julyFirstLogs.map(line => line.split("\t")(0))
-    val augustFirstHosts = augustFirstLogs.map(line => line.split("\t")(0))
-
-    val intersection = julyFirstHosts.intersection(augustFirstHosts)
-
-    val cleanedHostIntersection = intersection.filter(host => host != "host")
-    cleanedHostIntersection.saveAsTextFile("out/nasa_logs_same_hosts.csv")
-  }
-}
diff --git a/rdd/nasaApacheWebLogs/UnionLogProblem.py b/rdd/nasaApacheWebLogs/UnionLogProblem.py
new file mode 100644
index 00000000..1d7a783b
--- /dev/null
+++ b/rdd/nasaApacheWebLogs/UnionLogProblem.py
@@ -0,0 +1,15 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+
+    '''
+    "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's Apache servers for July 1st, 1995.
+    "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995.
+    Create a Spark program to generate a new RDD which contains the log lines from both July 1st and August 1st,
+    take a 0.1 sample of those log lines and save it to "out/sample_nasa_logs.tsv" file.
+
+    Keep in mind that the original log files contain the following header line:
+    host logname time method url response bytes
+
+    Make sure the header lines are removed in the resulting RDD.
+    '''
diff --git a/rdd/nasaApacheWebLogs/UnionLogProblem.scala b/rdd/nasaApacheWebLogs/UnionLogProblem.scala
deleted file mode 100644
index d56dc986..00000000
--- a/rdd/nasaApacheWebLogs/UnionLogProblem.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.sparkTutorial.rdd.nasaApacheWebLogs
-
-object UnionLogProblem {
-
-  def main(args: Array[String]) {
-
-    /* "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's apache server for July 1st, 1995.
-       "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995
-       Create a Spark program to generate a new RDD which contains the log lines from both July 1st and August 1st,
-       take a 0.1 sample of those log lines and save it to "out/sample_nasa_logs.tsv" file.
-
-       Keep in mind, that the original log files contains the following header lines.
-       host logname time method url response bytes
-
-       Make sure the head lines are removed in the resulting RDD.
-     */
-  }
-}
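On the union exercise: sample(fraction=0.1) treats the fraction as a per-line probability rather than an exact 10% cut, and the solution below samples with replacement, so a line can appear more than once. If an approximate 10% subset of distinct lines is preferred, a hedged alternative reusing cleanLogLines from that solution would be:

    sample = cleanLogLines.sample(withReplacement=False, fraction=0.1, seed=42)
    sample.saveAsTextFile("out/sample_nasa_logs.csv")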
diff --git a/rdd/nasaApacheWebLogs/UnionLogSolutions.py b/rdd/nasaApacheWebLogs/UnionLogSolutions.py
new file mode 100644
index 00000000..c69c0332
--- /dev/null
+++ b/rdd/nasaApacheWebLogs/UnionLogSolutions.py
@@ -0,0 +1,17 @@
+from pyspark import SparkContext
+
+def isNotHeader(line: str):
+    return not (line.startswith("host") and "bytes" in line)
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "unionLogs")
+
+    julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
+    augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")
+
+    aggregatedLogLines = julyFirstLogs.union(augustFirstLogs)
+
+    cleanLogLines = aggregatedLogLines.filter(isNotHeader)
+    sample = cleanLogLines.sample(withReplacement = True, fraction = 0.1)
+
+    sample.saveAsTextFile("out/sample_nasa_logs.csv")
\ No newline at end of file
diff --git a/rdd/nasaApacheWebLogs/UnionLogsSolution.scala b/rdd/nasaApacheWebLogs/UnionLogsSolution.scala
deleted file mode 100644
index f6db5473..00000000
--- a/rdd/nasaApacheWebLogs/UnionLogsSolution.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.sparkTutorial.rdd.nasaApacheWebLogs
-
-import org.apache.spark.{SparkConf, SparkContext}
-
-object UnionLogsSolution {
-
-  def main(args: Array[String]) {
-
-    val conf = new SparkConf().setAppName("unionLogs").setMaster("local[*]")
-
-    val sc = new SparkContext(conf)
-
-    val julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
-    val augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")
-
-    val aggregatedLogLines = julyFirstLogs.union(augustFirstLogs)
-
-    val cleanLogLines = aggregatedLogLines.filter(line => isNotHeader(line))
-
-    val sample = cleanLogLines.sample(withReplacement = true, fraction = 0.1)
-
-    sample.saveAsTextFile("out/sample_nasa_logs.csv")
-  }
-
-  def isNotHeader(line: String): Boolean = !(line.startsWith("host") && line.contains("bytes"))
-}
diff --git a/rdd/persist/PersistExample.py b/rdd/persist/PersistExample.py
new file mode 100644
index 00000000..947a6e14
--- /dev/null
+++ b/rdd/persist/PersistExample.py
@@ -0,0 +1,9 @@
+from pyspark import SparkContext, StorageLevel
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "persist")
+    inputIntegers = [1, 2, 3, 4, 5]
+    integerRdd = sc.parallelize(inputIntegers)
+    integerRdd.persist(StorageLevel.MEMORY_ONLY)
+    integerRdd.reduce(lambda x, y: x * y)
+    integerRdd.count()
diff --git a/rdd/persist/PersistExample.scala b/rdd/persist/PersistExample.scala
deleted file mode 100644
index 48ff5821..00000000
--- a/rdd/persist/PersistExample.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.sparkTutorial.rdd.persist
-
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.storage.StorageLevel
-
-object PersistExample {
-
-  def main(args: Array[String]) {
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val conf = new SparkConf().setAppName("reduce").setMaster("local[*]")
-    val sc = new SparkContext(conf)
-
-    val inputIntegers = List(1, 2, 3, 4, 5)
-    val integerRdd = sc.parallelize(inputIntegers)
-
-    integerRdd.persist(StorageLevel.MEMORY_ONLY)
-
-    integerRdd.reduce((x, y) => x * y)
-    integerRdd.count()
-  }
-}
diff --git a/rdd/reduce/ReduceExample.py b/rdd/reduce/ReduceExample.py
new file mode 100644
index 00000000..7eb8392a
--- /dev/null
+++ b/rdd/reduce/ReduceExample.py
@@ -0,0 +1,9 @@
+from pyspark import SparkContext
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "reduce")
+    sc.setLogLevel("ERROR")
+    inputIntegers = [1, 2, 3, 4, 5]
+    integerRdd = sc.parallelize(inputIntegers)
+    product = integerRdd.reduce(lambda x, y: x * y)
+    print("product is :{}".format(product))
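A note on PersistExample.py above: persist() is lazy, so nothing is cached until the first action (the reduce) runs, and only the second action (count) reads from the cache. A short sketch of the equivalent using cache(), which PySpark documents as MEMORY_ONLY, plus explicit cleanup:

    integerRdd = sc.parallelize([1, 2, 3, 4, 5]).cache()   # cache() == persist(StorageLevel.MEMORY_ONLY) in PySpark
    print(integerRdd.reduce(lambda x, y: x * y))           # first action computes and populates the cache
    print(integerRdd.count())                              # second action reuses the cached partitions
    integerRdd.unpersist()                                 # release the storage when finished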
diff --git a/rdd/reduce/ReduceExample.scala b/rdd/reduce/ReduceExample.scala
deleted file mode 100644
index 53b863b9..00000000
--- a/rdd/reduce/ReduceExample.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.sparkTutorial.rdd.reduce
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{SparkConf, SparkContext}
-
-object ReduceExample {
-
-  def main(args: Array[String]) {
-    Logger.getLogger("org").setLevel(Level.OFF)
-    val conf = new SparkConf().setAppName("reduce").setMaster("local[*]")
-    val sc = new SparkContext(conf)
-
-    val inputIntegers = List(1, 2, 3, 4, 5)
-    val integerRdd = sc.parallelize(inputIntegers)
-
-    val product = integerRdd.reduce((x, y) => x * y)
-    println("product is :" + product)
-  }
-}
diff --git a/rdd/sumOfNumbers/SumOfNumbersProblem.py b/rdd/sumOfNumbers/SumOfNumbersProblem.py
index 327b903e..72671962 100644
--- a/rdd/sumOfNumbers/SumOfNumbersProblem.py
+++ b/rdd/sumOfNumbers/SumOfNumbersProblem.py
@@ -1,11 +1,10 @@
-
 import sys
 from pyspark import SparkContext
 
 if __name__ == "__main__":
 
-    '''
+    '''
     Create a Spark program to read the first 100 prime numbers from in/prime_nums.text,
     print the sum of those numbers to console.
     Each row of the input file contains 10 prime numbers separated by spaces.
-    '''
\ No newline at end of file
+    '''
diff --git a/rdd/sumOfNumbers/SumOfNumbersSolution.py b/rdd/sumOfNumbers/SumOfNumbersSolution.py
index 18bbb9e8..b0315c51 100644
--- a/rdd/sumOfNumbers/SumOfNumbersSolution.py
+++ b/rdd/sumOfNumbers/SumOfNumbersSolution.py
@@ -3,9 +3,10 @@
 
 if __name__ == "__main__":
     sc = SparkContext("local", "primeNumbers")
+    sc.setLogLevel("ERROR")
     lines = sc.textFile("in/prime_nums.text")
     numbers = lines.flatMap(lambda line: line.split("\t"))
     validNumbers = numbers.filter(lambda number: number)
     intNumbers = validNumbers.map(lambda number: int(number))
     print("Sum is: ")
-    print(intNumbers.reduce(lambda x, y: x + y))
\ No newline at end of file
+    print(intNumbers.reduce(lambda x, y: x + y))
diff --git a/rdd/take/TakeExample.py b/rdd/take/TakeExample.py
index 6a91e063..fc73a781 100644
--- a/rdd/take/TakeExample.py
+++ b/rdd/take/TakeExample.py
@@ -2,9 +2,10 @@
 from pyspark import SparkContext
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "take")
-    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
-    wordRdd = sc.parallelize(inputWords)
-    words = wordRdd.take(3)
-    for word in words:
-        print(word)
\ No newline at end of file
+    sc = SparkContext("local", "take")
+    sc.setLogLevel("ERROR")
+    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
+    wordRdd = sc.parallelize(inputWords)
+    words = wordRdd.take(3)
+    for word in words:
+        print(word)
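Finally, on TakeExample.py above: take(3) ships only the first three elements to the driver and scans as few partitions as it can, unlike collect(), which materialises the whole RDD. Related actions, sketched here with the same wordRdd, in case an ordered or random subset is wanted instead:

    print(wordRdd.takeOrdered(3))                                     # three smallest, in natural (alphabetical) order
    print(wordRdd.top(3))                                             # three largest
    print(wordRdd.takeSample(withReplacement=False, num=3, seed=1))   # exactly three, chosen at random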