Distributed implementation of the outlier detection algorithm isolation forest

Distributed implementation of the outlier detection algorithm isolation forest

In the unsupervised field, there is an outlier detection algorithm with excellent accuracy and efficiency. I have used it several times in practice, and the effect is amazing. It is the very popular isolation forest in recent years. The algorithm has a ready-made package in sklearn, but if it runs on a big data cluster, there is currently no encapsulated interface, which brings a lot of inconvenience to the deployment of distributed tasks (the algorithm integrated in spark mllib is really too few ), this article uses scala to implement the distributed implementation of the algorithm on spark from scratch, and demonstrates the entire process of task execution on the cluster.

1. Introduction to the algorithm

Let me talk about the minimum necessary knowledge of the algorithm first, and the details will be explained in the code.

1. Training process: build the trees of the forest

iForest is composed of iTree. When constructing each iTree, extract N samples from the training data, and then randomly select a feature from these samples, and then randomly select a value under the feature, divide the samples into two branches, and then divide them into the left and right sides. The above process is repeated on the data set, and the termination condition is directly reached, and the construction of a tree is completed.

2. Prediction process: Calculate the abnormal score of the sample

Go down the test data along the corresponding conditional branch on each tree until it reaches the leaf node, and record the path length (denoted by h(x)) during this process. And from this, an abnormal score is obtained. When the score exceeds a certain threshold, it can be judged as an abnormal sample.

2. scala implementation

The main body of the code is not original, refer to a great foreign god: github.com/hsperr/firs... , with some modifications

1. 1. import the packages needed to write the spark program and the Random module of scala for randomly selecting functions.
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random
2. Define a single tree iTree. The second and third lines mean that the left and right branches of each tree ITreeBranch and leaf nodes ITreeLeaf belong to the subcategories of iTree.
sealed trait ITree
case class ITreeBranch(left: ITree, right: ITree, split_column: Int, split_value: Double) extends ITree
case class ITreeLeaf(size: Long) extends ITree
3. Define the class of the isolated forest and complete the training part of the algorithm, that is, the construction of all trees.

3.1. Sampling from the sample to build a single iTree

