diff --git a/advanced/accumulator/StackOverFlowSurvey.py b/advanced/accumulator/StackOverFlowSurvey.py
new file mode 100644
index 00000000..be9b0b3a
--- /dev/null
+++ b/advanced/accumulator/StackOverFlowSurvey.py
@@ -0,0 +1,26 @@
+from pyspark import SparkContext
+from commons.Utils import Utils
+
+def filterResponseFromCanada(response, total, missingSalaryMidPoint):
+    splits = Utils.COMMA_DELIMITER.split(response)
+    total.add(1)
+    if not splits[14]:  # column 14 holds the salary midpoint; empty means missing
+        missingSalaryMidPoint.add(1)
+    return splits[2] == "Canada"
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "StackOverFlowSurvey")
+    sc.setLogLevel("ERROR")
+
+    # write-only counters: updated on the executors, read back in the driver
+    total = sc.accumulator(0)
+    missingSalaryMidPoint = sc.accumulator(0)
+
+    responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
+
+    responseFromCanada = responseRDD.filter(lambda response: \
+        filterResponseFromCanada(response, total, missingSalaryMidPoint))
+
+    print("Count of responses from Canada: {}".format(responseFromCanada.count()))
+    print("Total count of responses: {}".format(total.value))
+    print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))
diff --git a/advanced/accumulator/StackOverFlowSurvey.scala b/advanced/accumulator/StackOverFlowSurvey.scala
deleted file mode 100644
index afd71f39..00000000
--- a/advanced/accumulator/StackOverFlowSurvey.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.sparkTutorial.advanced.accumulator
-
-import com.sparkTutorial.commons.Utils
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{SparkConf, SparkContext}
-
-object StackOverFlowSurvey {
-
-  def main(args: Array[String]) {
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val conf = new SparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]")
-    val sparkContext = new SparkContext(conf)
-
-    val total = sparkContext.longAccumulator
-    val missingSalaryMidPoint = sparkContext.longAccumulator
-
-    val responseRDD = sparkContext.textFile("in/2016-stack-overflow-survey-responses.csv")
-
-    val responseFromCanada = responseRDD.filter(response => {
-      val splits = response.split(Utils.COMMA_DELIMITER, -1)
-      total.add(1)
-
-      if (splits(14).isEmpty) {
-        missingSalaryMidPoint.add(1)
-      }
-
-      splits(2) == "Canada"
-    })
-
-    println("Count of responses from Canada: " + responseFromCanada.count())
-    println("Total count of responses: " + total.value)
-    println("Count of responses missing salary middle point: " + missingSalaryMidPoint.value)
-  }
-}
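Aside, not part of the change: accumulator updates made inside a transformation only happen once an action forces it to run, which is why both ports above call count() before reading .value. A minimal pyspark sketch of that ordering (hypothetical demo code, local mode assumed):

from pyspark import SparkContext

def countAndKeepEven(x, seen):
    seen.add(1)          # side effect: tally every record the filter inspects
    return x % 2 == 0

sc = SparkContext("local", "AccumulatorOrdering")
seen = sc.accumulator(0)
evens = sc.parallelize(range(10)).filter(lambda x: countAndKeepEven(x, seen))
print(seen.value)     # 0  -- filter() is lazy; nothing has executed yet
print(evens.count())  # 5  -- the action runs the filter across all partitions
print(seen.value)     # 10 -- every record has now been tallied

Note that Spark only guarantees exactly-once accumulator updates inside actions; updates made in transformations, as in the files above, can be re-applied if a task is retried.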
from Canada: {}".format(responseFromCanada.count())) + print("Number of bytes processed: {}".format(processedBytes.value)) + print("Total count of responses: {}".format(total.value)) + print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value)) diff --git a/advanced/accumulator/StackOverFlowSurveyFollowUp.scala b/advanced/accumulator/StackOverFlowSurveyFollowUp.scala deleted file mode 100644 index a4698393..00000000 --- a/advanced/accumulator/StackOverFlowSurveyFollowUp.scala +++ /dev/null @@ -1,39 +0,0 @@ -package com.sparkTutorial.advanced.accumulator - -import com.sparkTutorial.commons.Utils -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -object StackOverFlowSurveyFollowUp { - - def main(args: Array[String]) { - - Logger.getLogger("org").setLevel(Level.ERROR) - val conf = new SparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]") - val sparkContext = new SparkContext(conf) - - val total = sparkContext.longAccumulator - val missingSalaryMidPoint = sparkContext.longAccumulator - val processedBytes = sparkContext.longAccumulator - - val responseRDD = sparkContext.textFile("in/2016-stack-overflow-survey-responses.csv") - - val responseFromCanada = responseRDD.filter(response => { - - processedBytes.add(response.getBytes().length) - val splits = response.split(Utils.COMMA_DELIMITER, -1) - total.add(1) - - if (splits(14).isEmpty) { - missingSalaryMidPoint.add(1) - } - splits(2) == "Canada" - - }) - - println("Count of responses from Canada: " + responseFromCanada.count()) - println("Number of bytes processed: " + processedBytes.value) - println("Total count of responses: " + total.value) - println("Count of responses missing salary middle point: " + missingSalaryMidPoint.value) - } -} diff --git a/advanced/broadcast/UkMakerSpaces.py b/advanced/broadcast/UkMakerSpaces.py new file mode 100644 index 00000000..7cc172d0 --- /dev/null +++ b/advanced/broadcast/UkMakerSpaces.py @@ -0,0 +1,29 @@ +from pyspark import SparkContext +from commons.Utils import Utils + +def getPostPrefix(line: str): + splits = Utils.COMMA_DELIMITER.split(line) + postcode = splits[4] + return None if not postcode else postcode.split(" ")[0] + +def loadPostCodeMap(): + lines = open("in/uk-postcode.csv", "r").read().split("\n") + splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""] + return {splits[0]: splits[7] for splits in splitsForLines} + +if __name__ == "__main__": + sc = SparkContext("local", "UkMakerSpaces") + sc.setLogLevel("ERROR") + + postCodeMap = sc.broadcast(loadPostCodeMap()) + + makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv") + + regions = makerSpaceRdd \ + .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \ + .filter(lambda line: getPostPrefix(line) is not None) \ + .map(lambda line: postCodeMap.value[getPostPrefix(line)] \ + if getPostPrefix(line) in postCodeMap.value else "Unknow") + + for region, count in regions.countByValue().items(): + print("{} : {}".format(region, count)) diff --git a/advanced/broadcast/UkMakerSpaces.scala b/advanced/broadcast/UkMakerSpaces.scala deleted file mode 100644 index 7ef0ff0a..00000000 --- a/advanced/broadcast/UkMakerSpaces.scala +++ /dev/null @@ -1,40 +0,0 @@ -package com.sparkTutorial.advanced.broadcast - -import com.sparkTutorial.commons.Utils -import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkConf, SparkContext} - -import scala.io.Source - -object UkMakerSpaces { - - def 
diff --git a/advanced/broadcast/UkMakerSpaces.scala b/advanced/broadcast/UkMakerSpaces.scala
deleted file mode 100644
index 7ef0ff0a..00000000
--- a/advanced/broadcast/UkMakerSpaces.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.sparkTutorial.advanced.broadcast
-
-import com.sparkTutorial.commons.Utils
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.io.Source
-
-object UkMakerSpaces {
-
-  def main(args: Array[String]) {
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val conf = new SparkConf().setAppName("UkMakerSpaces").setMaster("local[1]")
-    val sparkContext = new SparkContext(conf)
-
-    val postCodeMap = sparkContext.broadcast(loadPostCodeMap())
-
-    val makerSpaceRdd = sparkContext.textFile("in/uk-makerspaces-identifiable-data.csv")
-
-    val regions = makerSpaceRdd
-      .filter(line => line.split(Utils.COMMA_DELIMITER, -1)(0) != "Timestamp")
-      .filter(line => getPostPrefix(line).isDefined)
-      .map(line => postCodeMap.value.getOrElse(getPostPrefix(line).get, "Unknown"))
-
-    for ((region, count) <- regions.countByValue()) println(region + " : " + count)
-  }
-
-  def getPostPrefix(line: String): Option[String] = {
-    val splits = line.split(Utils.COMMA_DELIMITER, -1)
-    val postcode = splits(4)
-    if (postcode.isEmpty) None else Some(postcode.split(" ")(0))
-  }
-
-  def loadPostCodeMap(): Map[String, String] = {
-    Source.fromFile("in/uk-postcode.csv").getLines.map(line => {
-      val splits = line.split(Utils.COMMA_DELIMITER, -1)
-      splits(0) -> splits(7)
-    }).toMap
-  }
-}
diff --git a/advanced/broadcast/UkMakerSpacesWithoutBroadcast.py b/advanced/broadcast/UkMakerSpacesWithoutBroadcast.py
new file mode 100644
index 00000000..4854f417
--- /dev/null
+++ b/advanced/broadcast/UkMakerSpacesWithoutBroadcast.py
@@ -0,0 +1,31 @@
+import re
+
+from pyspark import SparkContext
+from commons.Utils import Utils
+
+def getPostPrefixes(line: str):
+    postcode = Utils.COMMA_DELIMITER.split(line)[4]
+    # str.replace treats "\s+" literally; use re.sub to strip all whitespace
+    cleanedPostCode = re.sub(r"\s+", "", postcode)
+    return [cleanedPostCode[0:i] for i in range(1, len(cleanedPostCode) + 1)]
+
+def loadPostCodeMap():
+    lines = open("in/uk-postcode.csv", "r").read().split("\n")
+    splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
+    return {splits[0]: splits[7] for splits in splitsForLines}
+
+if __name__ == "__main__":
+    sc = SparkContext("local", "UkMakerSpaces")
+    sc.setLogLevel("ERROR")
+    postCodeMap = loadPostCodeMap()
+    makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv")
+
+    # postCodeMap rides along in every task's serialized closure here,
+    # which is exactly the per-task cost a broadcast variable avoids
+    regions = makerSpaceRdd \
+        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \
+        .map(lambda line: next((postCodeMap[prefix] for prefix in getPostPrefixes(line) \
+            if prefix in postCodeMap), "Unknown"))
+
+    for region, count in regions.countByValue().items():
+        print("{} : {}".format(region, count))
diff --git a/advanced/broadcast/UkMakerSpacesWithoutBroadcast.scala b/advanced/broadcast/UkMakerSpacesWithoutBroadcast.scala
deleted file mode 100644
index 8fbee265..00000000
--- a/advanced/broadcast/UkMakerSpacesWithoutBroadcast.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.sparkTutorial.advanced.broadcast
-
-import com.sparkTutorial.commons.Utils
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.io.Source
-
-object UkMakerSpacesWithoutBroadcast {
-
-  def main(args: Array[String]) {
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val conf = new SparkConf().setAppName("UkMakerSpaces").setMaster("local[1]")
-    val sparkContext = new SparkContext(conf)
-    val postCodeMap = loadPostCodeMap()
-    val makerSpaceRdd = sparkContext.textFile("in/uk-makerspaces-identifiable-data.csv")
-
-    val regions = makerSpaceRdd
-      .filter(line => line.split(Utils.COMMA_DELIMITER, -1)(0) != "Timestamp")
-      .map(line => {
-        getPostPrefixes(line).filter(prefix => postCodeMap.contains(prefix))
-          .map(prefix => postCodeMap(prefix))
-          .headOption.getOrElse("Unknow")
-      })
-    for ((region, count) <- regions.countByValue()) println(region + " : " + count)
-  }
-
-  def getPostPrefixes(line: String): List[String] = {
-    val postcode = line.split(Utils.COMMA_DELIMITER, -1)(4)
-    val cleanedPostCode = postcode.replaceAll("\\s+", "")
-    (1 to cleanedPostCode.length).map(i => cleanedPostCode.substring(0, i)).toList
-  }
-
-  def loadPostCodeMap(): Map[String, String] = {
-    Source.fromFile("in/uk-postcode.csv").getLines.map(line => {
-      val splits = line.split(Utils.COMMA_DELIMITER, -1)
-      splits(0) -> splits(7)
-    }).toMap
-  }
-}
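Aside, not part of the change: none of the hunks above adds commons/Utils.py, yet every new Python file imports it and calls Utils.COMMA_DELIMITER.split(...), so COMMA_DELIMITER must be a pre-compiled regex (the Scala side instead passes Utils.COMMA_DELIMITER as a pattern string to String.split). A minimal sketch of what that module presumably contains; the regex mirrors the usual split-on-commas-outside-quotes idiom and is an assumption, not something this diff confirms:

import re

class Utils():
    # split on commas only when they fall outside double quotes, so quoted
    # CSV fields such as "Toronto, Ontario" are not broken apart (assumed)
    COMMA_DELIMITER = re.compile(r',(?=(?:[^"]*"[^"]*")*[^"]*$)')

With that in place, Utils.COMMA_DELIMITER.split(line) behaves like the line.split(Utils.COMMA_DELIMITER, -1) calls in the deleted Scala files, including keeping trailing empty fields.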