MCL Spark

License: MIT

MCL Spark is an experimental project whose goal is to implement a graph clustering algorithm in Spark, relying mainly on the distributed matrix tools available in the Scala API.

Why the MCL algorithm? Because it meets the first four points of the Spark MLlib contribution policy:

  • Be widely known
  • Be used and accepted
  • Be highly scalable
  • Be well documented

Please do not hesitate to post comments or questions.

Most of the following content is based on Stijn van Dongen's website (http://micans.org/mcl/).

Getting Started

Online Documentation

A Scaladoc is available here.

Requirements

Building From Sources

This library is built with SBT. To build a JAR file, simply run "sbt package" from the project root. The project is currently built with Scala 2.10.5.

Run the bundled example


$MCL_SPARK_HOME/sbt "run [--expansionRate num] [--inflationRate num] [--epsilon num] [--maxIterations num]  [--selfLoopWeight num] [--graphOrientationStrategy string]"

Import MCL into your Spark Shell


$SPARK_HOME/bin/spark-shell --jars $MCL_SPARK_HOME/target/scala-2.10/mcl_spark_2.10-1.0.0.jar 

Then use MCL as follows:

import org.apache.spark.graphx._
import org.apache.spark.mllib.clustering.{Assignment, MCL}
import org.apache.spark.rdd.RDD

// Create an RDD for vertices
val users: RDD[(VertexId, String)] =
            sc.parallelize(Array((0L,"Node1"), (1L,"Node2"),
                (2L,"Node3"), (3L,"Node4"),(4L,"Node5"),
                (5L,"Node6"), (6L,"Node7"), (7L, "Node8"),
                (8L, "Node9"), (9L, "Node10"), (10L, "Node11")))

// Create an RDD for edges
val relationships: RDD[Edge[Double]] =
            sc.parallelize(
              Seq(Edge(0, 1, 1.0), Edge(1, 0, 1.0),
                Edge(0, 2, 1.0), Edge(2, 0, 1.0),
                Edge(0, 3, 1.0), Edge(3, 0, 1.0),
                Edge(1, 2, 1.0), Edge(2, 1, 1.0),
                Edge(1, 3, 1.0), Edge(3, 1, 1.0),
                Edge(2, 3, 1.0), Edge(3, 2, 1.0),
                Edge(4, 5, 1.0), Edge(5, 4, 1.0),
                Edge(4, 6, 1.0), Edge(6, 4, 1.0),
                Edge(4, 7, 1.0), Edge(7, 4, 1.0),
                Edge(5, 6, 1.0), Edge(6, 5, 1.0),
                Edge(5, 7, 1.0), Edge(7, 5, 1.0),
                Edge(6, 7, 1.0), Edge(7, 6, 1.0),
                Edge(3, 8, 1.0), Edge(8, 3, 1.0),
                Edge(9, 8, 1.0), Edge(8, 9, 1.0),
                Edge(9, 10, 1.0), Edge(10, 9, 1.0),
                Edge(4, 10, 1.0), Edge(10, 4, 1.0)
              ))

// Build the initial Graph
val graph = Graph(users, relationships)
graph.cache()

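// Run MCL with default parameters and retrieve the cluster assignment of each node
val clusters: RDD[Assignment] =
    MCL.train(graph).assignments

// Group nodes by cluster, then print each cluster on the driver
clusters
    .map(assignment => (assignment.cluster, assignment.id))
    .groupByKey()
    .collect()
    .foreach { case (cluster, nodes) =>
        println(cluster + " => " + nodes.mkString(", "))
    }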

Parameter choices

Inflation and expansion rates => These two parameters control what we call cluster granularity, that is, how many groups of nodes are detected and how tightly connected they are. Inflation strengthens intra-cluster links and weakens inter-cluster links, while expansion connects nodes to further and new parts of the graph. Default = 2

  1. A high inflation rate strengthens existing clusters.
  2. A high expansion rate encourages clusters to merge.

Note: only integers are accepted for the expansion rate for now (for computational reasons).

Epsilon => Threshold below which negligible values are set to zero (see the Optimizations section for more details). Default = 0.01

Maximum number of iterations => According to Stijn van Dongen's recommendations, a steady state is usually reached after 10 iterations. Default = 10

Self-loop weight management => Self loops are added with a weight equal to a fraction of the maximum weight. For example, in a binary graph, the maximum weight to allocate is 1 (see the Optimizations section for more details). Default = 0.1

Directed and undirected graph management => Determines how directed graphs are handled. Default = "undirected"

  1. "undirected": the graph is assumed to be undirected. No edges are added.
  2. "directed": the graph is assumed to be directed. The inverse of each edge is added so that the graph becomes undirected.
  3. "bidirected": the graph already contains some bidirected edges. The inverse of each edge is added, except for edges whose inverse already exists, so that the graph becomes undirected.

See Implementation thoughts for more details.
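To make the three orientation strategies more concrete, here is a minimal plain-Scala sketch (no Spark required). The `WEdge` case class and the `symmetrize` function are hypothetical names used for illustration only; they are not part of the library, and the library's actual handling may differ.

```scala
object OrientationSketch {
  // Hypothetical edge type for illustration (GraphX has its own Edge class).
  final case class WEdge(src: Long, dst: Long, weight: Double)

  // Returns the edge list obtained after applying one of the three strategies.
  def symmetrize(edges: Seq[WEdge], strategy: String): Seq[WEdge] = strategy match {
    case "undirected" =>
      // The graph is trusted to be symmetric already: nothing is added.
      edges
    case "directed" =>
      // Every edge gets its inverse.
      edges ++ edges.map(e => WEdge(e.dst, e.src, e.weight))
    case "bidirected" =>
      // Only edges whose inverse is missing get one.
      val existing = edges.map(e => (e.src, e.dst)).toSet
      val missing = edges.filterNot(e => existing.contains((e.dst, e.src)))
      edges ++ missing.map(e => WEdge(e.dst, e.src, e.weight))
    case other =>
      throw new IllegalArgumentException("Unknown strategy: " + other)
  }
}
```

In this sketch, an inverse edge simply copies the weight of the original edge, which is an assumption.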

MCL (Markov Cluster) algorithm theory

Reminder on Markov chains

"A Markov chain is a sequence of random variables X1, X2, X3, ... with the Markov property, namely that the probability of moving to next state depends only on the present state and not on the previous states." (wikipedia definition)

Definition: a state is absorbing when it cannot be left.

Definition: a Markov chain is aperiodic if at least one of its states has a period of 1, so that returns to that state occur at irregular times.

Definition: a Markov chain is irreducible, if it is possible to get to any state from any state.

Definition: a Markov chain is ergodic, if it is both aperiodic and irreducible.
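As an illustration of these definitions, the following plain-Scala sketch checks a small transition matrix for absorbing states and for irreducibility. The object and function names are hypothetical, and the matrix is assumed to be row stochastic (entry (i)(j) is the probability of moving from state i to state j).

```scala
object MarkovSketch {
  // A state is absorbing when it moves to itself with probability 1.
  def absorbingStates(p: Array[Array[Double]]): Seq[Int] =
    p.indices.filter(i => math.abs(p(i)(i) - 1.0) < 1e-12)

  // States reachable from `start` through transitions of positive probability.
  def reachableFrom(p: Array[Array[Double]], start: Int): Set[Int] = {
    var seen = Set(start)
    var frontier = List(start)
    while (frontier.nonEmpty) {
      val next = frontier
        .flatMap(i => p(i).indices.filter(j => p(i)(j) > 0.0 && !seen(j)))
        .distinct
      seen = seen ++ next
      frontier = next
    }
    seen
  }

  // The chain is irreducible when every state can reach every other state.
  def isIrreducible(p: Array[Array[Double]]): Boolean =
    p.indices.forall(i => reachableFrom(p, i).size == p.length)
}
```

For instance, a chain with an absorbing state and at least one other state cannot be irreducible, since the absorbing state never reaches the others.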

Principle

To detect clusters inside a graph, the MCL algorithm uses a Column Stochastic Matrix representation and the concept of random walks. The idea is that random walks between two nodes of the same group are more frequent than between two nodes belonging to different groups. So we compute the probability that a walk starting from a node reaches each other node of the graph, which gives better insight into the clusters.

Definition: a Column Stochastic Matrix (CSM) is a non-negative matrix in which each column sums to 1. In our case, we use a Row Stochastic Matrix (RSM) instead of a CSM in order to use the Spark API tools (see Implementation thoughts for more details).
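A minimal sketch of how a weighted adjacency matrix can be turned into an RSM, written in plain Scala on a dense array for readability. The name `toRowStochastic` is hypothetical, and the library itself works on distributed Spark matrices rather than local arrays.

```scala
object StochasticSketch {
  // Divides every row by its sum so that each row becomes a probability distribution.
  // Rows that sum to zero (isolated nodes) are returned unchanged.
  def toRowStochastic(m: Array[Array[Double]]): Array[Array[Double]] =
    m.map { row =>
      val total = row.sum
      if (total == 0.0) row.clone() else row.map(_ / total)
    }
}
```

For a symmetric adjacency matrix, the RSM obtained this way is the transpose of the corresponding CSM, so both conventions describe the same random walk.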

Two steps are needed to simulate random walks on a graph: expansion and inflation. Each step is associated with a specific rate (respectively eR and iR). In the following formulas, n is the number of nodes in the graph.

Expansion

To perform expansion, we raise the stochastic matrix to the power eR using the normal matrix product.

$(A^{2})_{ij} = \sum_{k=1}^{n} A_{ik} A_{kj}$, for eR = 2.
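The expansion step can be sketched in plain Scala as repeated matrix multiplication. This is a local, dense illustration under the assumption that eR is a positive integer; the names `multiply` and `expansion` are hypothetical and do not reflect the distributed implementation.

```scala
object ExpansionSketch {
  type Matrix = Array[Array[Double]]

  // Standard matrix product: (a * b)(i)(j) = sum over k of a(i)(k) * b(k)(j).
  def multiply(a: Matrix, b: Matrix): Matrix =
    Array.tabulate(a.length, b(0).length) { (i, j) =>
      b.indices.map(k => a(i)(k) * b(k)(j)).sum
    }

  // Raises the stochastic matrix to the power eR.
  def expansion(m: Matrix, eR: Int): Matrix = {
    require(eR >= 1, "eR must be a positive integer")
    (1 until eR).foldLeft(m)((acc, _) => multiply(acc, m))
  }
}
```

Entry (i)(j) of the result is the probability of going from node i to node j in exactly eR steps, so expansion lets the walk reach nodes that are further away.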

Inflation

To perform inflation, we raise each entry of the RSM to the power iR (Hadamard power) and then normalize each row so that it sums to 1 again.
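A minimal plain-Scala sketch of the inflation step on a local dense matrix. The name `inflation` is hypothetical and the code is only meant to illustrate the arithmetic.

```scala
object InflationSketch {
  // Raises every entry to the power iR, then rescales each row to sum to 1.
  def inflation(m: Array[Array[Double]], iR: Double): Array[Array[Double]] =
    m.map { row =>
      val powered = row.map(x => math.pow(x, iR))
      val total = powered.sum
      if (total == 0.0) powered else powered.map(_ / total)
    }
}
```

With iR = 2, the row (0.75, 0.25) becomes (0.5625, 0.0625) after the entrywise power and (0.9, 0.1) after normalization: the strong link gets stronger and the weak one weaker.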

Convergence and clusters interpretation

After each loop (expansion and inflation), a convergence test is applied to the new matrix. If the matrix is stable compared with the previous iteration, the algorithm stops. Otherwise, it continues until a maximum number of iterations is reached, which forces the process to stop.

Here, n is the number of rows and columns of the adjacency matrix.
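The whole loop can be sketched in plain Scala as follows. The convergence measure used here (sum of squared entrywise differences between two consecutive matrices, compared with a small `tolerance`) is an assumption made for this sketch, as are all function names and the `tolerance` parameter.

```scala
object MclLoopSketch {
  type Matrix = Array[Array[Double]]

  def normalizeRows(m: Matrix): Matrix =
    m.map { row =>
      val total = row.sum
      if (total == 0.0) row.clone() else row.map(_ / total)
    }

  def multiply(a: Matrix, b: Matrix): Matrix =
    Array.tabulate(a.length, b(0).length) { (i, j) =>
      b.indices.map(k => a(i)(k) * b(k)(j)).sum
    }

  def expansion(m: Matrix, eR: Int): Matrix =
    (1 until eR).foldLeft(m)((acc, _) => multiply(acc, m))

  def inflation(m: Matrix, iR: Double): Matrix =
    normalizeRows(m.map(_.map(x => math.pow(x, iR))))

  // Assumed stability measure: sum of squared entrywise differences.
  def difference(a: Matrix, b: Matrix): Double =
    a.indices.map { i =>
      a(i).indices.map(j => math.pow(a(i)(j) - b(i)(j), 2)).sum
    }.sum

  // Returns the final matrix and the number of iterations actually performed.
  def run(adjacency: Matrix, eR: Int = 2, iR: Double = 2.0,
          maxIterations: Int = 10, tolerance: Double = 1e-9): (Matrix, Int) = {
    var current = normalizeRows(adjacency)
    var iterations = 0
    var change = Double.MaxValue
    while (iterations < maxIterations && change > tolerance) {
      val next = inflation(expansion(current, eR), iR)
      change = difference(current, next)
      current = next
      iterations += 1
    }
    (current, iterations)
  }
}
```

The loop stops either when two consecutive matrices are almost identical or when `maxIterations` is reached, whichever comes first.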

Each non-empty column of A (a column with non-zero values) corresponds to a cluster and its composition. A cluster is a star with one or several attractors at its center (see the example below).

Graph shape for different convergence status (http://micans.org)

A node can belong to one or several cluster(s).
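Reading clusters out of a converged matrix can be sketched as below, in plain Scala. The sketch assumes the row-stochastic convention, where a non-zero entry (i)(j) attaches node i to attractor j; the `threshold` parameter and the function name are hypothetical.

```scala
object ClusterSketch {
  // Maps each attractor (non-empty column) to the nodes (rows) attached to it.
  def clusters(m: Array[Array[Double]], threshold: Double = 1e-6): Map[Int, Seq[Int]] = {
    val pairs = for {
      i <- m.indices
      j <- m(i).indices
      if m(i)(j) > threshold
    } yield (j, i)
    pairs.groupBy(_._1).map { case (attractor, ps) => attractor -> ps.map(_._2) }
  }
}
```

A row with several non-zero entries yields a node that appears in several clusters, which matches the overlap mentioned above.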

Optimizations

Most of the following solutions were developed by Stijn van Dongen. More may follow, depending on how the matrices are distributed.

  • Add a self loop to each node. This is generally used to satisfy the aperiodicity condition of the graph's Markov chain. More than an optimization, it is required to prevent MCL from failing to converge because of an endless alternation between different states (depending on the period). The default weight allocated is the maximum weight of all edges related to the current node. To stay as close as possible to the true graph, self-loop weights can be decreased.
  • Most large graphs are sparse by nature. For example, in a social graph, people are not connected to every other user but mostly to relatives, friends or colleagues (depending on the nature of the social network). During the inflation and expansion steps, the weights of "weak" connections tend to zero without ever reaching it (since the goal is to detect strong connections in order to bring out clusters). To take advantage of the sparse representation of the graph, such a value should be set to zero after each iteration if it is lower than a very small epsilon (e.g. 0.01).
  • To speed up the convergence test, the MCL author proposed a more efficient way to proceed. (Not implemented yet)
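The first two optimizations can be sketched in plain Scala as follows. Both functions are hypothetical illustrations: `addSelfLoops` assumes that the self-loop weight is a fraction of the largest edge weight of each node, and `prune` only zeroes small values without renormalizing the rows.

```scala
object OptimizationSketch {
  type Matrix = Array[Array[Double]]

  // Adds a self loop to every node, weighted by a fraction of the node's
  // largest edge weight. Any existing diagonal value is overwritten.
  def addSelfLoops(adjacency: Matrix, selfLoopWeight: Double = 0.1): Matrix =
    adjacency.zipWithIndex.map { case (row, i) =>
      val maxWeight = if (row.isEmpty) 0.0 else row.max
      row.updated(i, selfLoopWeight * maxWeight)
    }

  // Sets to zero every value lower than epsilon, to keep the matrix sparse.
  def prune(m: Matrix, epsilon: Double = 0.01): Matrix =
    m.map(_.map(x => if (x < epsilon) 0.0 else x))
}
```

With `selfLoopWeight = 1.0` the sketch reproduces the default described in the first point (the maximum weight itself), while smaller values keep the matrix closer to the original graph.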

Implementation thoughts

Spark matrices universe

As explained in the introduction, this program relies exclusively on the matrix classes of the Spark Scala API. Two main matrix types were explored to implement the inflation, expansion and normalization steps: IndexedRowMatrix and BlockMatrix.

IndexedRowMatrix

  • Advantages: each row can be stored sparsely, and normalization is easy since it is applied per row (instead of per column as in the original implementation).
  • Disadvantages: no multiplication between two IndexedRowMatrix instances is available.

BlockMatrix

  • Advantages: fully scalable => blocks of adjustable size (1024x1024 by default), with sparse matrices stored in Compressed Sparse Column format.
  • Disadvantages: normalization is hard to implement.

The last available option is to convert the adjacency matrix from BlockMatrix to IndexedRowMatrix (and vice versa), which can be a very expensive operation for a large graph.

Directed graphs management

To preserve the irreducibility of the graph's Markov chain, MCL is only applied to undirected graphs. For example, a directed bipartite graph contains many absorbing states, so the associated Markov chain is reducible and does not satisfy the ergodic condition.

To let users apply MCL to directed graphs, the only way is to make the graph symmetric by adding the inverse of each edge. This is needed because edges in the GraphX API are always directed. In the particular case of bidirected graphs (where some edges and their inverses already exist), bidirected edges are left as they are.

Note that symmetry (same weight for an edge and its inverse) is preferred for efficiency.

Hypergraph

When two nodes are connected by several edges, those edges are merged into a single edge whose weight is the sum of all their weights.
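A minimal plain-Scala sketch of this merge, using a hypothetical `WEdge` case class and a hypothetical `mergeParallelEdges` function. Edges are grouped by their (source, destination) pair, so an edge and its inverse are kept separate.

```scala
object MergeSketch {
  // Hypothetical edge type for illustration (GraphX has its own Edge class).
  final case class WEdge(src: Long, dst: Long, weight: Double)

  // Collapses all edges sharing the same (src, dst) pair into a single edge
  // whose weight is the sum of the merged weights.
  def mergeParallelEdges(edges: Seq[WEdge]): Seq[WEdge] =
    edges
      .groupBy(e => (e.src, e.dst))
      .map { case ((src, dst), group) => WEdge(src, dst, group.map(_.weight).sum) }
      .toSeq
}
```

In GraphX, `groupEdges` offers a comparable merge on a graph that has first been partitioned with `partitionBy`; whether this project relies on it is not stated here.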

References

  • Stijn van Dongen. MCL - a cluster algorithm for graphs. Official Website
  • Kathy Macropol. Clustering on Graphs: The Markov Cluster Algorithm (MCL). A Presentation
  • Jean-Benoist Leger, Corinne Vacher, Jean-Jacques Daudin. Detection of structurally homogeneous subsets in graphs. A Survey