We have already discussed StringIndexer (link).
class pyspark.ml.feature.OneHotEncoder(inputCols=None, outputCols=None, handleInvalid='error', dropLast=True, inputCol=None, outputCol=None) - One-hot encoding is a technique for converting categorical attributes into binary vectors.
OneHotEncoder maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index.
For example, with 5 categories, an input value of 2.0 maps to the output vector [0.0, 0.0, 1.0, 0.0].
The last category is not included by default (configurable via dropLast), because including it would make the vector entries sum to one and therefore be linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
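The mapping described above can be sketched in plain Python. This is only an illustration of the rule, not Spark's implementation; the helper name `one_hot` and its `drop_last` argument are made up for this example.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list for a category index (illustrative helper)."""
    # With drop_last, the last category has no slot of its own.
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    # The dropped category falls outside the vector and stays all zeros.
    if index < size:
        vec[int(index)] = 1.0
    return vec

print(one_hot(2.0, 5))                   # [0.0, 0.0, 1.0, 0.0]
print(one_hot(4.0, 5))                   # [0.0, 0.0, 0.0, 0.0]
print(one_hot(4.0, 5, drop_last=False))  # [0.0, 0.0, 0.0, 0.0, 1.0]
```

With `drop_last=False` the vector gets one slot per category, so the last category is no longer encoded as all zeros.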
#import SparkSession
from pyspark.sql import SparkSession
SparkSession is the entry point to Spark for working with RDDs, DataFrames, and Datasets. To create a SparkSession in Python, we use the builder attribute and call its getOrCreate() method.
If a SparkSession already exists, getOrCreate() returns it; otherwise it creates a new one.
spark = SparkSession.builder.appName('xvspark').getOrCreate()
from pyspark.sql.types import StructType
The StructType class is used to define the structure (schema) of the DataFrame.
#create the structure of schema
schema = StructType().add("id","integer").add("name","string").add("qualification","string").add("age", "integer").add("gender", "string")
#create data
data = [
(1,'John',"B.A.", 20, "Male"),
(2,'Martha',"B.Com.", 20, "Female"),
(3,'Mona',"B.Com.", 21, "Female"),
(4,'Harish',"B.Sc.", 22, "Male"),
(5,'Jonny',"B.A.", 22, "Male"),
(6,'Maria',"B.A.", 23, "Female"),
(7,'Monalisa',"B.A.", 21, "Female")
]
#create dataframe
df = spark.createDataFrame(data, schema=schema)
#columns of dataframe
df.columns
df.show()
We cannot apply OneHotEncoder to string columns directly. We first need to convert the string columns to numeric category indices, for which we will use StringIndexer. After that we can apply OneHotEncoder.
#import required libraries
from pyspark.ml.feature import StringIndexer
qualification_indexer = StringIndexer(inputCol="qualification", outputCol="qualificationIndex")
#fit() learns the label-to-index mapping from df; transform() appends the index column
df1 = qualification_indexer.fit(df).transform(df)
df1.show()
"B.A." gets index 0 because it is the most frequent label (4 rows), then "B.Com." gets index 1 (2 rows) and "B.Sc." gets index 2 (1 row).
gender_indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
#fit() learns the label-to-index mapping from df; transform() appends the index column
df2 = gender_indexer.fit(df).transform(df)
df2.show()
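The frequency-based ordering used by the two indexers above can be approximated in plain Python. This is a minimal sketch, not StringIndexer itself: the function name `frequency_index` is hypothetical, and the alphabetical tie-break is an assumption of this sketch (the sample data has no ties in either column).

```python
from collections import Counter

def frequency_index(labels):
    """Map each label to an index, most frequent label first (illustrative)."""
    counts = Counter(labels)
    # Sort by descending count; ties broken alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Same values as the qualification and gender columns of the sample data.
qualifications = ["B.A.", "B.Com.", "B.Com.", "B.Sc.", "B.A.", "B.A.", "B.A."]
genders = ["Male", "Female", "Female", "Male", "Male", "Female", "Female"]

print(frequency_index(qualifications))  # {'B.A.': 0.0, 'B.Com.': 1.0, 'B.Sc.': 2.0}
print(frequency_index(genders))         # {'Female': 0.0, 'Male': 1.0}
```

"Female" appears four times and "Male" three times, so "Female" receives index 0.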
from pyspark.ml.feature import OneHotEncoder
#onehotencoder to qualificationIndex
onehotencoder_qualification_vector = OneHotEncoder(inputCol="qualificationIndex", outputCol="qualification_vec")
df11 = onehotencoder_qualification_vector.fit(df1).transform(df1)
df11.show()
#onehotencoder to genderIndex
onehotencoder_gender_vector = OneHotEncoder(inputCol="genderIndex", outputCol="gender_vec")
df12 = onehotencoder_gender_vector.fit(df2).transform(df2)
df12.show()
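The encoded columns are displayed by show() as sparse vectors in the form (size,[indices],[values]), for example (2,[0],[1.0]). The plain-Python helper below, with the hypothetical name `to_dense`, is a small sketch of how to read that notation; it does not use Spark.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse triple into a dense list."""
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense

# qualification has 3 categories, so with dropLast=True the vector has 2 slots.
print(to_dense(2, [0], [1.0]))  # index 0 ("B.A.")   -> [1.0, 0.0]
print(to_dense(2, [1], [1.0]))  # index 1 ("B.Com.") -> [0.0, 1.0]
print(to_dense(2, [], []))      # index 2 ("B.Sc.")  -> [0.0, 0.0] (dropped last category)
```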
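#onehotencoder applied directly to age: the raw values (20-23) are treated as category indices, so the vector is much wider than needed
onehotencoder_age_vector = OneHotEncoder(inputCol="age", outputCol="age_vec")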
df13 = onehotencoder_age_vector.fit(df2).transform(df2)
df13.show()
#index age first so that the distinct ages map to contiguous indices starting at 0
age_indexer = StringIndexer(inputCol="age", outputCol="ageIndex")
df13 = age_indexer.fit(df).transform(df)
onehotencoder_age_vector = OneHotEncoder(inputCol="ageIndex", outputCol="age_vec")
df14 = onehotencoder_age_vector.fit(df13).transform(df13)
df14.show()
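Why indexing age first matters can be seen by comparing vector widths. The sketch below assumes the encoder sizes its vector from the largest index it sees (max + 1 categories, minus one when the last category is dropped). The helper `encoded_width` is hypothetical, and the ages are indexed here in sorted order rather than by frequency, which does not change the width.

```python
ages = [20, 20, 21, 22, 22, 23, 21]  # the age column of the sample data

def encoded_width(indices, drop_last=True):
    """Width of the one-hot vector when values are read as category indices."""
    num_categories = int(max(indices)) + 1
    return num_categories - 1 if drop_last else num_categories

# Raw ages are read as indices 20..23, implying categories 0..23.
raw_width = encoded_width(ages)

# Indexing first maps the four distinct ages to 0..3.
age_index = {age: i for i, age in enumerate(sorted(set(ages)))}
indexed_width = encoded_width([age_index[a] for a in ages])

print(raw_width, indexed_width)  # 23 3
```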
#import module
from pyspark.ml import Pipeline
schema = StructType().add("id","integer").add("name","string").add("qualification","string").add("age", "integer").add("gender", "string")
df = spark.createDataFrame(data, schema=schema)
qualification_indexer = StringIndexer(inputCol="qualification", outputCol="qualificationIndex")
gender_indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
age_indexer = StringIndexer(inputCol="age", outputCol="ageIndex")
onehotencoder_qualification_vector = OneHotEncoder(inputCol="qualificationIndex", outputCol="qualification_vec")
onehotencoder_gender_vector = OneHotEncoder(inputCol="genderIndex", outputCol="gender_vec")
onehotencoder_age_vector = OneHotEncoder(inputCol="ageIndex", outputCol="age_vec")
pipeline = Pipeline(stages=[
    qualification_indexer,
    gender_indexer,
    age_indexer,
    onehotencoder_qualification_vector,
    onehotencoder_gender_vector,
    onehotencoder_age_vector
])
df_transformed = pipeline.fit(df).transform(df)
df_transformed.show()
df_transformed.toPandas()
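The pipeline above fits its stages in order, and each fitted stage transforms the data before the next stage is fitted, which is why the encoders can find the index columns. The toy classes below are a plain-Python sketch of that chaining on a list of dicts. All class names are hypothetical, the indexer orders labels alphabetically for simplicity, and none of this is Spark's actual implementation.

```python
class ToyIndexer:
    """Estimator-like stage: fit() learns a label -> index mapping."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        labels = sorted({row[self.input_col] for row in rows})
        mapping = {label: i for i, label in enumerate(labels)}
        return ToyIndexerModel(self.input_col, self.output_col, mapping)


class ToyIndexerModel:
    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        # Return new rows with the index column appended.
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyEncoder:
    """Estimator-like stage: fit() learns the number of categories."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        num_categories = max(row[self.input_col] for row in rows) + 1
        return ToyEncoderModel(self.input_col, self.output_col, num_categories)


class ToyEncoderModel:
    def __init__(self, input_col, output_col, num_categories):
        self.input_col, self.output_col = input_col, output_col
        self.size = num_categories - 1  # drop the last category

    def transform(self, rows):
        def encode(index):
            vec = [0.0] * self.size
            if index < self.size:
                vec[index] = 1.0
            return vec
        return [{**row, self.output_col: encode(row[self.input_col])} for row in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        models = []
        for stage in self.stages:
            model = stage.fit(rows)
            rows = model.transform(rows)  # the next stage sees the new columns
            models.append(model)
        return ToyPipelineModel(models)


class ToyPipelineModel:
    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows


rows = [{"gender": "Male"}, {"gender": "Female"}, {"gender": "Female"}]
toy_pipeline = ToyPipeline([ToyIndexer("gender", "genderIndex"),
                            ToyEncoder("genderIndex", "gender_vec")])
result = toy_pipeline.fit(rows).transform(rows)
print(result[0])  # {'gender': 'Male', 'genderIndex': 1, 'gender_vec': [0.0]}
print(result[1])  # {'gender': 'Female', 'genderIndex': 0, 'gender_vec': [1.0]}
```

If the encoder were fitted on the original rows instead of the indexer's output, the genderIndex column would not exist yet, which is the reason for transforming between fits.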