@@ -46,6 +46,8 @@ object LPAExample {
.getOrElse(throw new Exception("checkpoint dir not provided"))
sc.setCheckpointDir(cpDir)

val maxIter = params.getOrElse("maxIter", "10").toInt

val sep = params.getOrElse("sep", "space") match {
case "space" => " "
case "comma" => ","
@@ -59,6 +61,7 @@ object LPAExample {
.setSrcNodeIdCol("src")
.setDstNodeIdCol("dst")
.setUseBalancePartition(useBalancePartition)
.setMaxIter(maxIter)

val df = GraphIO.load(input, isWeighted = false, srcIndex, dstIndex, sep = sep)
val mapping = lpa.transform(df)
@@ -20,7 +20,7 @@ import com.tencent.angel.spark.context.PSContext
import com.tencent.angel.graph.params._
import org.apache.spark.SparkContext
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -29,11 +29,13 @@ import org.apache.spark.storage.StorageLevel
class LPA(override val uid: String) extends Transformer
with HasSrcNodeIdCol with HasDstNodeIdCol with HasOutputNodeIdCol with HasOutputCoreIdCol
with HasStorageLevel with HasPartitionNum with HasPSPartitionNum with HasUseBalancePartition {



final val maxIter = new IntParam(this, "maxIter", "maxIter")
final def setMaxIter(numIters: Int): this.type = set(maxIter, numIters)
setDefault(maxIter, 10)

def this() = this(Identifiable.randomUID("LPA"))



override def transform(dataset: Dataset[_]): DataFrame = {
val edges = dataset.select($(srcNodeIdCol), $(dstNodeIdCol)).rdd
.filter(row => !row.anyNull)
@@ -68,7 +70,7 @@ class LPA(override val uid: String) extends Transformer
var curIteration = 0
var numMsgs = model.numMsgs()
var prev = graph

val maxIterNum = $(maxIter)
println(s"numMsgs = $numMsgs")

do {
@@ -81,7 +83,7 @@ class LPA(override val uid: String) extends Transformer
model.resetMsgs()
numMsgs = model.numMsgs()
println(s"curIteration=$curIteration numMsgs=$numMsgs")
} while (curIteration < 10)
} while (curIteration < maxIterNum)

val retRDD = graph.map(_.save).flatMap(f => f._1.zip(f._2))
.sortBy(_._2)
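The iteration cap added above follows the standard Spark ML parameter pattern: declare an IntParam on the transformer, expose a fluent setter, give it a default of 10 (matching the previously hard-coded bound in the do/while loop), and read it with $(maxIter) inside transform. Below is a minimal, self-contained sketch of that pattern using only Spark ML APIs; the IterationConfig class is illustrative and not part of this change.

import org.apache.spark.ml.param.{IntParam, ParamMap, Params}
import org.apache.spark.ml.util.Identifiable

// Illustrative only: a made-up IterationConfig showing the Param lifecycle
// (declare, default, set, read) that LPA.maxIter follows.
class IterationConfig(override val uid: String) extends Params {

  def this() = this(Identifiable.randomUID("IterationConfig"))

  // Declare the parameter on this instance.
  final val maxIter = new IntParam(this, "maxIter", "maximum number of iterations")

  // Fluent setter, same shape as LPA.setMaxIter.
  final def setMaxIter(numIters: Int): this.type = set(maxIter, numIters)

  // Default of 10 keeps the previously hard-coded behaviour when the setter is never called.
  setDefault(maxIter, 10)

  // Read the current value the way LPA.transform now does.
  def currentMaxIter: Int = $(maxIter)

  override def copy(extra: ParamMap): IterationConfig = defaultCopy(extra)
}

Because of the setDefault(maxIter, 10) call, drivers that never invoke setMaxIter keep the old fixed ten-iteration behaviour.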
@@ -27,14 +27,14 @@ import org.apache.spark.storage.StorageLevel


class WCC(override val uid: String) extends Transformer
with HasWeightCol with HasSrcNodeIdCol with HasDstNodeIdCol
with HasOutputNodeIdCol with HasOutputCoreIdCol with HasBalancePartitionPercent
with HasIsWeighted with HasPartitionNum with HasPSPartitionNum
with HasStorageLevel with HasBatchSize with HasPullBatchSize
with HasBufferSize with HasUseBalancePartition {

def this() = this(Identifiable.randomUID("WCC"))

override def transform(dataset: Dataset[_]): DataFrame = {
// read edges
val edges = dataset.select($(srcNodeIdCol), $(dstNodeIdCol)).rdd
@@ -43,56 +43,62 @@ class WCC(override val uid: String) extends Transformer
.filter(e => e._1 != e._2)

edges.persist(StorageLevel.DISK_ONLY)

val maxId = edges.map(e => math.max(e._1, e._2)).max() + 1
val minId = edges.map(e => math.min(e._1, e._2)).min()
val nodes = edges.flatMap(e => Iterator(e._1, e._2))
val numEdges = edges.count()

println(s"minId=$minId maxId=$maxId numEdges=$numEdges level=${$(storageLevel)}")

// Start PS and init the model
println("start to run ps")
PSContext.getOrCreate(SparkContext.getOrCreate())

val model = WCCPSModel.fromMinMax(minId, maxId, nodes, $(psPartitionNum), $(useBalancePartition), $(balancePartitionPercent))

// make un-directed graph, for wcc
var graph = edges.flatMap { case (srcId, dstId) => Iterator((srcId, dstId), (dstId, srcId)) }
.groupByKey($(partitionNum))
.mapPartitionsWithIndex((index, adjTable) => Iterator(WCCGraphPartition.apply(index, adjTable)))
.mapPartitionsWithIndex((index, adjTable) => Iterator((0, WCCGraphPartition.apply(index, adjTable))))
graph.persist($(storageLevel))
graph.foreachPartition(_ => Unit)
graph.foreach(_.initMsgs(model))
graph.foreach(_._2.initMsgs(model))

var numMsgs = model.numMsgs()
var curIteration = 0
var prev = graph
println(s"numMsgs=$numMsgs")

// each node change its label into the min id of its neighbors (including itself).
var changedCnt = 0
do {
curIteration += 1
changedCnt = 0
changedCnt = graph.map(_.process(model, numMsgs, curIteration == 1)).reduce((n1, n2) => n1 + n2)
changedCnt = 0
graph = prev.map(_._2.process(model, numMsgs, curIteration == 1))
graph.persist($(storageLevel))
graph.count()
changedCnt = graph.map(_._1).reduce((n1, n2) => n1 + n2)
prev.unpersist(true)
prev = graph
model.resetMsgs()

println(s"curIteration=$curIteration numMsgs=$changedCnt")
} while (changedCnt > 0)

val retRDD = graph.map(_.save()).flatMap(f => f._1.zip(f._2))
val retRDD = graph.map(_._2.save()).flatMap(f => f._1.zip(f._2))
.map(f => Row.fromSeq(Seq[Any](f._1, f._2)))
dataset.sparkSession.createDataFrame(retRDD, transformSchema(dataset.schema))
}

override def transformSchema(schema: StructType): StructType = {
StructType(Seq(
StructField(s"${$(outputNodeIdCol)}", LongType, nullable = false),
StructField(s"${$(outputCoreIdCol)}", LongType, nullable = false)
))
}

override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

}
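The rewritten loop above relies on process returning a (changedCount, partition) pair for every element: each round maps the previous RDD into a new one, persists and materializes it, sums the per-element change counts, and only then unpersists the previous iteration, terminating when no label changed anywhere. An Angel-free sketch of that iterate-and-swap pattern follows; Part and its process stub stand in for WCCGraphPartition and its PS-backed process method.

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object WccLoopSketch {

  // Placeholder state; in WCC this is WCCGraphPartition and process() talks to the PS.
  case class Part(labels: Array[Long]) {
    def process(): (Int, Part) = (0, this) // stub: (number of labels changed, new state)
  }

  def iterateUntilStable(initial: RDD[(Int, Part)],
                         level: StorageLevel = StorageLevel.MEMORY_AND_DISK): RDD[(Int, Part)] = {
    var prev = initial
    var graph = initial
    var changedCnt = 0
    do {
      graph = prev.map { case (_, part) => part.process() } // recompute every element's labels
      graph.persist(level)
      graph.count()                                         // materialize before touching prev
      changedCnt = graph.map(_._1).reduce(_ + _)            // total labels changed this round
      prev.unpersist(blocking = true)                       // now safe to drop the old iteration
      prev = graph
    } while (changedCnt > 0)                                // fixed point: no label changed anywhere
    graph
  }
}

Materializing the new RDD with count() before unpersisting the previous one ensures each iteration is computed from the cached parent rather than by replaying the full lineage.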
@@ -31,30 +31,32 @@ class WCCGraphPartition(index: Int,
val msgs = VFactory.sparseLongKeyLongVector(model.dim)
for (i <- keys.indices){
msgs.set(keys(i), keys(i))
keyLabels(i) = keys(i)
}
model.initMsgs(msgs)
msgs.size().toInt
}

// if label of node is larger than its neighbors',
// change it into min among its neighbors' labels
def process(model: WCCPSModel, numMsgs: Long, isFirstIteration: Boolean): Int = {
def process(model: WCCPSModel, numMsgs: Long, isFirstIteration: Boolean): (Int, WCCGraphPartition) = {
var changedNum = 0
if (numMsgs > indices.length || isFirstIteration) {
val inMsgs = model.readMsgs(indices)
val outMsgs = VFactory.sparseLongKeyLongVector(inMsgs.dim())

for (idx <- keys.indices) {
keyLabels(idx) = inMsgs.get(keys(idx))
val newLabel = minNbrLabel(idx, inMsgs)
if (newLabel < keyLabels(idx)) {
keyLabels(idx) = newLabel
outMsgs.set(keys(idx), newLabel)
changedNum += 1
}
outMsgs.set(keys(idx), newLabel)
}
model.writeMsgs(outMsgs)
changedNum
(changedNum, new WCCGraphPartition(index, keys, indptr, neighbors, keyLabels, indices))

}
else {
val inMsgs = model.readAllMsgs()
@@ -64,20 +66,20 @@ class WCCGraphPartition(index: Int,
val newLabel = minNbrLabel(idx, inMsgs)
if (newLabel < keyLabels(idx)) {
keyLabels(idx) = newLabel
outMsgs.set(keys(idx), newLabel)
changedNum += 1
}
outMsgs.set(keys(idx), newLabel)
}

model.writeMsgs(outMsgs)
changedNum
(changedNum, new WCCGraphPartition(index, keys, indptr, neighbors, keyLabels, indices))
}
}

def save(): (Array[Long], Array[Long]) = {
(keys, keyLabels)
}

def minNbrLabel(idx: Int, inMsgs: LongLongVector): Long = {
var j = indptr(idx)
var minLabel = keyLabels(idx)
@@ -89,7 +91,7 @@ class WCCGraphPartition(index: Int,
j += 1
}
minLabel

}
}

@@ -98,7 +100,7 @@ object WCCGraphPartition {
val indptr = new IntArrayList()
val keys = new LongArrayList()
val neighbors = new LongArrayList()

indptr.add(0)

while (iterator.hasNext) {
@@ -107,7 +109,7 @@ object WCCGraphPartition {
ns.toArray.distinct.foreach(n => neighbors.add(n))
indptr.add(neighbors.size())
}

val keysArray = keys.toLongArray()
val neighborsArray = neighbors.toLongArray()

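For reference, the partition keeps its adjacency in CSR form: node keys(i) owns the slice neighbors(indptr(i) until indptr(i + 1)), and minNbrLabel returns the minimum of the node's current label and its neighbors' labels pulled from the PS. A plain-Scala sketch of that lookup follows, with labelOf standing in for the inMsgs.get call on the pulled LongLongVector.

object MinLabelSketch {
  // labelOf stands in for inMsgs.get(nodeId) on the pulled LongLongVector.
  def minNbrLabel(idx: Int,
                  indptr: Array[Int],
                  neighbors: Array[Long],
                  keyLabels: Array[Long],
                  labelOf: Long => Long): Long = {
    var minLabel = keyLabels(idx)        // start from the node's own current label
    var j = indptr(idx)
    while (j < indptr(idx + 1)) {        // scan this node's neighbor slice in CSR order
      val nbrLabel = labelOf(neighbors(j))
      if (nbrLabel < minLabel) minLabel = nbrLabel
      j += 1
    }
    minLabel
  }
}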
@@ -26,42 +26,48 @@ import com.tencent.angel.spark.util.VectorUtils
import org.apache.spark.rdd.RDD

class WCCPSModel(var inMsgs: PSVector,
var outMsgs: PSVector) extends Serializable {
val dim: Long = inMsgs.dimension

def initMsgs(msgs: Vector): Unit = {
inMsgs.update(msgs)
}

def readMsgs(nodes: Array[Long]): LongLongVector = {
inMsgs.pull(nodes).asInstanceOf[LongLongVector]
}

def readAllMsgs(): LongLongVector = {
inMsgs.pull().asInstanceOf[LongLongVector]
}

def writeMsgs(msgs: Vector): Unit = {
inMsgs.update(msgs)
outMsgs.update(msgs)
}

def numMsgs(): Long = {
VectorUtils.nnz(inMsgs)
}


def resetMsgs(): Unit = {
val temp = inMsgs
inMsgs = outMsgs
outMsgs = temp
outMsgs.reset
}
}

object WCCPSModel {
def fromMinMax(minId: Long, maxId: Long, data: RDD[Long], psNumPartition: Int,
useBalancePartition: Boolean, balancePartitionPercent: Float): WCCPSModel = {
val matrix = new MatrixContext("labels", 2, minId, maxId)
matrix.setValidIndexNum(-1)
matrix.setRowType(RowType.T_LONG_SPARSE_LONGKEY)

if (useBalancePartition) {
LoadBalancePartitioner.partition(data, maxId, psNumPartition, matrix, balancePartitionPercent)
}

PSAgentContext.get().getMasterClient.createMatrix(matrix, 10000L)
val matrixId = PSAgentContext.get().getMasterClient.getMatrix("labels").getId
new WCCPSModel(new PSVectorImpl(matrixId, 0, maxId, matrix.getRowType),
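Finally, resetMsgs above swaps the two PS vectors and clears the one that becomes outMsgs, so one vector can serve reads while the other collects the next round of label updates. A minimal plain-Scala sketch of that swap follows; MessageBuffers, T, and clear are illustrative stand-ins for WCCPSModel, PSVector, and its reset call.

// Illustrative double buffer modeled on WCCPSModel.resetMsgs.
final class MessageBuffers[T](var inMsgs: T, var outMsgs: T)(clear: T => Unit) {

  // Swap read/write roles and clear the vector that will receive the next round's writes.
  def resetMsgs(): Unit = {
    val temp = inMsgs
    inMsgs = outMsgs
    outMsgs = temp
    clear(outMsgs)   // corresponds to outMsgs.reset above
  }
}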