Logistic regression is a popular machine learning classification algorithm for predicting a categorical response.
In statistics, the logistic model is used to model the probability of a certain class or event occurring, such as yes/no, pass/fail, win/lose, alive/dead, or healthy/sick.
The resulting binary variable contains data coded as 1 (yes, pass, win, alive, etc.) or 0 (no, fail, lose, dead, etc.).
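Under the hood, logistic regression maps a linear combination of the input features through the logistic (sigmoid) function, which squeezes any real number into the range (0, 1) so it can be read as a probability:
p(y = 1 | x) = 1 / (1 + e^-(b0 + b1*x1 + ... + bn*xn))
An observation is then assigned class 1 when this probability exceeds a threshold (0.5 by default).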
We will work with the Breast Cancer Data Set, one of three domains provided by the Oncology Institute that have repeatedly appeared in the machine learning literature. If you want to follow along, you can download the data set from the UCI Machine Learning Repository.
This data set includes 201 instances of one class and 85 instances of another class. The instances are described by 9 attributes, some of which are linear and some are nominal.
Attribute Information:
- Class: no-recurrence-events, recurrence-events
- age: 10-19, 20-29, 30-39, 40-49, 50-59, 60-69, 70-79, 80-89, 90-99.
- menopause: lt40, ge40, premeno.
- tumor-size: 0-4, 5-9, 10-14, 15-19, 20-24, 25-29, 30-34, 35-39, 40-44, 45-49, 50-54, 55-59.
- inv-nodes: 0-2, 3-5, 6-8, 9-11, 12-14, 15-17, 18-20, 21-23, 24-26, 27-29, 30-32, 33-35, 36-39.
- node-caps: yes, no.
- deg-malig: 1, 2, 3.
- breast: left, right.
- breast-quad: left-up, left-low, right-up, right-low, central.
- irradiat: yes, no.
#import SparkSession
from pyspark.sql import SparkSession
SparkSession is the entry point to Spark for working with RDDs, DataFrames, and Datasets. To create a SparkSession in Python, we use the builder attribute and then call the getOrCreate() method.
If a SparkSession already exists, getOrCreate() returns it; otherwise it creates a new one.
spark = SparkSession.builder.appName('regression').getOrCreate()
#read the dataset
df = spark.read.csv('input/breast-cancer.csv', inferSchema=True, header=True)
#view five records
df.show(5)
#print dataframe columns and count
print(df.columns)
print(df.count())
df.printSchema()
from pyspark.sql.functions import isnan, when, count, col
df.filter(df['age'].isNull()).show()
We can see there are no null values in the 'age' column.
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
We can see there are no null values in any column.
df.na.drop().show(5)
As there were no missing values, the number of records remains the same.
print(df.count())
We are going to use StringIndexer and OneHotEncoder from PySpark ML features to convert the string columns to numeric.
If you want to know more about them, you can go through my previous articles:
Role of StringIndexer and Pipelines in PySpark ML Feature - Part 1
Role of OneHotEncoder and Pipelines in PySpark ML Feature — Part 2
#import libraries
from pyspark.ml.feature import StringIndexer, OneHotEncoder
Let us check the 'class' column:
df.groupBy('class').count().show()
'class' is the target column; it has two distinct values, 'no-recurrence-events' and 'recurrence-events', which we are going to convert to numeric.
As there will be only two values, 0 and 1, after the conversion, we will not apply one-hot encoding to the label.
class_indexer = StringIndexer(inputCol="class", outputCol="label")
#Fit and transform the dataframe
df = class_indexer.fit(df).transform(df)
df.show(5)
Now, you can see a new column named 'label'. You can print just the 'class' and 'label' columns to see the transformation.
df.select(['class', 'label']).show(5)
Create a function to transform string columns to numeric
We have a lot of string columns, so we will create a function that converts a string column to numeric.
We will use a pattern for StringIndexer output columns: input column name + '-index'. For example:
'age' -> 'age-index'.
This output column is then passed to OneHotEncoder. As above, we will use a pattern for the OneHotEncoder output columns: input column name + '-vector'. Thus we finally get, for example,
'age' -> 'age-index' -> 'age-vector'.
def transformColumnsToNumeric(df, inputCol):
    #apply StringIndexer to inputCol and add an '-index' column
    inputCol_indexer = StringIndexer(inputCol=inputCol, outputCol=inputCol + "-index").fit(df)
    df = inputCol_indexer.transform(df)
    #one-hot encode the indexed column into a '-vector' column
    onehotencoder_vector = OneHotEncoder(inputCol=inputCol + "-index", outputCol=inputCol + "-vector")
    df = onehotencoder_vector.fit(df).transform(df)
    return df
df = transformColumnsToNumeric(df, "age")
df = transformColumnsToNumeric(df, "menopause")
df = transformColumnsToNumeric(df, "tumor-size")
df = transformColumnsToNumeric(df, "inv-nodes")
df = transformColumnsToNumeric(df, "node-caps")
df = transformColumnsToNumeric(df, "breast")
df = transformColumnsToNumeric(df, "breast-quad")
df = transformColumnsToNumeric(df, "irradiat")
df.show(5)
You can put all these in a Pipeline, but I wanted to keep it as simple as possible.
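For reference, here is a minimal sketch of how the same stages could be chained in a single Pipeline (the stages list and the name 'stringCols' are mine, not part of the walkthrough; the column naming follows the same patterns as above):
from pyspark.ml import Pipeline
#string columns to index and one-hot encode
stringCols = ['age', 'menopause', 'tumor-size', 'inv-nodes',
              'node-caps', 'breast', 'breast-quad', 'irradiat']
indexers = [StringIndexer(inputCol=c, outputCol=c + '-index') for c in stringCols]
encoders = [OneHotEncoder(inputCol=c + '-index', outputCol=c + '-vector') for c in stringCols]
#fit all stages in order and transform the dataframe in one call
df = Pipeline(stages=indexers + encoders).fit(df).transform(df)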
Feature transformer - VectorAssembler
If you are new to VectorAssembler, you may read my article
Feature Transformer VectorAssembler in PySpark ML Feature — Part 3
from pyspark.ml.feature import VectorAssembler
Let us list the columns
df.columns
We select as inputCols only the columns that we need to feed to our Spark ML model. Let us define a VectorAssembler:
inputCols=[
'deg-malig',
'age-vector',
'menopause-vector',
'tumor-size-vector',
'inv-nodes-vector',
'node-caps-vector',
'breast-vector',
'breast-quad-vector',
'irradiat-vector']
df_va = VectorAssembler(inputCols = inputCols, outputCol="features")
Now we can transform the dataset with our VectorAssembler. It will add the output column 'features', as we specified in the VectorAssembler.
df = df_va.transform(df)
Let us check the input and output columns:
df.select(inputCols + ["features"] ).show(5)
We need only the 'features' and 'label' columns for our model: the data from the other columns has been merged into the 'features' column, and 'label' is our target. Let us list a few records, passing False for the truncate argument so the feature vectors are not cut off.
df.select(['features','label']).show(10,False)
So, finally, we create a new dataset with just these two columns and view the label-wise counts.
df_transformed = df.select(['features','label'])
df_transformed.show(5)
df_transformed.groupBy('label').count().show()
#split the data
train_df, test_df = df_transformed.randomSplit([0.75,0.25])
We have split the data in a 75:25 ratio.
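Note that randomSplit is random, so the exact counts will vary from run to run; if you need a reproducible split, you can pass a seed (the value 42 below is an arbitrary choice):
#reproducible 75/25 split
train_df, test_df = df_transformed.randomSplit([0.75, 0.25], seed=42)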
train_df.count()
test_df.count()
train_df.groupBy('label').count().show()
test_df.groupBy('label').count().show()
We are going to use the LogisticRegression model, as we have a binary classification problem with only two possible label values: 0 and 1.
from pyspark.ml.classification import LogisticRegression
model = LogisticRegression(labelCol='label')
model
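We stick to the defaults here, but LogisticRegression exposes hyperparameters you may want to tune; the sketch below just spells a few of them out explicitly with their default values:
#explicit defaults: up to 100 iterations, no regularization
model = LogisticRegression(labelCol='label', featuresCol='features',
                           maxIter=100, regParam=0.0, elasticNetParam=0.0)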
Now, it is time to train our model:
trained_model = model.fit(train_df)
Let us get some predictions with our trained model. We will use the training data first.
train_predictions = trained_model.evaluate(train_df).predictions
train_predictions.show(5)
Using multiple filters, we can derive various counts. Let us first count the 0s and 1s in the labels:
train_df_count_1 = train_df.filter(train_df['label'] == 1).count()
train_df_count_0 = train_df.filter(train_df['label'] == 0).count()
train_df_count_1, train_df_count_0
tp = train_predictions.filter(
    train_predictions['label'] == 1).filter(
    train_predictions['prediction'] == 1).select(
    ['label','prediction','probability'])
print("True positives: ", tp.count())
#true positives over all actual positives is recall, not overall accuracy
recall = tp.count() / train_df_count_1
print(f"Recall: {recall}\n")
tp.show(5,False)
fp = train_predictions.filter(
train_predictions['label'] == 0).filter(
train_predictions['prediction'] == 1).select(
['label','prediction','probability'])
print("False positive: ", fp.count())
fp.show(5,False)
fn = train_predictions.filter(
train_predictions['label'] == 1).filter(
train_predictions['prediction'] == 0).select(
['label','prediction','probability'])
print("False negative: ", fn.count())
fn.show(5,False)
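Instead of filtering combination by combination, you can also read off the whole confusion matrix with a single groupBy; this is just an alternative view of the same counts:
#count every label/prediction pair at once
train_predictions.groupBy('label', 'prediction').count().show()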
test_predictions = trained_model.evaluate(test_df).predictions
test_predictions.show(5, False)
Using multiple filters, we can derive the same counts for the test set. Let us first count the 0s and 1s in the labels:
test_df_count_1 = test_df.filter(test_df['label'] == 1).count()
test_df_count_0 = test_df.filter(test_df['label'] == 0).count()
test_df_count_1, test_df_count_0
tp = test_predictions.filter(
    test_predictions['label'] == 1).filter(
    test_predictions['prediction'] == 1).select(
    ['label','prediction','probability'])
print("True positives: ", tp.count())
#recall on the test set: true positives over all actual positives
recall = tp.count() / test_df_count_1
print(f"Recall: {recall}\n")
tp.show(5,False)
fp = test_predictions.filter(
test_predictions['label'] == 0).filter(
test_predictions['prediction'] == 1).select(
['label','prediction','probability'])
print("False positive: ", fp.count())
fp.show(5,False)
fn = test_predictions.filter(
test_predictions['label'] == 1).filter(
test_predictions['prediction'] == 0).select(
['label','prediction','probability'])
print("False negative: ", fn.count())
fn.show(5,False)
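As a cross-check on these hand-computed counts, PySpark's built-in evaluators can report standard metrics directly; here is a minimal sketch on the test predictions:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
#area under the ROC curve, computed from the default rawPrediction column
auc = BinaryClassificationEvaluator(labelCol='label').evaluate(test_predictions)
#overall accuracy: correct predictions of both classes over all records
acc = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',
                                        metricName='accuracy').evaluate(test_predictions)
print(f"Test AUC: {auc}, test accuracy: {acc}")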