[5 min IT] PySpark

Lisa C. L.
3 min read · Aug 26, 2020

Spark is a general-purpose distributed data processing engine

Past: a huge dataset or a computationally expensive program had to be processed on one computer

With Spark: tasks can be divided between multiple computers that communicate with each other to produce an output

MapReduce: processing engine of Hadoop, optimizing tasks to be processed on multiple nodes

Spark: a processing engine that can be used on its own, or used in place of Hadoop MapReduce, taking advantage of the other features of Hadoop

Partitioned data: data that has been split into pieces (partitions) so that it can be processed on multiple nodes; each partition is one piece of the whole dataset
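
A plain-Python sketch (not Spark itself) of why partitioning helps: each piece can be processed independently and the partial results combined at the end. The helper names partition and partial_sum are made up for illustration, and the "nodes" here are just list elements handled one after another.

```python
from functools import reduce

def partition(records, num_partitions):
    """Split records into num_partitions chunks (round-robin)."""
    return [records[i::num_partitions] for i in range(num_partitions)]

def partial_sum(chunk):
    """Work that one node could do on its own partition."""
    return sum(chunk)

records = list(range(1, 11))        # the whole dataset: 1..10
partitions = partition(records, 3)  # three independent pieces

# "Map" step: every partition is processed without looking at the others.
partials = [partial_sum(p) for p in partitions]

# "Reduce" step: combine the partial results into one output.
total = reduce(lambda a, b: a + b, partials)
print(partitions, partials, total)
```

In a real cluster the partitions would live on different machines; the structure of the computation stays the same.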

Fault tolerance: a distributed system’s ability to continue working properly even when a failure occurs

Strict evaluation: evaluates each expression as soon as it comes across it, in sequence
Lazy evaluation: waits until it is told to generate a result, and then performs all the evaluation at once
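
A plain-Python analogy (not Spark) for the two evaluation styles: a list comprehension is strict, a generator expression is lazy. The calls list and the square function are only there to make visible when the work actually happens.

```python
calls = []  # records every time square() actually runs

def square(x):
    calls.append(x)
    return x * x

# Strict: the list comprehension evaluates every element immediately.
strict_result = [square(x) for x in range(5)]
calls_after_strict = len(calls)

# Lazy: the generator expression only describes the work to be done.
calls.clear()
lazy_result = (square(x) for x in range(5))
calls_before_action = len(calls)  # nothing has been evaluated yet

# Asking for a result triggers all the evaluation at once.
total = sum(lazy_result)
calls_after_action = len(calls)
print(calls_after_strict, calls_before_action, calls_after_action, total)
```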

Spark RDDs (Resilient Distributed Datasets): immutable, partitioned, no schema (row-by-row, records display like a list)

Spark DataFrames: have a schema + all features of RDDs

Spark Datasets: similar to DataFrames, but strongly typed: the type is specified upon creation and not inferred from the records; cannot be used in PySpark because Python is a dynamically typed language

Transformation: a lazy operation to create one or more new RDDs

Action: all RDD operations that don’t produce an RDD as an output

Lineage Graph: outlines a logical execution plan
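
A toy sketch of how transformations, actions, and lineage fit together. ToyRDD is a hypothetical class written for this note, not part of PySpark; it only mimics the idea that transformations record a plan and an action runs it.

```python
class ToyRDD:
    """Toy stand-in for an RDD: the data plus a recorded list of pending steps."""

    def __init__(self, data, lineage=()):
        self._data = list(data)
        self._lineage = tuple(lineage)

    # Transformations: lazy, each returns a NEW ToyRDD with a longer lineage.
    def map(self, fn):
        return ToyRDD(self._data, self._lineage + (("map", fn),))

    def filter(self, fn):
        return ToyRDD(self._data, self._lineage + (("filter", fn),))

    def lineage(self):
        """The logical plan: the names of the recorded steps, in order."""
        return [name for name, _ in self._lineage]

    # Action: does not return a ToyRDD; it executes the recorded steps.
    def collect(self):
        result = self._data
        for name, fn in self._lineage:
            if name == "map":
                result = [fn(x) for x in result]
            else:
                result = [x for x in result if fn(x)]
        return result

base = ToyRDD(range(6))
evens_squared = base.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
plan = evens_squared.lineage()    # steps recorded so far, nothing executed
output = evens_squared.collect()  # the action runs the plan
print(plan, output)
```

Because the plan is kept rather than the intermediate results, a lost piece of data can be rebuilt by replaying the steps, which is the intuition behind the "resilient" in RDD.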

Spark application: a user-built program that consists of a driver and its associated executors

Spark job: a set of tasks executed by the executor processes, as directed by the driver

Run PySpark in Google Colab

spark.apache.org/downloads.html → check latest Spark version 3.x.x

colab.research.google.com → new notebook → copy and paste the code below

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.x.x/spark-3.x.x-bin-hadoop2.7.tgz
!tar xf spark-3.x.x-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.x.x-bin-hadoop2.7"
import findspark
findspark.init()

SparkSession: the entry point for Spark SQL and DataFrames; copy and paste the code below

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

The "\" characters only continue the statement across lines. Without them, the same statement fits on a single, very long line:

spark = SparkSession.builder.master("local[*]").appName("Learning_Spark").getOrCreate()

  • .builder: gives access to the Builder API to configure the session
  • .master(): determines where the program will run; "local[*]" sets it to run locally on all cores; "local[1]" runs it on one core
  • .appName(): optional method to name the Spark application
  • .getOrCreate(): gets an existing SparkSession or creates a new one if none exists

Open a local file on Google Colab

from google.colab import files
files.upload()

Load data to a DataFrame

data = spark.read.csv('xxx.csv', inferSchema=True, header=True)

data.count() returns the number of rows
data.columns returns a list of column names
len(data.columns) returns the number of columns

data.show(n) displays the first n rows along with the header
data.show() with no parameters displays the first 20 rows
data.printSchema() prints each column's name, data type, and whether it is nullable
data.dtypes returns a list of (column name, data type) pairs

Show selected columns, first n rows:

data.select("XX", "YY").show(n, truncate=False)

Get summary statistics on selected columns: count, mean, stddev, min, max

data.describe(["XX", "YY"]).show()

Count how often each value of a column occurs, most frequent first

data.groupBy("XX").count().orderBy("count", ascending=False).show(n)

Filtering data

filter1 = (data.XX.isNotNull()) | (data.YY.isNotNull())
filter2 = data.ZZ != "123"
data = data.filter(filter1).filter(filter2)
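
A plain-Python sketch of the row logic in the filters above, using made-up rows. filter1 combines its two conditions with | (or), so a row survives when at least one of XX and YY is present; requiring both would need & instead. Chaining .filter(filter1).filter(filter2) then behaves like "filter1 and filter2".

```python
# Hypothetical rows, with None standing in for a null value.
rows = [
    {"XX": 1.0, "YY": None, "ZZ": "abc"},
    {"XX": None, "YY": None, "ZZ": "abc"},
    {"XX": 2.0, "YY": 3.0, "ZZ": "123"},
    {"XX": None, "YY": 4.0, "ZZ": "xyz"},
    {"XX": 5.0, "YY": 6.0, "ZZ": "abc"},
]

def any_present(row):
    """Same logic as filter1: XX is not null OR YY is not null."""
    return row["XX"] is not None or row["YY"] is not None

def all_present(row):
    """Stricter variant: XX is not null AND YY is not null."""
    return row["XX"] is not None and row["YY"] is not None

def not_123(row):
    """Same logic as filter2."""
    return row["ZZ"] != "123"

kept_any = [r for r in rows if any_present(r) and not_123(r)]
kept_all = [r for r in rows if all_present(r) and not_123(r)]
print(len(kept_any), len(kept_all))
```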

Linear regression

# cast every predictor column to Double (repeat the cast for each predictor)

from pyspark.sql.types import DoubleType
data = data.withColumn("XX", data["XX"].cast(DoubleType()))

# assemble the predictor columns into a single vector column, the input format Spark ML expects

from pyspark.ml.feature import VectorAssembler
inputcols = ["XX", "YY"]
assembler = VectorAssembler(inputCols=inputcols, outputCol="predictors")
predictors = assembler.transform(data)
predictors.columns

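Select the predictors and the label column, then split the data into training (roughly 80%) and test (roughly 20%) sets. The second selected column is the label; it should be a column that is not among the inputcols.

model_data = predictors.select("predictors", "XX")
train_data, test_data = model_data.randomSplit([0.8, 0.2])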

Model training and parameters assessment

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='predictors', labelCol='XX')
lrModel = lr.fit(train_data)
pred = lrModel.evaluate(test_data)
lrModel.coefficients
lrModel.intercept
pred.predictions.show(5)

Model evaluation

from pyspark.ml.evaluation import RegressionEvaluator
# named "evaluator" rather than "eval" to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol="XX", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(pred.predictions)
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
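
For intuition about what the four metrics mean, here is a plain-Python sketch using the standard textbook definitions on made-up numbers. The function regression_metrics is hypothetical, and treating these formulas as what RegressionEvaluator computes is an assumption; check the Spark docs for edge cases.

```python
import math

def regression_metrics(labels, predictions):
    """Textbook RMSE, MSE, MAE and R^2 for two equal-length lists."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n       # mean squared error
    mae = sum(abs(e) for e in errors) / n      # mean absolute error
    mean_label = sum(labels) / n
    ss_res = sum(e * e for e in errors)        # residual sum of squares
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    return {
        "rmse": math.sqrt(mse),
        "mse": mse,
        "mae": mae,
        "r2": 1 - ss_res / ss_tot,
    }

# Made-up labels and predictions: only the last prediction is off (by 2).
metrics = regression_metrics([1.0, 2.0, 3.0, 4.0], [1.0, 2.0, 3.0, 6.0])
print(metrics)
```

Lower is better for rmse, mse, and mae; for r2, values closer to 1 mean the model explains more of the variance in the label.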

Reference:

https://towardsdatascience.com/a-neanderthals-guide-to-apache-spark-in-python-9ef1f156d427
