diff --git a/commons/Utils.py b/commons/Utils.py
index 2038566d..a9405bca 100644
--- a/commons/Utils.py
+++ b/commons/Utils.py
@@ -1,5 +1,5 @@
 import re
 
 class Utils():
-
-    COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')
+
+    COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')
diff --git a/commons/Utils.scala b/commons/Utils.scala
deleted file mode 100644
index f9b8c8d3..00000000
--- a/commons/Utils.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-package com.sparkTutorial.commons
-
-object Utils {
-  // a regular expression which matches commas but not commas within double quotations
-  val COMMA_DELIMITER = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)"
-}
diff --git a/sparkSql/HousePriceProblem.py b/sparkSql/HousePriceProblem.py
new file mode 100644
index 00000000..a5f88bfa
--- /dev/null
+++ b/sparkSql/HousePriceProblem.py
@@ -0,0 +1,38 @@
+if __name__ == "__main__":
+
+    '''
+    Create a Spark program to read the house data from in/RealEstate.csv,
+    group by location, aggregate the average price per SQ Ft and sort by average price per SQ Ft.
+
+    The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and
+    around it.
+
+    The dataset contains the following fields:
+    1. MLS: Multiple listing service number for the house (unique ID).
+    2. Location: city/town where the house is located. Most locations are in San Luis Obispo county and
+    northern Santa Barbara county (Santa Maria-Orcutt, Lompoc, Guadelupe, Los Alamos), but there are
+    some out of area locations as well.
+    3. Price: the most recent listing price of the house (in dollars).
+    4. Bedrooms: number of bedrooms.
+    5. Bathrooms: number of bathrooms.
+    6. Size: size of the house in square feet.
+    7. Price/SQ.ft: price of the house per square foot.
+    8. Status: type of sale. Three types are represented in the dataset: Short Sale, Foreclosure and Regular.
+
+    Each field is comma separated.
+
+    Sample output:
+
+    +----------------+-----------------+
+    |        Location| avg(Price SQ Ft)|
+    +----------------+-----------------+
+    |          Oceano|             95.0|
+    |         Bradley|            206.0|
+    | San Luis Obispo|            359.0|
+    |      Santa Ynez|            491.4|
+    |         Cayucos|            887.0|
+    |................|.................|
+    |................|.................|
+    |................|.................|
+    '''
+
diff --git a/sparkSql/HousePriceProblem.scala b/sparkSql/HousePriceProblem.scala
deleted file mode 100644
index 8a868018..00000000
--- a/sparkSql/HousePriceProblem.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.sparkTutorial.sparkSql
-
-
-object HousePriceProblem {
-
-  /* Create a Spark program to read the house data from in/RealEstate.csv,
-     group by location, aggregate the average price per SQ Ft and sort by average price per SQ Ft.
-
-     The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and
-     around it.
-
-     The dataset contains the following fields:
-     1. MLS: Multiple listing service number for the house (unique ID).
-     2. Location: city/town where the house is located. Most locations are in San Luis Obispo county and
-     northern Santa Barbara county (Santa Maria-Orcutt, Lompoc, Guadelupe, Los Alamos), but there
-     some out of area locations as well.
-     3. Price: the most recent listing price of the house (in dollars).
-     4. Bedrooms: number of bedrooms.
-     5. Bathrooms: number of bathrooms.
-     6. Size: size of the house in square feet.
-     7. Price/SQ.ft: price of the house per square foot.
-     8. Status: type of sale. Thee types are represented in the dataset: Short Sale, Foreclosure and Regular.
-
-     Each field is comma separated.
-
-     Sample output:
-
-     +----------------+-----------------+
-     |        Location| avg(Price SQ Ft)|
-     +----------------+-----------------+
-     |          Oceano|             95.0|
-     |         Bradley|            206.0|
-     | San Luis Obispo|            359.0|
-     |      Santa Ynez|            491.4|
-     |         Cayucos|            887.0|
-     |................|.................|
-     |................|.................|
-     |................|.................|
-  */
-}
diff --git a/sparkSql/HousePriceSolution.py b/sparkSql/HousePriceSolution.py
new file mode 100644
index 00000000..d6982f4c
--- /dev/null
+++ b/sparkSql/HousePriceSolution.py
@@ -0,0 +1,17 @@
+from pyspark.sql import SparkSession
+
+PRICE_SQ_FT = "Price SQ Ft"
+
+if __name__ == "__main__":
+
+    session = SparkSession.builder.appName("HousePriceSolution").master("local").getOrCreate()
+    session.sparkContext.setLogLevel("ERROR")
+    realEstate = session.read \
+        .option("header", "true") \
+        .option("inferSchema", value=True) \
+        .csv("in/RealEstate.csv")
+
+    realEstate.groupBy("Location") \
+        .avg(PRICE_SQ_FT) \
+        .orderBy("avg(Price SQ Ft)") \
+        .show()
diff --git a/sparkSql/HousePriceSolution.scala b/sparkSql/HousePriceSolution.scala
deleted file mode 100644
index 53a4f9a8..00000000
--- a/sparkSql/HousePriceSolution.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.sparkTutorial.sparkSql
-
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.SparkSession
-
-object HousePriceSolution {
-
-  val PRICE_SQ_FT = "Price SQ Ft"
-
-  def main(args: Array[String]) {
-
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val session = SparkSession.builder().appName("HousePriceSolution").master("local[1]").getOrCreate()
-
-    val realEstate = session.read
-      .option("header", "true")
-      .option("inferSchema", value = true)
-      .csv("in/RealEstate.csv")
-
-    realEstate.groupBy("Location")
-      .avg(PRICE_SQ_FT)
-      .orderBy("avg(Price SQ Ft)")
-      .show()
-  }
-}
diff --git a/sparkSql/RddDataframeConversion.py b/sparkSql/RddDataframeConversion.py
new file mode 100644
index 00000000..76359eff
--- /dev/null
+++ b/sparkSql/RddDataframeConversion.py
@@ -0,0 +1,39 @@
+from pyspark.sql import SparkSession
+from commons.Utils import Utils
+
+def getColNames(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    return [splits[2], splits[6], splits[9], splits[14]]
+
+def mapResponseRdd(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    double1 = None if not splits[6] else float(splits[6])
+    double2 = None if not splits[14] else float(splits[14])
+    return splits[2], double1, splits[9], double2
+
+if __name__ == "__main__":
+
+    session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
+    sc = session.sparkContext
+    sc.setLogLevel("ERROR")
+
+    lines = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
+
+    colNames = lines \
+        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] == "country") \
+        .map(getColNames)
+
+    responseRDD = lines \
+        .filter(lambda line: not Utils.COMMA_DELIMITER.split(line)[2] == "country") \
+        .map(mapResponseRdd)
+
+    responseDataFrame = responseRDD.toDF(colNames.collect()[0])
+
+    print("=== Print out schema ===")
+    responseDataFrame.printSchema()
+
+    print("=== Print 20 records of responses table ===")
+    responseDataFrame.show(20)
+
+    for response in responseDataFrame.rdd.collect():
+        print(response)
diff --git a/sparkSql/RddDatasetConversion.scala b/sparkSql/RddDatasetConversion.scala
deleted file mode 100644
index 6366c4dc..00000000
--- a/sparkSql/RddDatasetConversion.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package com.sparkTutorial.sparkSql
-
-import com.sparkTutorial.commons.Utils
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.{SparkConf, SparkContext}
-
-object RddDatasetConversion {
-
-  def main(args: Array[String]) {
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val conf = new SparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]")
-
-    val sc = new SparkContext(conf)
-
-    val session = SparkSession.builder().appName("StackOverFlowSurvey").master("local[1]").getOrCreate()
-
-    val lines = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
-
-    val responseRDD = lines
-      .filter(line => !line.split(Utils.COMMA_DELIMITER, -1)(2).equals("country"))
-      .map(line => {
-        val splits = line.split(Utils.COMMA_DELIMITER, -1)
-        Response(splits(2), toInt(splits(6)), splits(9), toInt(splits(14)))
-      })
-
-    import session.implicits._
-    val responseDataset = responseRDD.toDS()
-
-    System.out.println("=== Print out schema ===")
-    responseDataset.printSchema()
-
-    System.out.println("=== Print 20 records of responses table ===")
-    responseDataset.show(20)
-
-    for (response <- responseDataset.rdd.collect()) println(response)
-  }
-
-  def toInt(split: String): Option[Double] = {
-    if (split.isEmpty) None else Some(split.toDouble)
-  }
-}
diff --git a/sparkSql/Response.scala b/sparkSql/Response.scala
deleted file mode 100644
index 4a65d753..00000000
--- a/sparkSql/Response.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package com.sparkTutorial.sparkSql
-
-case class Response(country: String, age_midpoint: Option[Double], occupation: String, salary_midpoint: Option[Double])
diff --git a/sparkSql/StackOverFlowSurvey.py b/sparkSql/StackOverFlowSurvey.py
new file mode 100644
index 00000000..e7e9d94a
--- /dev/null
+++ b/sparkSql/StackOverFlowSurvey.py
@@ -0,0 +1,52 @@
+from pyspark.sql import SparkSession
+
+AGE_MIDPOINT = "age_midpoint"
+SALARY_MIDPOINT = "salary_midpoint"
+SALARY_MIDPOINT_BUCKET = "salary_midpoint_bucket"
+
+if __name__ == "__main__":
+
+    session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
+    session.sparkContext.setLogLevel("ERROR")
+    dataFrameReader = session.read
+
+    responses = dataFrameReader \
+        .option("header", "true") \
+        .option("inferSchema", value=True) \
+        .csv("in/2016-stack-overflow-survey-responses.csv")
+
+    print("=== Print out schema ===")
+    responses.printSchema()
+
+    responseWithSelectedColumns = responses.select("country", "occupation", AGE_MIDPOINT, SALARY_MIDPOINT)
+
+    print("=== Print the selected columns of the table ===")
+    responseWithSelectedColumns.show()
+
+    print("=== Print records where the response is from Afghanistan ===")
+    responseWithSelectedColumns.filter(responseWithSelectedColumns["country"] == "Afghanistan").show()
+
+    print("=== Print the count of occupations ===")
+    groupedDataset = responseWithSelectedColumns.groupBy("occupation")
+    groupedDataset.count().show()
+
+    print("=== Print records with average mid age less than 20 ===")
+    responseWithSelectedColumns.filter(responseWithSelectedColumns[AGE_MIDPOINT] < 20).show()
+
+    print("=== Print the result by salary middle point in descending order ===")
+    responseWithSelectedColumns.orderBy(responseWithSelectedColumns[SALARY_MIDPOINT], ascending=False).show()
+
+    print("=== Group by country and aggregate by average salary middle point ===")
+    datasetGroupByCountry = responseWithSelectedColumns.groupBy("country")
+    datasetGroupByCountry.avg(SALARY_MIDPOINT).show()
+
+    responseWithSalaryBucket = responses.withColumn(SALARY_MIDPOINT_BUCKET,
+        ((responses[SALARY_MIDPOINT]/20000).cast("integer")*20000))
+
+    print("=== With salary bucket column ===")
+    responseWithSalaryBucket.select(SALARY_MIDPOINT, SALARY_MIDPOINT_BUCKET).show()
+
+    print("=== Group by salary bucket ===")
+    responseWithSalaryBucket.groupBy(SALARY_MIDPOINT_BUCKET).count().orderBy(SALARY_MIDPOINT_BUCKET).show()
+
+    session.stop()
diff --git a/sparkSql/StackOverFlowSurvey.scala b/sparkSql/StackOverFlowSurvey.scala
deleted file mode 100644
index bac8c27e..00000000
--- a/sparkSql/StackOverFlowSurvey.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-package com.sparkTutorial.sparkSql
-
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.SparkSession
-
-object StackOverFlowSurvey {
-
-  val AGE_MIDPOINT = "age_midpoint"
-  val SALARY_MIDPOINT = "salary_midpoint"
-  val SALARY_MIDPOINT_BUCKET = "salary_midpoint_bucket"
-
-  def main(args: Array[String]) {
-
-    Logger.getLogger("org").setLevel(Level.ERROR)
-    val session = SparkSession.builder().appName("StackOverFlowSurvey").master("local[1]").getOrCreate()
-
-    val dataFrameReader = session.read
-
-    val responses = dataFrameReader
-      .option("header", "true")
-      .option("inferSchema", value = true)
-      .csv("in/2016-stack-overflow-survey-responses.csv")
-
-    System.out.println("=== Print out schema ===")
-    responses.printSchema()
-
-    val responseWithSelectedColumns = responses.select("country", "occupation", AGE_MIDPOINT, SALARY_MIDPOINT)
-
-    System.out.println("=== Print the selected columns of the table ===")
-    responseWithSelectedColumns.show()
-
-    System.out.println("=== Print records where the response is from Afghanistan ===")
-    responseWithSelectedColumns.filter(responseWithSelectedColumns.col("country").===("Afghanistan")).show()
-
-    System.out.println("=== Print the count of occupations ===")
-    val groupedDataset = responseWithSelectedColumns.groupBy("occupation")
-    groupedDataset.count().show()
-
-    System.out.println("=== Print records with average mid age less than 20 ===")
-    responseWithSelectedColumns.filter(responseWithSelectedColumns.col(AGE_MIDPOINT) < 20).show()
-
-    System.out.println("=== Print the result by salary middle point in descending order ===")
-    responseWithSelectedColumns.orderBy(responseWithSelectedColumns.col(SALARY_MIDPOINT).desc).show()
-
-    System.out.println("=== Group by country and aggregate by average salary middle point ===")
-    val datasetGroupByCountry = responseWithSelectedColumns.groupBy("country")
-    datasetGroupByCountry.avg(SALARY_MIDPOINT).show()
-
-    val responseWithSalaryBucket = responses.withColumn(SALARY_MIDPOINT_BUCKET,
-      responses.col(SALARY_MIDPOINT).divide(20000).cast("integer").multiply(20000))
-
-    System.out.println("=== With salary bucket column ===")
-    responseWithSalaryBucket.select(SALARY_MIDPOINT, SALARY_MIDPOINT_BUCKET).show()
-
-    System.out.println("=== Group by salary bucket ===")
-    responseWithSalaryBucket.groupBy(SALARY_MIDPOINT_BUCKET).count().orderBy(SALARY_MIDPOINT_BUCKET).show()
-
-    session.stop()
-  }
-}
diff --git a/sparkSql/TypedDataset.scala b/sparkSql/TypedDataset.scala
deleted file mode 100644
index 993992d9..00000000
--- a/sparkSql/TypedDataset.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package com.sparkTutorial.sparkSql
-
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.SparkSession
-
-object TypedDataset {
-
-  val AGE_MIDPOINT = "age_midpoint"
-  val SALARY_MIDPOINT = "salary_midpoint"
-  val SALARY_MIDPOINT_BUCKET = "salaryMidpointBucket"
-
-  def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR) - val session = SparkSession.builder().appName("StackOverFlowSurvey").master("local[*]").getOrCreate() - val dataFrameReader = session.read - - val responses = dataFrameReader - .option("header", "true") - .option("inferSchema", value = true) - .csv("in/2016-stack-overflow-survey-responses.csv") - - val responseWithSelectedColumns = responses.select("country", "age_midpoint", "occupation", "salary_midpoint") - - import session.implicits._ - val typedDataset = responseWithSelectedColumns.as[Response] - - System.out.println("=== Print out schema ===") - typedDataset.printSchema() - - System.out.println("=== Print 20 records of responses table ===") - typedDataset.show(20) - - System.out.println("=== Print the responses from Afghanistan ===") - typedDataset.filter(response => response.country == "Afghanistan").show() - - System.out.println("=== Print the count of occupations ===") - typedDataset.groupBy(typedDataset.col("occupation")).count().show() - - System.out.println("=== Print responses with average mid age less than 20 ===") - typedDataset.filter(response => response.age_midpoint.isDefined && response.age_midpoint.get < 20.0).show() - - System.out.println("=== Print the result by salary middle point in descending order ===") - typedDataset.orderBy(typedDataset.col(SALARY_MIDPOINT).desc).show() - - System.out.println("=== Group by country and aggregate by average salary middle point ===") - typedDataset.filter(response => response.salary_midpoint.isDefined).groupBy("country").avg(SALARY_MIDPOINT).show() - - System.out.println("=== Group by salary bucket ===") - typedDataset.map(response => response.salary_midpoint.map(point => Math.round(point / 20000) * 20000).orElse(None)) - .withColumnRenamed("value", SALARY_MIDPOINT_BUCKET) - .groupBy(SALARY_MIDPOINT_BUCKET) - .count() - .orderBy(SALARY_MIDPOINT_BUCKET).show() - } -} diff --git a/sparkSql/join/UkMakerSpaces.py b/sparkSql/join/UkMakerSpaces.py new file mode 100644 index 00000000..446cbf6c --- /dev/null +++ b/sparkSql/join/UkMakerSpaces.py @@ -0,0 +1,27 @@ +from pyspark.sql import SparkSession, functions as fs + +if __name__ == "__main__": + + session = SparkSession.builder.appName("UkMakerSpaces").master("local").getOrCreate() + sc = session.sparkContext + sc.setLogLevel("ERROR") + + makerSpace = session.read \ + .option("header", "true") \ + .csv("in/uk-makerspaces-identifiable-data.csv") + + postCode = session.read \ + .option("header", "true") \ + .csv("in/uk-postcode.csv") \ + .withColumn("PostCode", fs.concat_ws("", fs.col("PostCode"), fs.lit(" "))) + + print("=== Print 20 records of makerspace table ===") + makerSpace.select("Name of makerspace", "Postcode").show() + + print("=== Print 20 records of postcode table ===") + postCode.show() + + joined = makerSpace.join(postCode, makerSpace["Postcode"].startswith(postCode["Postcode"]), "left_outer") + + print("=== Group by Region ===") + joined.groupBy("Region").count().show(200) \ No newline at end of file diff --git a/sparkSql/join/UkMakerSpaces.scala b/sparkSql/join/UkMakerSpaces.scala deleted file mode 100644 index 5026f655..00000000 --- a/sparkSql/join/UkMakerSpaces.scala +++ /dev/null @@ -1,30 +0,0 @@ -package com.sparkTutorial.sparkSql.join - -import org.apache.log4j.{Level, Logger} -import org.apache.spark.sql.{SparkSession, functions} - -object UkMakerSpaces { - - def main(args: Array[String]) { - - Logger.getLogger("org").setLevel(Level.ERROR) - - val session = 
-
-    val makerSpace = session.read.option("header", "true").csv("in/uk-makerspaces-identifiable-data.csv")
-
-    val postCode = session.read.option("header", "true").csv("in/uk-postcode.csv")
-      .withColumn("PostCode", functions.concat_ws("", functions.col("PostCode"), functions.lit(" ")))
-
-    System.out.println("=== Print 20 records of makerspace table ===")
-    makerSpace.select("Name of makerspace", "Postcode").show()
-
-    System.out.println("=== Print 20 records of postcode table ===")
-    postCode.show()
-
-    val joined = makerSpace.join(postCode, makerSpace.col("Postcode").startsWith(postCode.col("Postcode")), "left_outer")
-
-    System.out.println("=== Group by Region ===")
-    joined.groupBy("Region").count().show(200)
-  }
-}
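
Note: a quick sanity check of the tightened COMMA_DELIMITER introduced in commons/Utils.py above. This is a minimal sketch and not part of the patch; the sample CSV line is made up. The pattern splits only on commas that sit outside double-quoted fields, which is what the CSV-parsing scripts in this change rely on.

    import re

    # Same pattern as the new commons/Utils.py: a comma is treated as a delimiter only
    # when it is followed by an even number of double quotes, i.e. it sits outside quotes.
    COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')

    line = '123,"San Luis Obispo, CA",795000'  # hypothetical record, not from the datasets
    print(COMMA_DELIMITER.split(line))
    # ['123', '"San Luis Obispo, CA"', '795000']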