object IsolationForest {
    def getRandomSubsample(data: RDD[Array[Double]], sampleRatio: Double, seed: Long = Random.nextLong): RDD[Array[Double]] = {
        data.sample(false, sampleRatio, seed=seed)

3.2. Recursive construction to generate a single iTree.
data: the sample data extracted in the previous step;
maxHeight: the maximum height of the tree, which is the condition for the tree to stop growing;
numColumns: the number of features of the data;
currentHeight: the current height of the tree.
a complete ITree

    def growTree(data: RDD[Array[Double]], maxHeight:Int, numColumns:Int, currentHeight:Int = 0): ITree = {
        val numSamples = data.count()
       //maxHeight 1
        if(currentHeight>=maxHeight || numSamples <= 1){
            return new ITreeLeaf(numSamples)
        val split_column = Random.nextInt(numColumns)
        val column = data.map(s => s(split_column))
        val col_min = column.min()
        val col_max = column.max()
        val split_value = col_min + Random.nextDouble()*(col_max-col_min)
        val X_left = data.filter(s => s(split_column) < split_value).cache()
        val X_right = data.filter(s => s(split_column) >= split_value).cache()

        new ITreeBranch(growTree(X_left, maxHeight, numColumns, currentHeight + 1),
            growTree(X_right, maxHeight, numColumns, currentHeight + 1),

3.3. Build multiple iTrees into a complete forest iforest
data: all training data;
numTrees: the number of trees in the forest;
subSampleSize: sample size of each tree;
seed: random seed.
Isolated Forest

def buildForest(data: RDD[Array[Double]], numTrees: Int = 2, subSampleSize: Int = 256, seed: Long = Random.nextLong) : IsolationForest = {
        val numSamples = data.count()
        val numColumns = data.take(1)(0).size
        val maxHeight = math.ceil(math.log(subSampleSize)).toInt
        val trees = Array.fill[ITree](numTrees)(ITreeLeaf(1))

        val trainedTrees = trees.map(s=>growTree(getRandomSubsample(data, subSampleSize/numSamples.toDouble, seed), maxHeight, numColumns))

        IsolationForest(numSamples, trainedTrees)
4. Define the prediction function class

4.1 The prediction function class is defined as the sample class of IsolationForest, and the
num_samples: the number of samples of a single class iTree
trees: the isolated forest iforest that has been constructed

The main function predict,
parameter x: a single sample array to be predicted,
return: Anomaly Score
on each iTree, calculate the length of the path that the sample traverses to reach the leaf node, and then calculate the different path lengths according to the following formula Calculate to get the abnormal score. The shorter the path, the higher the score, and the more abnormal it represents.


In the formula, h(x) represents the path length, E(h(x)) represents the average value of the path length on different iTrees, that is, group decision-making, and the denominator is used for normalization.

case class IsolationForest(num_samples: Long, trees: Array[ITree]) {
    def predict(x:Array[Double]): Double = {
        val predictions = trees.map(s => pathLength(x, s, 0)).toList
        math.pow(2, -(predictions.sum/predictions.size)/cost(num_samples))//Anomaly Score

The cost method and pathLength method used in the above code are defined as follows. The cost method parameter is the number of samples in the binary tree, and the range is the average path length of the binary tree. The formula is:

    def cost(num_items:Long): Int =
        (2*(math.log(num_items-1) + 0.5772156649)-(2*(num_items-1)/num_items)).toInt

The pathLength method is a recursive calculation, because each step is taken, the next step is still a tree, branch tree or leaf node.
Parameters: sample x, single tree, current path length path_length, the initial value should be 0.
Returns: the final path length

    final def pathLength(x:Array[Double], tree:ITree, path_length:Int): Double ={
        tree match{//match tree 
           //ITree size size 1 size size 1 1
            case ITreeLeaf(size) => 
                if (size > 1)
                    path_length + cost(size)
                    path_length + 1

           //ITree left right split_column split_value
            case ITreeBranch(left, right, split_column, split_value) => 
                val sample_value = x(split_column) //x 

                if (sample_value < split_value) //
                    pathLength(x, left, path_length + 1)
                    pathLength(x, right, path_length + 1)

5. Read data to make predictions

This section defines the main method that will eventually be called to run. I put the sample data locally or on hdfs. The csv format has been standardized, as follows

Training data overview.png

5.1, some basic settings for spark

object Runner{
    def main(args:Array[String]): Unit ={

        val conf = new SparkConf()

        val sc = new SparkContext(conf)
        sc.hadoopConfiguration.set("mapred.output.compress", "false")  

5.2. Read in csv data and preprocess it, lines are in RDD format, which is the basic unit of spark processing data

        val lines = sc.textFile("file:///tmp/spark_data/spark_if_train.csv")//
        val data = //
                .map(line => line.split(","))
                .map(s => s.slice(1,s.length))
        val header = data.first()// 

       // double 
        val rows = data.filter(line => line(0) != header(0)).map(s => s.map(_.toDouble)) 

        println("Loaded CSV File...")
        println(header.mkString("\n")) // 
        println(rows.take(5).deep.mkString("\n")) // 5 

5.3. Carry out iforest construction and sample prediction

       // rows 10 100
        val forest = IsolationForest.buildForest(rows, numTrees=10)

        val result_rdd = rows.map(row => row ++ Array(forest.predict(row)))

        result_rdd.map(lines => lines.mkString(",")).repartition(1).saveAsTextFile("file:///tmp/predict_label")

       // 10 
        val local_rows = rows.take(10)
        for(row <- local_rows){
            println("ForestScore", forest.predict(row))
        println("Finished Isolation")

Above, the isolation forest training part and prediction part have been completed.

3. deploy to spark and run

(The picture has coded my own machine, which is slightly ugly )

1. Basic environment configuration

Prerequisite 1: The spark cluster is configured to successfully enter the interactive state shown in the figure below. Google this part of the tutorial by yourself~

$ spark-shell
Enter the spark interactive command line to check whether it is running normally.

Premise 2: Configure sbt to manage project dependencies and build projects.
Reference tutorial: blog.csdn.net/zcf10027972...

sbt sbtVersion
Check the sbt version information to ensure that sbt is installed correctly.
2. Deployment script

2.1. Name the code file of the previous section Runner.scala 2.2. Create a directory structure

cd ~
mkdir -p mysparkapp/iforest_model/src/main/scala

2.3. Move Runner.scala to ~/mysparkapp/iforest_model/src/main/scala folder

mv Runner.scala ~/mysparkapp/iforest_model/src/main/scala/

2.4. Create a new configuration file conf.sbt, declare the name of our project and the dependency information on the relevant version

cd ~/mysparkapp/iforest_model
vim conf.sbt

In conf.sbt, add the following content, the version information is written according to the real information you configured:

name := "IsolationForest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.1"

Now see if our project structure is as shown in the figure

find .
View the project directory structure.png

2.5. Package the program, still under ~/mysparkapp/iforest_model, execute:

sbt package
sbt package.png

Pay attention to the file address pointed by the yellow arrow. This is the packaged jar package for us to submit the task later.

2.6. Formally submit a spark task Before submitting a spark task, make sure that the output directory does not exist:

rm -r/tmp/predict_label

Then use the spark-submit command to submit the task, you need to pass in the path of the jar package that has just been packaged:

spark-submit --class "Runner" ~/mysparkapp/iforest_model/target/scala-2.11/isolationforest_2.11-1.0.jar

Start running~~ We printed out the column name of the data, the first 5 data, and the abnormal score of the first 10 data as shown in the figure:

Program execution.png

After the task is executed, look at the output file. The first five lines are shown in the figure. The last field is the prediction score. Then you can set a threshold. The original paper is recommended to be 0.6. If it is greater than the threshold, it is judged as abnormal.

Output file.png

The second row of data in the figure has a score of 0.69, and the scores of other data are all below 0.5. Observe the field in front of it. It is much larger than other data. It is indeed an abnormal point~

4. summary

The isolation forest is composed of multiple trees, and the growth process of the tree is not affected by other trees, so it is a perfect algorithm for distributed parallelism. The sample data and code are put on github.com/scarlettgin...