Module emse-mms.models.classification
Source code
import pandas as pd
from pandas import DataFrame
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.corpus import wordnet as wn
from sklearn.metrics import accuracy_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder
from typing import Union
import numpy
from pandas.core.series import Series
class Classify:
    """
    Predict the class for the given Module dataset using TF-IDF vectorization and a k-nearest-neighbours (KNN) supervised classifier.
    Args:
        path (str): Path to the training dataset.
        testPath (str, None): Path to the test dataset.
        outputPath (str): Path to the output directory.
        toDownload (bool): Whether to download the nltk corpora or not.
        verbose (bool): Whether to print the logs or not.
        visualize (bool): Whether to show EDA visualizations or not.
        concat (bool): Whether to concatenate multiple training files into one dataset.
        trainFiles (list[str], None): Paths of the training files to concatenate when concat is True.
    """
def __init__(
self,
path: str = "input/603_trans_3.tsv",
testPath: Union[str, None] = None,
outputPath: str = "output/",
toDownload: bool = False,
verbose: bool = False,
visualize: bool = False,
concat: bool = False,
        trainFiles: Union[list[str], None] = None,
) -> None:
import logging
self.logger = logging.getLogger(__name__)
self.filePath: str = path
        self.testPath: Union[str, None] = testPath
self.outputPath: str = outputPath
self.logPath: str = "logs/classification.log"
self.toDownload: bool = toDownload
self.concat: bool = concat
self.data: Union[DataFrame, None] = None
        # Avoid a shared mutable default by materialising the list here.
        self.trainFiles: list[str] = trainFiles if trainFiles is not None else []
self.testData: Union[DataFrame, None] = None
self.verbose: bool = verbose
self.viz: bool = visualize
self.stop_words = set(stopwords.words("english"))
self.N_CLUSTER: int = 0
self.N_TEST_CLUSTER: int = 0
self.train_x: Union[Series, None] = None
self.train_y: Union[Series, None] = None
self.test_x: Union[Series, None] = None
self.test_y: Union[Series, None] = None
self.train_x_vector: Union[None, numpy.ndarray] = None
self.test_x_vector: Union[None, numpy.ndarray] = None
self.lemmatizer = WordNetLemmatizer()
self.vectorizer = TfidfVectorizer(max_features=10000)
self.encoder = LabelEncoder()
        if self.verbose:
            logging.basicConfig(level=logging.INFO)
        else:
            # Suppress informational output unless verbose mode is requested.
            logging.basicConfig(level=logging.WARNING)
self.download()
self._configure()
    def _configure(self) -> None:
        """
        Extend the stop word set with domain-specific terms and transcript filler words.
        """
        self.stop_words.update(
            {
                "module", "problem", "use", "model", "solution", "solve",
                "analyze", "example", "application", "computer", "computers",
                "one", "two", "three", "four", "five", "six", "seven", "eight",
                "x", "c", "go", "constraint", "get", "ok", "uh", "shift",
            }
        )
    def download(self):
        """
        Download the required NLTK corpora for the classification.
        """
        import nltk
        if self.toDownload:
            nltk.download("stopwords", quiet=not self.verbose)
            nltk.download("punkt", quiet=not self.verbose)  # needed by word_tokenize
            nltk.download("averaged_perceptron_tagger", quiet=not self.verbose)  # needed by pos_tag
            nltk.download("wordnet", quiet=not self.verbose)
            nltk.download("omw-1.4", quiet=not self.verbose)
            self._log("Library downloads complete")
        else:
            self._log("Library downloads skipped")
def read(self, sep="\t") -> None:
"""
Read the data from the file path. The default separator is tab.
"""
        data = None
        if self.concat:
            for file in self.trainFiles:
                # pd.concat silently drops the initial None, so the first file
                # simply seeds the merged frame.
                data = self._merge_frames(data, pd.read_csv(file, sep=sep))
        else:
            data = pd.read_csv(self.filePath, sep=sep)
        self.data = data
        if self.data is None:
            raise ValueError("Data is empty")
if self.testPath is not None:
self.testData = pd.read_csv(self.testPath, sep=sep)
if self.verbose:
row, col = self.data.shape
self._log(f"Shape of train data read \nRows: {row}, Columns: {col}")
if self.testPath is not None:
row, col = self.testData.shape
self._log(f"Shape of test data read \nRows: {row}, Columns: {col}")
self._log("Data read successfully")
else:
self._log("Data read successfully")
if self.testPath is not None:
self._log("Test data read successfully")
    def _scalar_to_string(self, df: DataFrame, col: str) -> DataFrame:
        """
        Convert the scalar values for each row of the DataFrame column to a string.
        """
        df[col] = [str(obj) for obj in df[col]]
        self._log("Scalar values converted to string successfully")
        if self.verbose:
            print(df.head())
        return df
    def _merge_frames(self, df1: DataFrame, df2: DataFrame) -> DataFrame:
        """
        Concatenate the two DataFrames row-wise.
        """
        df = pd.concat([df1, df2], ignore_index=True)
        self._log("DataFrames merged successfully")
        if self.verbose:
            print(df.head())
        return df
    def _merge_columns(
        self, df: DataFrame, destinationCol: str, originCol: str
    ) -> DataFrame:
        """
        Append the cleaned text of the origin column to the destination column, row by row.
        """
        lst1 = list(df[destinationCol])
        lst2 = list(df[originCol])
        merged = []
        for i in range(len(lst1)):
            cleaned_transcript = self._clean_transcript(str(lst2[i]))
            merged.append(lst1[i] + " " + cleaned_transcript)
        df[destinationCol] = merged
        self._log("Columns merged successfully")
        if self.verbose:
            print(df.head())
        return df
    def _clean_transcript(self, input: str) -> str:
        """
        Clean the transcript by removing special characters and digits, and by
        splitting camel-case tokens into separate words.
        """
        import re
        from string import digits
        if input == "nan":
            return ""
        # Strip escaped apostrophes, collapse non-alphanumeric runs to spaces,
        # then insert a space before each upper-case run and each capitalised
        # word so camel-case tokens split apart. For example, the hypothetical
        # input "solveLinearSystem2x2" comes out as "solve  Linear  Systemx";
        # the doubled spaces and stray "x" are cleaned up by later passes.
        cleaned = re.sub(
            "([A-Z][a-z]+)",
            r" \1",
            re.sub(
                "([A-Z]+)",
                r" \1",
                re.sub(
                    r"[^a-zA-Z0-9]+",
                    " ",
                    input.replace("\\'", ""),
                ),
            ),
        )
        remove_digits = str.maketrans("", "", digits)
        return cleaned.translate(remove_digits)
def _clean_text(self, input: str) -> str:
"""
Clean the text by removing special characters, and digits from the input string.
"""
import re
if input == "nan":
return ""
return re.sub(
r"[^a-zA-Z]+",
" ",
input.replace("\\'", ""),
)
    def _split_camel_case(self, df: DataFrame, col: str) -> DataFrame:
        """
        Split the words in the DataFrame column which are in camel case.
        """
        # _clean_text strips non-letters; re-splitting and joining collapses the
        # leftover whitespace into single spaces.
        df[col] = [" ".join(self._clean_text(obj).split()) for obj in df[col]]
        self._log("Camel case text split successfully")
        if self.verbose:
            print(df.head())
        return df
    def _stemmer(self, df: DataFrame, col: str) -> DataFrame:
        """
        Lemmatize the words in the DataFrame column, using each token's POS tag
        to pick the WordNet word class.
        """
        from collections import defaultdict
        data = df.copy()
        # Map Penn Treebank tag prefixes (J, V, R) to WordNet word classes;
        # anything else is treated as a noun.
        tag_map = defaultdict(lambda: wn.NOUN)
        tag_map["J"] = wn.ADJ
        tag_map["V"] = wn.VERB
        tag_map["R"] = wn.ADV
        data["tokens"] = [word_tokenize(entry) for entry in data[col]]
        stemmer = self.lemmatizer
        for index, entry in enumerate(data["tokens"]):
            final_words = []
            for word, tag in pos_tag(entry):
                word_final = stemmer.lemmatize(word, tag_map[tag[0]])
                if word_final not in self.stop_words and word_final.isalpha():
                    final_words.append(word_final)
            data.loc[index, "target"] = " ".join(final_words)
        return data
    def _preprocess_features(self, col: str, data: DataFrame) -> DataFrame:
        """
        1. split camel-case text and remove special characters
        2. turn entries to lower case
        3. tokenize entries
        4. lemmatize words
        5. remove stop words
        6. return the DataFrame with the processed "target" column
        """
# create a copy of the data
df = data.copy()
# convert camel case text to separate words
df = self._split_camel_case(df, col)
# for each row of the feature column, turn text to lowercase
df[col] = [entry.lower() for entry in df[col]]
# tokenize entries, stem words and remove stop words
df = self._stemmer(df, col)
return df
    def prepare(self, col: str) -> None:
        """
        Prepare the data for the classification.
        """
        import numpy as np
        if self.testPath is None:
            df = self._merge_columns(
                self.data, destinationCol="features", originCol="transcript"
            )
            # Pass the merged frame on explicitly instead of relying on
            # _merge_columns mutating self.data in place.
            df = self._preprocess_features(col, df)
            df = df.drop(
                ["features", "tokens", "hours", "prefix", "transcript", "number"],
                axis=1,
            )
if self.verbose:
self._log("Data prepared successfully")
print(df.head())
else:
self._log("Data prepared successfully")
self.data = df
self.N_CLUSTER = int(np.sqrt(len(df)))
if self.viz:
self.generate_count_plot(data=df)
self._save_data_frame(df, fileName="603_clean.csv")
else:
            dfTrain = self._merge_columns(
                self.data, destinationCol="features", originCol="transcript"
            )
            dfTrain = self._preprocess_features(col, dfTrain)
            dfTrain = dfTrain.drop(
                ["features", "tokens", "hours", "prefix", "transcript", "number"],
                axis=1,
            )
            dfTest = self._merge_columns(
                self.testData, destinationCol="features", originCol="transcript"
            )
            dfTest = self._preprocess_features(col, dfTest)
            dfTest = dfTest.drop(
                ["features", "tokens", "hours", "prefix", "transcript", "number"],
                axis=1,
            )
if self.verbose:
self._log("Train data prepared successfully")
print(dfTrain.head())
self._log("Test data prepared successfully")
print(dfTest.head())
else:
self._log("Data prepared successfully")
self.data = dfTrain
self.testData = dfTest
self.N_CLUSTER = int(np.sqrt(len(dfTrain)))
self.N_TEST_CLUSTER = int(np.sqrt(len(dfTest)))
if self.viz:
self.generate_count_plot(data=dfTrain)
self.generate_count_plot(data=dfTest)
self._save_data_frame(dfTrain, fileName="603_clean.csv")
self._save_data_frame(dfTest, fileName="614_test.csv")
    def _create_tf_idf(self, train, test) -> tuple:
        """
        Fit the TF-IDF vectorizer on the training corpus and transform both
        corpora into dense TF-IDF matrices that share one vocabulary.
        """
        self.vectorizer = TfidfVectorizer(
            analyzer="word",
            max_features=10000,
            stop_words=list(self.stop_words),
        )
        tfidf_train = self.vectorizer.fit_transform(train).toarray()
        tfidf_test = self.vectorizer.transform(test).toarray()
        return (tfidf_train, tfidf_test)
def _data_transformer(self, size: float = 0.3):
"""
Splits, fits, and transforms the data for the classification.
"""
from sklearn.model_selection import train_test_split
if self.testPath is None:
df = self._data_encoder(df=self.data, col="cluster")
Train_X, Test_X, Train_Y, Test_Y = train_test_split(
df["target"],
df["cluster"],
test_size=size,
random_state=42,
shuffle=True,
stratify=None,
)
Train_X_Tfidf, Test_X_Tfidf = self._create_tf_idf(Train_X, Test_X)
self.data = df
self.train_x = Train_X
self.test_x = Test_X
self.train_y = Train_Y
self.test_y = Test_Y
self.train_x_vector = Train_X_Tfidf
self.test_x_vector = Test_X_Tfidf
        else:
            dfTrain = self._data_encoder(df=self.data, col="cluster")
            # Reuse the encoder fitted on the training labels so train and test
            # clusters share one integer mapping.
            dfTest = self._data_encoder(df=self.testData, col="cluster", fit=False)
Train_X_Tfidf, Test_X_Tfidf = self._create_tf_idf(
dfTrain["target"], dfTest["target"]
)
self.data = dfTrain
self.testData = dfTest
self.train_x = dfTrain["target"]
self.test_x = dfTest["target"]
self.train_y = dfTrain["cluster"]
self.test_y = dfTest["cluster"]
self.train_x_vector = Train_X_Tfidf
self.test_x_vector = Test_X_Tfidf
if self.verbose:
self._log("Data transformed successfully")
if self.testPath is not None:
print(dfTrain.head())
print(dfTest.head())
else:
print(df.head())
else:
self._log("Data transformed successfully")
    def _data_encoder(
        self, df: DataFrame, col: str = "cluster", fit: bool = True
    ) -> DataFrame:
        """
        Label-encode the given column for the classification. When fit is False,
        reuse the mapping already fitted on the training labels (this assumes the
        test set contains no unseen labels).
        """
        if fit:
            df[col] = self.encoder.fit_transform(df[col])
        else:
            df[col] = self.encoder.transform(df[col])
        return df
def _create_model(self):
"""
Create the classification model.
"""
self._data_transformer()
self._run_nearest_neighbors(
Train_X_Tfidf=self.train_x_vector,
Test_X_Tfidf=self.test_x_vector,
Train_Y=self.train_y,
Test_Y=self.test_y,
algo="brute",
metric="cosine",
weights="distance",
)
self._print_top_words_per_cluster(
vectorizer=self.vectorizer, df=self.data, X=self.train_x_vector
)
sim, mask = self._calculate_similarity(
X=self.train_x_vector,
)
self.generate_heat_map(
arr=sim,
mask=mask,
fileName="heatmap_train.png",
)
self._print_sorted_similarities(sim_arr=sim)
self._run_pca(X=self.train_x_vector, df=self.data, fileName="pca_train.png")
self._run_naive_bayes(
X_vector=self.train_x_vector,
X_test_vector=self.test_x_vector,
Y_test=self.test_y,
Y_train=self.train_y,
)
if self.testPath is not None:
self._print_top_words_per_cluster(
vectorizer=self.vectorizer,
df=self.testData,
X=self.test_x_vector,
train=False,
)
simTest, maskTest = self._calculate_similarity(
X=self.test_x_vector,
)
self.generate_heat_map(
arr=simTest,
mask=maskTest,
fileName="heatmap_test.png",
)
self._print_sorted_similarities(sim_arr=simTest)
self._run_pca(
X=self.test_x_vector, df=self.testData, fileName="pca_test.png"
)
self._log("Model created successfully")
    def _print_top_words_per_cluster(
        self, vectorizer, df: DataFrame, X: numpy.ndarray, n=10, train: bool = True
    ):
        """
        Log the top keywords for each cluster, ranked by each term's mean TF-IDF
        score within the cluster. The rows of X are assumed to align positionally
        with df["cluster"].
        Parameters
        ----------
        vectorizer : TfidfVectorizer
            The TF-IDF vectorizer.
        df : DataFrame
            The data frame.
        X : numpy.ndarray
            The matrix of TF-IDF vectors.
n : int, optional
The number of keywords to return, by default 10
train : bool, optional
Whether the data is training or test data, by default True
"""
import numpy as np
data = pd.DataFrame(X).groupby(df["cluster"]).mean()
terms = vectorizer.get_feature_names_out()
print("\n")
if train:
self._log("Top keywords per cluster in training set:")
else:
self._log("Top keywords per cluster in test set:")
for i, r in data.iterrows():
self._log(
"Cluster {} keywords: {}".format(
i, ", ".join([terms[t] for t in np.argsort(r)[-n:]])
)
)
self._log("Mean TF-IDF score -> %0.4f" % np.max(r))
print("\n")
def _train_model(self):
"""
Train the classification model.
"""
pass
def _evaluate_model(self):
"""
Evaluate the classification model.
"""
pass
def _predict(self):
"""
        Predict with the classification model.
"""
pass
def _save_model(self):
"""
Save the classification model.
"""
pass
def _run_nearest_neighbors(
self,
Train_X_Tfidf: numpy.ndarray,
Test_X_Tfidf: numpy.ndarray,
Train_Y: Series,
Test_Y: Series,
algo: str,
metric: str,
weights: str,
n_neighbors: int = 5,
):
"""
Runs the nearest neighbors algorithm on the input data.
Args:
Train_X_Tfidf (numpy.ndarray): The training data to be used for fitting the model.
Test_X_Tfidf (numpy.ndarray): The test data to be used for predicting the model.
Train_Y (pandas.Series): The target values for the training data.
Test_Y (pandas.Series): The target values for the test data.
algo (str): The algorithm to be used for computing the nearest neighbors.
metric (str): The distance metric to be used for computing the nearest neighbors.
weights (str): The weight function to be used for computing the nearest neighbors.
n_neighbors (int): The number of neighbors to be used for computing the nearest neighbors.
Returns:
None
"""
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_score
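        # Note: scikit-learn's tree-based neighbour searches do not support the
        # cosine metric, so algorithm="brute" is effectively required here;
        # weights="distance" makes nearer neighbours count more toward the vote.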
knn = KNeighborsClassifier(
n_neighbors=n_neighbors,
weights=weights,
algorithm=algo,
metric=metric,
n_jobs=1,
metric_params=None,
leaf_size=30,
p=2,
)
        knn.fit(Train_X_Tfidf, Train_Y)
        predicted = knn.predict(Test_X_Tfidf)
        self._log("KNN Predictions -> %s" % predicted)
        if self.testData is not None:
            # Only persist predictions when an external test set was supplied;
            # otherwise the predictions come from the held-out training split.
            self.testData["cluster"] = predicted
            self._save_data_frame(
                df=self.testData,
                fileName="614_pred.csv",
            )
acc = accuracy_score(Test_Y, predicted)
self._log("KNN Accuracy -> %0.4f" % (acc * 100))
print("\n")
best_cv = 2
best_score = 0
        best_scores = None
y = []
x = []
max_range: int = 10
if self.testPath is None:
max_range = 7
for i in range(2, max_range):
scores = cross_val_score(knn, Train_X_Tfidf, Train_Y, cv=i)
self._log(
"Cross Validation Accuracy: %0.2f (+/- %0.2f)"
% (scores.mean(), scores.std() * 2)
)
self._log("Number of predicted classes -> %s" % len(predicted))
print("\n")
            if scores.mean() > best_score:
                best_cv = i
                best_score = scores.mean()
                best_scores = scores
            y.append(scores.mean())
            x.append(i)
        if best_scores is not None:
            self._log(
                "Best Cross Validation Accuracy: %0.2f (+/- %0.2f)"
                % (best_score, best_scores.std() * 2)
            )
self._log("Best Cross Validation Number of Folds: %s" % best_cv)
self.generate_cross_validation_plot(x, y)
def _run_pca(
self, X: numpy.ndarray, df: DataFrame, fileName: str = "pca_scatter.png"
):
"""
Applies Principal Component Analysis (PCA) to the input data X and generates a scatter plot of the reduced features.
Args:
X (numpy.ndarray): The input data to be reduced.
df (pandas.DataFrame): The dataframe containing the data to be plotted.
Returns:
None
"""
from sklearn.decomposition import PCA
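        # Project the high-dimensional TF-IDF vectors onto their first two
        # principal components so the documents can be drawn on a 2-D scatter plot.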
pca = PCA(n_components=2, random_state=42)
red_feat = pca.fit_transform(X)
x = red_feat[:, 0]
y = red_feat[:, 1]
df["x"] = x
df["y"] = y
self.generate_scatter_plot(data=df, fileName=fileName)
self._log("PCA run successfully")
    def _run_naive_bayes(self, X_vector, Y_train, X_test_vector, Y_test):
        """
        Run the multinomial Naive Bayes classification model.
        """
        from sklearn.naive_bayes import MultinomialNB
        Naive = MultinomialNB()
        Naive.fit(X_vector, Y_train)
        predictions_NB = Naive.predict(X_test_vector)
        self._log("Naive Bayes run successfully")
        if self.verbose:
            print(
                "Naive Bayes Accuracy Score -> ",
                accuracy_score(Y_test, predictions_NB) * 100,
            )
            print("\n")
    def _run_svm(self, X_train, Y_train, X_test, Y_test):
        """
        Run the SVM classification model.
        """
        from sklearn import svm
        SVM = svm.SVC(C=1.0, kernel="poly", degree=3, gamma="auto")
        SVM.fit(X_train, Y_train)
        predictions_SVM = SVM.predict(X_test)
        print("SVM Accuracy Score -> ", accuracy_score(Y_test, predictions_SVM) * 100)
        self._log("SVM run successfully")
        if self.verbose:
            print(predictions_SVM)
def _run_word_cloud_per_cluster(self, df: DataFrame):
"""
        Generate a word cloud for each cluster's corpus.
"""
for i in sorted(df["cluster"].array.unique()):
corpus = " ".join(list(df[df["cluster"] == i]["target"]))
self.generate_word_cloud(corpus=corpus, fileName="cluster_%s.png" % i)
def _save_data_frame(self, df: DataFrame, fileName: str):
"""
Save the data frame as a CSV file.
"""
df.to_csv(str(self.outputPath + fileName), index=False)
def generate_word_cloud(self, corpus: str, fileName: str = "word_cloud.png"):
"""
Generate the word cloud for the data.
"""
from wordcloud import WordCloud
from matplotlib import pyplot as plt
wordcloud = WordCloud(
max_words=700,
background_color="white",
stopwords=self.stop_words,
).generate(corpus)
plt.figure(figsize=(10, 10))
plt.axis("off")
if self.viz:
plt.imshow(wordcloud, interpolation="bilinear")
plt.show()
else:
# plt.savefig(str(self.outputPath + fileName))
wordcloud.to_file(str(self.outputPath + fileName))
def generate_scatter_plot(
self, data: DataFrame, fileName: str = "scatter_plot.png"
):
"""
Generate the scatter plot for the data.
"""
import seaborn as sns
from matplotlib import pyplot as plt
plt.figure(figsize=(10, 10))
sns.scatterplot(data=data, x="x", y="y", hue="cluster", palette="tab10")
if self.viz:
plt.show()
else:
plt.savefig(str(self.outputPath + fileName))
def generate_elbow_plot(self, X: numpy.ndarray):
"""
        Generate the elbow plot that suggests the optimal number of clusters based on the sum of squared distances.
"""
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
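        # Fit k-means over a range of k and record the inertia (sum of squared
        # distances); the "elbow" where the curve flattens suggests a good k.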
Sum_of_squared_distances = []
K = range(2, self.N_CLUSTER * 2)
for k in K:
km = KMeans(n_clusters=k, max_iter=200, n_init=10)
km = km.fit(X)
Sum_of_squared_distances.append(km.inertia_)
plt.figure(figsize=(10, 10))
plt.plot(K, Sum_of_squared_distances, "bx-")
plt.xlabel("k")
plt.ylabel("Sum_of_squared_distances")
plt.title("Elbow Method For Optimal k")
if self.viz:
plt.show()
else:
plt.savefig(str(self.outputPath + "kmeans_elbow_plot.png"))
def generate_count_plot(self, data: DataFrame, countCol: str = "cluster"):
"""
        Generate a bar plot that counts the number of rows sharing the same value in the count column.
Args:
data (DataFrame): The data to plot.
countCol (str, optional): The name of the column to count. Defaults to "cluster".
"""
import seaborn as sns
from matplotlib import pyplot as plt
sns.countplot(x=countCol, data=data)
if self.viz:
plt.show()
else:
plt.savefig(str(self.outputPath + "count_plot.png"))
def generate_cross_validation_plot(self, x: list, y: list):
"""
        Generate a cross validation plot that shows the mean model accuracy for each number of folds.
        Args:
            x (list): The fold counts tried.
            y (list): The mean cross-validation accuracy for each fold count.
"""
import matplotlib.pyplot as plt
plt.plot(x, y, "bx-")
plt.xlabel("fold")
plt.ylabel("Accuracy")
plt.title("Cross Validation Accuracy over 10 folds")
if self.viz:
plt.show()
else:
plt.savefig(str(self.outputPath + "cross_validation_plot.png"))
    def _calculate_similarity(self, X) -> tuple[numpy.ndarray, numpy.ndarray]:
"""
Calculate the similarity between the documents.
Args:
X (numpy.ndarray): The array of documents.
Returns:
tuple: The similarity array and the mask to apply to the array.
"""
        import numpy as np
        from sklearn.metrics.pairwise import cosine_similarity
        sim_arr = cosine_similarity(X)
        # Boolean mask over the upper triangle (including the diagonal), used by
        # the heat map to hide duplicate pairs and self-similarities.
        mask = np.triu(np.ones_like(sim_arr, dtype=bool))
        return sim_arr, mask
    def generate_heat_map(
        self, arr: numpy.ndarray, mask: numpy.ndarray, fileName: str = "heat_map.png"
    ):
        """
        Generate a heat map that shows the cosine similarity between the documents.
        Args:
            arr (numpy.ndarray): The similarity array.
            mask (numpy.ndarray): The mask to apply to the array. This is used to remove the diagonal and upper duplicate values.
        """
import seaborn as sns
from matplotlib import pyplot as plt
plt.figure(figsize=(25, 15))
sns.heatmap(
arr,
mask=mask,
square=False,
robust=True,
annot=True,
cmap="YlGnBu",
fmt=".2f",
cbar=False,
)
if self.viz:
plt.show()
else:
plt.savefig(str(self.outputPath + fileName))
def _print_sorted_similarities(self, sim_arr, threshold=0) -> DataFrame:
"""
Store the similarities between the documents in a data frame that is sorted by the similarity score in descending order.
Removing the diagonal values.
Args:
sim_arr (numpy.ndarray): The similarity array.
threshold (int, optional): The threshold to filter the similarity scores by. Defaults to 0.
"""
import pandas as pd
df = pd.DataFrame(sim_arr)
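        # stack() flattens the square similarity matrix into one row per
        # (document, document) pair before sorting.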
df = df.stack().reset_index()
df.columns = ["Document 1", "Document 2", "Similarity Score"]
df = df.sort_values(by=["Similarity Score"], ascending=False)
filtered_df = df[df["Document 1"] != df["Document 2"]]
top = filtered_df[filtered_df["Similarity Score"] > threshold]
print(top.head(10))
return top
def _log(self, text: str):
"""
Append the text to the log file.
Args:
text (str): The text to append to the log file.
"""
        import os
        import time
        t = time.localtime()
        current_time = time.strftime("%H:%M:%S", t)
        if self.verbose:
            self.logger.info(f"\n[{current_time}]: {text}")
        else:
            self.logger.debug(f"\n[{current_time}]: {text}")
        # Create the log directory on first use so the append below cannot fail.
        os.makedirs(os.path.dirname(self.logPath), exist_ok=True)
        with open(self.logPath, "a") as f:
            f.write(f"\n[{current_time}]: {text}")
def run(self) -> None:
"""
Run the classification model.
"""
self.read()
self.prepare(col="features")
self._create_model()
if self.viz:
self._run_word_cloud_per_cluster(df=self.data)
if self.testPath is not None:
# TODO: Fix test data not having x and y columns
# self.generate_scatter_plot(data=self.testData)
pass
if self.verbose:
self._log("Successfully ran the classification model")
def main():
import argparse
parser = argparse.ArgumentParser(description="Classification of text")
    parser.add_argument(
        "--path",
        "-p",
        type=str,
        default="input/603_trans_3.tsv",
        help="Path to the training dataset",
    )
parser.add_argument(
"--test",
"-t",
type=str,
help="Path to the testing dataset",
)
parser.add_argument(
"--download",
help="Download the required libraries",
required=True,
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--verbose",
help="Print the logs",
required=True,
action=argparse.BooleanOptionalAction,
)
parser.add_argument(
"--sep", "-s", type=str, default="\t", help="Separator for the data"
)
    parser.add_argument(
        "--out",
        "-o",
        type=str,
        default="output/",
        help="Path to the output directory",
    )
parser.add_argument(
"--viz",
help="Decide whether to visualize the EDA process and display classification visualization.",
required=True,
action=argparse.BooleanOptionalAction,
)
    parser.add_argument(
        "--concat",
        help="Concatenate multiple training files into one training dataset",
        required=True,
        action=argparse.BooleanOptionalAction,
    )
parser.add_argument(
"--trainFiles",
"-tf",
nargs="+",
help="List of training files to concatenate",
)
args = parser.parse_args()
print(args)
classify = Classify(
path=args.path,
toDownload=args.download,
verbose=args.verbose,
outputPath=args.out,
visualize=args.viz,
testPath=args.test,
concat=args.concat,
trainFiles=args.trainFiles,
)
classify.run()
if __name__ == "__main__":
main()
Functions
def main()
-
Command-line entry point: parse the arguments, build a Classify instance, and run the classification.
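A minimal invocation sketch (the script name is illustrative; the four boolean flags are required and accept --no-* forms via BooleanOptionalAction):
python classification.py --path input/603_trans_3.tsv --out output/ --no-download --verbose --no-viz --no-concat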
Classes
class Classify (path: str = 'input/603_trans_3.tsv', testPath: Optional[str] = None, outputPath: str = 'output/', toDownload: bool = False, verbose: bool = False, visualize: bool = False, concat: bool = False, trainFiles: Optional[list[str]] = None)
-
Predict the class for the given Module dataset using TF-IDF vectorization and a k-nearest-neighbours (KNN) supervised classifier.
Args
path : str
- Path to the training dataset.
testPath : str, None
- Path to the test dataset.
outputPath : str
- Path to the output directory.
toDownload : bool
- Whether to download the nltk corpora or not.
verbose : bool
- Whether to print the logs or not.
visualize : bool
- Whether to show EDA visualizations or not.
concat : bool
- Whether to concatenate multiple training files into one dataset.
trainFiles : list[str], None
- Paths of the training files to concatenate when concat is True.
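A minimal usage sketch, assuming the input TSV exists and the NLTK corpora still need to be fetched (paths are illustrative):

classify = Classify(
    path="input/603_trans_3.tsv",  # training TSV with transcript/features columns
    testPath=None,                 # hold out 30% of the training rows instead
    outputPath="output/",
    toDownload=True,               # fetch the required NLTK corpora once
    verbose=True,
)
classify.run()  # read -> prepare -> TF-IDF -> KNN and Naive Bayes evaluation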
Methods
def download(self)
-
Download the required NLTK corpora for the classification.
def generate_count_plot(self, data: pandas.core.frame.DataFrame, countCol: str = 'cluster')
-
Generate a bar plot that counts the number of rows sharing the same value in the count column.
Args
data : DataFrame
- The data to plot.
countCol : str, optional
- The name of the column to count. Defaults to "cluster".
def generate_cross_validation_plot(self, x: list, y: list)
-
Generate a cross validation plot that shows the mean model accuracy for each number of folds.
Args
x : list
- The fold counts tried.
y : list
- The mean cross-validation accuracy for each fold count.
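A runnable sketch of how the (x, y) inputs are produced upstream in _run_nearest_neighbors, using random toy data in place of the TF-IDF vectors and labels:

import numpy as np
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier

rng = np.random.default_rng(0)
train_vectors = rng.random((30, 5))          # toy stand-in for TF-IDF rows
train_labels = rng.integers(0, 2, size=30)   # toy cluster labels
knn = KNeighborsClassifier(
    n_neighbors=5, weights="distance", metric="cosine", algorithm="brute"
)

x, y = [], []
for folds in range(2, 10):
    scores = cross_val_score(knn, train_vectors, train_labels, cv=folds)
    x.append(folds)          # number of folds, plotted on the x axis
    y.append(scores.mean())  # mean accuracy, plotted on the y axis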
def generate_elbow_plot(self, X: numpy.ndarray)
-
Generate the elbow plot that suggests the optimal number of clusters based on the sum of squared distances.
def generate_heat_map(self, arr: numpy.ndarray, mask: numpy.ndarray, fileName: str = 'heat_map.png')
-
Generate a heat map that shows the cosine similarity between the documents.
Args
arr : numpy.ndarray
- The similarity array.
mask : numpy.ndarray
- The mask to apply to the array. This is used to remove the diagonal and upper duplicate values.
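A standalone sketch of the masking technique behind the heat map, using two made-up document vectors:

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

X = np.array([[1.0, 0.0, 1.0], [0.5, 0.5, 0.0]])  # two toy TF-IDF rows
sim = cosine_similarity(X)                         # 2x2 symmetric matrix
mask = np.triu(np.ones_like(sim, dtype=bool))      # True on diagonal + upper half
# seaborn.heatmap(sim, mask=mask) then draws only the lower triangle, so each
# document pair appears exactly once.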
def generate_scatter_plot(self, data: pandas.core.frame.DataFrame, fileName: str = 'scatter_plot.png')
-
Generate the scatter plot for the data.
def generate_word_cloud(self, corpus: str, fileName: str = 'word_cloud.png')
-
Generate the word cloud for the data.
def prepare(self, col: str) ‑> None
-
Prepare the data for the classification.
def read(self, sep='\t') ‑> None
-
Read the data from the file path. The default separator is tab.
def run(self) ‑> None
-
Run the classification model.