The pyspark.ml package provides DataFrame-based machine learning APIs that let users quickly assemble and configure practical machine learning pipelines.
class pyspark.ml.feature.StringIndexer(inputCol=None, outputCol=None, inputCols=None, outputCols=None, handleInvalid='error', stringOrderType='frequencyDesc') - StringIndexer encodes a string column of labels to a column of label indices. If the input column is numeric, it is cast to string and the string values are indexed. The indices are in [0, numLabels).
By default, the labels are ordered by frequency, so the most frequent label gets index 0. The ordering behavior is controlled by stringOrderType, whose default value is 'frequencyDesc'. In case of equal frequency under frequencyDesc/frequencyAsc, the strings are further sorted alphabetically.
Four ordering options are supported (see the sketch after this list):
1. "frequencyDesc": descending order by label frequency (most frequent label assigned 0)
2. "frequencyAsc": ascending order by label frequency (least frequent label assigned 0)
3. "alphabetDesc": descending alphabetical order
4. "alphabetAsc": ascending alphabetical order (default = "frequencyDesc")
#import SparkSession
from pyspark.sql import SparkSession
SparkSession is the entry point to Spark for working with RDDs, DataFrames, and Datasets. To create a SparkSession in Python, we use the builder attribute and then call the getOrCreate() method.
If a SparkSession already exists, getOrCreate() returns it; otherwise it creates a new one.
spark = SparkSession.builder.appName('xvspark').getOrCreate()
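A quick check that getOrCreate() reuses the existing session rather than building a second one:
#calling getOrCreate() again returns the same SparkSession object
spark2 = SparkSession.builder.getOrCreate()
print(spark2 is spark)  #True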
from pyspark.sql.types import *
The StructType class is used to define the structure of the DataFrame.
#create the structure of schema
schema = StructType().add("id","integer").add("name","string").add("qualification","string").add("age", "integer").add("gender", "string")
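The same schema can also be written in the more explicit StructField form; this sketch is equivalent to the add() chain above:
#equivalent schema built from StructField objects
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema_explicit = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("qualification", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
])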
#create data
data = [
(1,'John',"B.A.", 20, "Male"),
(2,'Martha',"B.Com.", 20, "Female"),
(3,'Mona',"B.Com.", 21, "Female"),
(4,'Harish',"B.Sc.", 22, "Male"),
(5,'Jonny',"B.A.", 22, "Male"),
(6,'Maria',"B.A.", 23, "Female"),
(7,'Monalisa',"B.A.", 21, "Female")
]
#create dataframe
df = spark.createDataFrame(data, schema=schema)
#columns of dataframe
df.columns
df.show()
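To confirm that the declared types were applied, printSchema() prints the schema tree; the output should look roughly like the comment below:
#inspect the schema
df.printSchema()
#root
# |-- id: integer (nullable = true)
# |-- name: string (nullable = true)
# ...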
#import required libraries
from pyspark.ml.feature import StringIndexer
qualification is a string column with three distinct labels. We apply StringIndexer with qualification as the input column and qualificationIndex as the output column.
qualification_indexer = StringIndexer(inputCol="qualification", outputCol="qualificationIndex")
#fit() learns the label-to-index mapping; transform() appends the index column
df1 = qualification_indexer.fit(df).transform(df)
df1.show()
"B.A." gets index 0 because it is the most frequent, then "B.Com" gets index 1 and "B.Sc." gets index 2.
gender_indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
#fit the gender indexer and transform the dataframe
df2 = gender_indexer.fit(df).transform(df)
df2.show()
"Female" gets index 0 because it is the most frequent (4 rows), and "Male" gets index 1 (3 rows).
In machine learning, it is common to run a sequence of algorithms to process and learn from data. E.g., a simple text document processing workflow might include several stages:
1. Split each document’s text into words.
2. Convert each document’s words into a numerical feature vector.
3. Learn a prediction model using the feature vectors and labels.
MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order.
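As a sketch, the three stages above map onto Pipeline stages like this (the column names and maxIter value are illustrative):
#assemble the text-processing workflow as a Pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")      #stage 1: split text into words
hashingTF = HashingTF(inputCol="words", outputCol="features")  #stage 2: words -> feature vector
lr = LogisticRegression(maxIter=10)                            #stage 3: learn a prediction model
text_pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])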
A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. For Transformer stages, the transform() method is called on the DataFrame. For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the DataFrame.
#import module
from pyspark.ml import Pipeline
#recreate the dataframe (same schema and data as above)
schema = StructType().add("id","integer").add("name","string").add("qualification","string").add("age", "integer").add("gender", "string")
df = spark.createDataFrame(data, schema=schema)
qualification_indexer = StringIndexer(inputCol="qualification", outputCol="qualificationIndex")
gender_indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
pipeline = Pipeline(stages=[qualification_indexer, gender_indexer])
#fit the Pipeline (an Estimator) to get a PipelineModel, then transform the dataframe
pipeline_model = pipeline.fit(df)
df3 = pipeline_model.transform(df)
df3.show()
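Because fit() returns a reusable PipelineModel, the same fitted model can index new rows without refitting. A minimal sketch (the 'Ravi' row is made up; its labels must have been seen during fit(), since handleInvalid defaults to 'error'):
#apply the fitted PipelineModel to new data
new_rows = spark.createDataFrame([(8, 'Ravi', "B.A.", 24, "Male")], schema=schema)
pipeline_model.transform(new_rows).show()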