[5 min IT] PySpark
Spark is a general-purpose distributed data processing engine
Past: processing a huge dataset or running computationally expensive programs was limited to a single computer
With Spark: tasks can be divided among multiple computers that communicate with each other to produce an output
MapReduce: the processing engine of Hadoop, which optimizes tasks so they can be processed on multiple nodes
Spark: a processing engine that can be used on its own, or used in place of Hadoop MapReduce, taking advantage of the other features of Hadoop
Partitioned data: a piece of the whole dataset; data that has been split up so that it can be processed in parallel on multiple nodes
Fault tolerance: a distributed system’s ability to continue working properly even when a failure occurs
Strict evaluation: sequentially evaluates each expression it comes across
Lazy evaluation: waits until it is told to generate a result, and then performs all the evaluation all at once
Spark RDDs (Resilient Distributed Datasets): immutable, partitioned, no schema (row-by-row, records display like a list)
Spark DataFrames: have a schema + all features of RDDs
Spark Datasets: similar to DataFrames, but the schema is specified upon creation rather than inferred from the records; cannot be used in PySpark because Python is a dynamically typed language
Transformation: a lazy operation to create one or more new RDDs
Action: all RDD operations that don’t produce an RDD as an output
Lineage Graph: outlines a logical execution plan (sketched in the code after these definitions)
Spark Application: a user-built program that consists of a driver and its associated executors
Spark job: tasks to be executed with executor processes, as directed by the driver
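A minimal sketch of these execution concepts (lazy transformations, actions, lineage), assuming a SparkSession named spark already exists (one is created in the next section):
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])  # an RDD: partitioned, no schema
doubled = rdd.map(lambda x: x * 2)  # transformation: lazy, only extends the lineage
doubled.collect()  # action: triggers evaluation and returns [2, 4, 6, 8]
print(doubled.toDebugString().decode())  # prints the lineage graph (the logical execution plan)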
Run PySpark in Google Colab
spark.apache.org/downloads.html → check latest Spark version 3.x.x
colab.research.google.com → new notebook → copy-paste the code below
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.x.x/spark-3.x.x-bin-hadoop2.7.tgz
!tar xf spark-3.x.x-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.x.x-bin-hadoop2.7"
import findspark
findspark.init()
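A quick, optional sanity check that the setup worked:
import pyspark
print(pyspark.__version__)  # should match the 3.x.x version installed above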
SparkSession: the entry point for using Spark SQL; copy-paste the code below
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()
The trailing "\" lets the chained call span multiple lines; without it, everything has to sit on one long line:
spark = SparkSession.builder.master("local[*]").appName("Learning_Spark").getOrCreate()
.builder: gives access to the Builder API used to configure the session
.master(): determines where the program will run; "local[*]" sets it to run locally on all cores, "local[1]" to run on one core
.appName(): optional method to name the Spark Application
.getOrCreate(): gets an existing SparkSession or creates a new one if none exists
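As an illustration of getOrCreate(), building a session again should hand back the one above rather than creating a second one:
spark2 = SparkSession.builder.getOrCreate()
spark2 is spark  # expected to be True: the existing session is reused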
Open a local file on Google Colab
from google.colab import files
files.upload()
Load data to a DataFrame
data = spark.read.csv('xxx.csv', inferSchema=True, header=True)
data.count() returns the number of rows
data.columns returns a list of column names
len(data.columns) returns the number of columns
data.show(n) displays the first n rows along with the header
data.show() with no parameters will return the first 20 rows
data.printSchema() or data.dtypes: show each column's type (string/numeric) and nullable=True/False
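These methods are easy to try on a tiny hand-made DataFrame (hypothetical name/age columns):
df = spark.createDataFrame([("Alice", 34), ("Bob", 45)], ["name", "age"])
df.count()  # 2
df.columns  # ['name', 'age']
df.printSchema()  # name: string (nullable = true); age: long (nullable = true)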
Show selected columns, first n rows:
data.select("XX", "YY").show(n, truncate=False)
Get summary statistics on selected columns: count, mean, stddev, min, max
data.describe(["XX", "YY"]).show()
Order a column by count
data.groupBy("XX").count().orderBy("count", ascending=False).show(n)
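An equivalent form, using a sort expression from pyspark.sql.functions instead of the ascending flag:
from pyspark.sql import functions as F
data.groupBy("XX").count().orderBy(F.desc("count")).show(n)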
Filtering data
filter1 = (data.XX.isNotNull()) | (data.YY.isNotNull())
filter2 = data.ZZ != "123"
data = data.filter(filter1).filter(filter2)
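The two filters can also be combined into a single call with & (and) / | (or); the parentheses around each condition are required:
data = data.filter(((data.XX.isNotNull()) | (data.YY.isNotNull())) & (data.ZZ != "123"))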
Linear regression
# cast the columns used by the model to Doubles (repeat for each predictor and the label)
from pyspark.sql.types import DoubleType
data = data.withColumn("XX", data["XX"].cast(DoubleType()))
# assemble the predictor columns into a single vector column
from pyspark.ml.feature import VectorAssembler
inputcols = ["YY", "ZZ"]  # the label column ("XX") must not also appear among the predictors
assembler = VectorAssembler(inputCols=inputcols, outputCol="predictors")
predictors = assembler.transform(data)
predictors.columns
Select the predictors column and the label, then split the data
model_data = predictors.select("predictors", "XX")
train_data, test_data = model_data.randomSplit([0.8, 0.2])
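randomSplit also accepts an optional seed, useful for reproducible splits:
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)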
Model training and parameters assessment
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='predictors', labelCol='XX')
lrModel = lr.fit(train_data)
pred = lrModel.evaluate(test_data)
lrModel.coefficients
lrModel.intercept
pred.predictions.show(5)
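Training-set diagnostics are also available on the fitted model via its summary:
trainingSummary = lrModel.summary
print(trainingSummary.rootMeanSquaredError, trainingSummary.r2)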
Model evaluation
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="XX", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(pred.predictions)
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
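To print the four test-set metrics together (a simple formatting sketch):
print(f"RMSE: {rmse:.3f}, MSE: {mse:.3f}, MAE: {mae:.3f}, R2: {r2:.3f}")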
Reference:
A Neanderthal's Guide to Apache Spark in Python: https://towardsdatascience.com/a-neanderthals-guide-to-apache-spark-in-python-9ef1f156d427