# code for loading the format for the notebook
import os
# path : store the current path so we can change back to it later
path = os.getcwd()
os.chdir(os.path.join('..', 'notebook_format'))
from formats import load_style
load_style(plot_style = False)
os.chdir(path)
# 1. magic for inline plot
# 2. magic to print version
# 3. magic so that the notebook will reload external python modules
# 4. magic to enable retina (high resolution) plots
# https://gist.github.com/minrk/3301035
%matplotlib inline
%load_ext watermark
%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format = 'retina'
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
# create the SparkSession, which is the entry point into
# all functionality in Spark. The .master part sets it to run
# on all cores locally; note that we should leave out the
# .master part if we're actually running the job on a cluster,
# or else we won't actually be using the cluster's resources
spark = (SparkSession
    .builder
    .master('local[*]')
    .appName('PCA')
    .config(conf = SparkConf())
    .getOrCreate())
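As the comment above notes, when the job is submitted to an actual cluster we would leave out the .master part, since the master is then typically supplied through spark-submit. A minimal, purely illustrative sketch of what that builder might look like (the spark_cluster name is hypothetical and not used anywhere below):
# on a cluster, the master is usually passed via spark-submit --master,
# so we only set the application name here (illustrative sketch only)
spark_cluster = (SparkSession
    .builder
    .appName('PCA')
    .getOrCreate())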
%watermark -a 'Ethen' -d -t -v -p numpy,pandas,matplotlib,sklearn,pyspark
This is simply an API walkthrough; for more details on PCA itself, consider referring to the following documentation.
# load the data and convert it to a pandas DataFrame,
# then use that to create the spark DataFrame
iris = load_iris()
X = iris['data']
y = iris['target']
data = pd.DataFrame(X, columns = iris.feature_names)
dataset = spark.createDataFrame(data, iris.feature_names)
dataset.show(6)
Next, in order to train ML models in Spark later, we'll use the VectorAssembler
to combine a given list of columns into a single vector column.
# specify the input columns' name and
# the combined output column's name
assembler = VectorAssembler(
inputCols = iris.feature_names, outputCol = 'features')
# use it to transform the dataset and select just
# the output column
df = assembler.transform(dataset).select('features')
df.show(6)
Next, we standardize the features. Notice that here we only need to specify the assembled column as the input feature.
scaler = StandardScaler(
inputCol = 'features',
outputCol = 'scaledFeatures',
withMean = True,
withStd = True
).fit(df)
# when we transform the dataframe, the old
# features column will still remain in it
df_scaled = scaler.transform(df)
df_scaled.show(6)
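Note that the transformed dataframe keeps the original features column alongside the new scaledFeatures column. If we only want to carry the standardized column forward, one straightforward option is to select it explicitly:
# keep only the standardized feature column
df_scaled.select('scaledFeatures').show(6)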
After the preprocessing step, we fit the PCA model.
n_components = 2
pca = PCA(
k = n_components,
inputCol = 'scaledFeatures',
outputCol = 'pcaFeatures'
).fit(df_scaled)
df_pca = pca.transform(df_scaled)
print('Explained Variance Ratio', pca.explainedVariance.toArray())
df_pca.show(6)
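Besides the explained variance, the fitted model also exposes the principal components themselves through its pc attribute, a dense matrix with one column per component:
# each column of this (n_features x k) matrix is one principal component
print(pca.pc)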
Notice that unlike scikit-learn, for all of Spark's ML model classes we call transform on the dataframe at hand after fitting the model (calling .fit on the dataframe). This returns the result in a new column, whose name is specified by the outputCol argument of the model class.
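For comparison, here is a rough scikit-learn sketch of the same scale-then-PCA pipeline on the original numpy array X (the aliases below are only there to avoid clashing with the PySpark classes of the same name); there, transform simply returns a numpy array instead of appending a new column:
from sklearn.preprocessing import StandardScaler as SkStandardScaler
from sklearn.decomposition import PCA as SkPCA
X_scaled_sk = SkStandardScaler().fit_transform(X)
sk_pca = SkPCA(n_components = n_components).fit(X_scaled_sk)
X_pca_sk = sk_pca.transform(X_scaled_sk)
print('Explained Variance Ratio', sk_pca.explained_variance_ratio_)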
We can convert the result back to a numpy array by extracting the pcaFeatures column from each row and using collect to bring the entire dataset back to a single machine.
# not sure if this is the best way to do it
X_pca = df_pca.rdd.map(lambda row: row.pcaFeatures).collect()
X_pca = np.array(X_pca)
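If the rdd.map approach feels clunky, an alternative sketch (assuming PySpark 3.0+, which provides pyspark.ml.functions.vector_to_array) converts the vector column to an array column and routes it through pandas; X_pca_alt is just an illustrative name:
from pyspark.ml.functions import vector_to_array
X_pca_alt = np.array(
    df_pca.select(vector_to_array('pcaFeatures').alias('pca'))
          .toPandas()['pca']
          .tolist())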
# change the default plot style, figure size and font size
plt.style.use('fivethirtyeight')
plt.rcParams['figure.figsize'] = 8, 6
plt.rcParams['font.size'] = 12
def plot_iris_pca(X_pca, y):
    """a scatter plot of the 2-dimensional iris data"""
    markers = 's', 'x', 'o'
    colors = list(plt.rcParams['axes.prop_cycle'])
    target = np.unique(y)
    for idx, (t, m) in enumerate(zip(target, markers)):
        subset = X_pca[y == t]
        plt.scatter(subset[:, 0], subset[:, 1], s = 50,
                    c = colors[idx]['color'], label = t, marker = m)
    plt.xlabel('PC 1')
    plt.ylabel('PC 2')
    plt.legend(loc = 'lower left')
    plt.tight_layout()
    plt.show()
plot_iris_pca(X_pca, y)
# stop the current SparkSession
spark.stop()