Document Classification using Apache Spark in Scala

Posted on 26 Sep, 2016

Email Spam Identification, category classification of news and organization of web pages by search engines are the modern world examples for document classification. It is a technique to systematically classify a text document in one of the fixed category, or In other words, tagging of text document can be described as document classification process. This technique is really helpful when the amount of data is too large, specially for organizing, information filtering and storage purposes.

In this article, we will discuss an approach to implement an end to end document classification pipeline using Apache Spark, and we will use Scala as the core programming language. Apache Spark is the ideal choice while dealing with a greater volume and variety of data. Apache Spark’s machine learning library - Mllib is scalable, easy to deploy and is hundred times faster than mapreduce operations.

Table of Contents

1. Document Pre Processing
2. Initializing Apache Spark
3. Dataset Preparation
4. Creating the MLlib pipeline
5. Training the model
6. Prediction on Test Data
7. Conclusion and Next Steps


1. Document Pre Processing

The first component of the pipeline in is pre-processing block which involves removal of noisy content from the document. This included cleaning of URLs, punctuations, digits, short words, extra whitespace and english stopwords etc. Below are the scala utility functions used for cleaning various regular expressions and custom words.

```
\\ Utility function to remove particular regex from text
  def removeRegex(txt: String, flag: String): String = {
    val regex = RegexList.get(flag)
    var cleaned = txt
    regex match {
      case Some(value) =>
        if (value.equals("white_space")) cleaned = txt.replaceAll(value, "")
        else cleaned = txt.replaceAll(value, " ")
      case None => println("No regex flag matched")
    }
    cleaned
  }

\\ Particular function to remove stopwords from text
  def removeCustomWords(txt: String, flag: String): String ={
    var words = txt.split(" ")
    val stopwords = Stopwords.get(flag)
    stopwords match {
      case Some(value) => words = words.filter(x => !value.contains(x))
      case None => println("No stopword flag matched")
    }
    words.mkString(" ")
  }
```


To use these functions, next step is to create the regular expressions and cleaning the entire documents step by step.


```
\\ Building a List of Regex for PreProcessing the text
  var RegexList = Map[String, String]()
  RegexList += ("punctuation" -> "[^a-zA-Z0-9]")
  RegexList += ("digits" -> "\\b\\d+\\b")
  RegexList += ("white_space" -> "\\s+")
  RegexList += ("small_words" -> "\\b[a-zA-Z0-9]{1,2}\\b")
  RegexList += ("urls" -> "(https?\\://)\\S+")

\\ Loading a stopwords list
  var Stopwords = Map[String, List[String]]()
  Stopwords += ("english" -> Source.fromFile("stopwords.txt").getLines().toList)

\\ Function to perform step by step text preprocessing and cleaning on documents 
def cleanDocument(document_text: String) : String = {
	\\ Converting all words to lowercase
	\\ Removing URLs from document 
   	\\ Removing Punctuations from document text
   	\\ Removing Digits from document text
    	\\ Removing all words with length less than or equal to 2 
\\ Removing extra whitespaces from text 
	\\ Removing English Stopwords
	\\ Returning the preprocessing and cleaned document text

var text = document_text.toLowerCase
text = removeRegex(text,"urls")
	text = removeRegex(text,"punctuation")
	text = removeRegex(text,"digits")
	text = removeRegex(text,"small_words")
	text = removeRegex(text,"white_space")
	text = removeCustomWords(text, "english")
	text
  }
```


2. Initializing Sparkcontext

To use Spark, we need to initialize it and create contexts to be used for training the classifiers, building the pipelines and making necessary transformations. Following lines of code can be used for this purpose.

       ```	
    val conf = new SparkConf().setMaster("local[*]").setAppName("DC")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    ```


3. Dataset Preparation - Loading the Documents

Now, we need to load the documents and create a dataframe using sql context and splitting it into test data and training dataframes. This will include reading the text file (containing the documents), creating file RDD to data frame and finally slicing the dataframe into training and test.

    ```
    // Loading the text file using sc.textFile function and creating an RDD
    // RDD shape: “CleanedText”,Category”

    val input_path = "/path/to/data.txt"
    val input_RDD = sc.textFile(input_path).map(x => {
      val row = x.split(",")
      (cleanDocument(row(1)),row(2))
    })
   
   // Converting an RDD to DataFrame
   val trainingDF = sqlContext.createDataFrame(input_RDD)
			.toDF("id","cleaned","category")

   // Slicing the data into 70:30 ratio for training and testing data 
   val Array(trainingData, testData) = trainingDF.randomSplit(Array(0.7, 0.3))
    
    // print the training data
    trainingData.show()
   ```
   


4. Creating the MLlib PipeLine

In the next step, we will prepare the processing and classification pipeline using MLlib. This pipeline consists of: Indexer (to convert category names into Indexes) Tokenization (for converting text into tokens (words)) hashingTF (a term frequency matrix for every document. The role of term frequency is to act as features of every document. MLlib provides Hashing trick implementation) For classification component, we will use logistic regression. When the problem is multi class classification, we will wrap the model in one vs. rest model.

   ```
      // Processing
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("label")
  	val tokenizer = new Tokenizer()
.setInputCol("cleaned")
.setOutputCol("tokens")
   	val hashingTF = new HashingTF()
.setInputCol("tokens").setOutputCol("features")
.setNumFeatures(20)


// Classification   
 	 val lr = new LogisticRegression().setMaxIter(100).setRegParam(0.001)
  	 val ovr = new OneVsRest().setClassifier(lr)
   ```
   


5. Training The Model

Creating the final pipeline of all the components, and fitting the model on training data.

     ```
   val pipeline = new Pipeline()
     .setStages(Array(indexer, tokenizer, hashingTF, ovr))
   val model = pipeline.fit(trainingData)
   ```


6. Prediction on Test Data

Once the model is trained, it can be used for making predictions on test data. One can use Confusion Matrix or Cross Validation techniques in order to measure the accuracies of the pipeline.

   ```
   // create the classification pipeline and train the model
   val prediction = model.transform(testData)
.select("id","cleaned_text","category","prediction")
   
   // print the predictions
   prediction.foreach(println)
      ```


7. Conclusion and Next Steps

The full code of this tutorial can be found here, This tutorial explains about creating a pipeline for document classification in spark using scala. This end to end pipeline is capable for predicting the unknown classes of different text with decent accuracies. Next Steps, are obviously about improvement of each component involved in this pipeline. Refer to the official MLlib Link and Spark programming Guide for more detailed documentation. Feel free to share your thoughts in the comments.