[Spark] 파티셔닝 예제 - 페이지랭크(PageRank) 알고리즘

2021. 5. 6. 01:53 Big Data/Apache Spark

파티셔닝 예제 - 페이지랭크(PageRank) 알고리즘

  RDD 파티셔닝에 의한 효과를 볼 수 있는 좀 더 복잡한 알고리즘 예제로 페이지 랭크를 생각 할 수 있다. 페이지 랭크 알고리즘은 구글의 공동 창업자 Larry Page의 이름을 인용한 것으로 얼마나 많은 문서들이 해당 문서를 링크하고 있는지 기초하여 각 문서에 대해서 중요도를 매기는 알고리즘을 말한다. 페이지 랭크 알고리즘은 웹페이지의 중요도를 측정하는 척도로도 사용이 되지만, 과학 논문에서 어떤 논문이 중요한지 평가하거나, SNS의 영향력에 있는 허브유저를 찾아내는 데에도 사용이 되고 있다. 

  페이지 랭크의 단점은 많은 조인을 수행하는 반복알고리즘이다. 그렇기 때문에 많은 조인을 반복적을 효율적으로 처리가 가능한 RDD 파티셔닝을 이용하면 좋은 결과를 얻을 수 있다.(빠르게 중요도를 평가 할 수 있겠지요.)

  페이지 랭크 알고리즘은 (pageID, linklist)의 형태로 각 페이지와 그 이웃 페이지들을 가지고 있으며, 하나는 (pageID, rank)의 형태로 각 페이지의 현재 랭크를 관리합니다. 알고리즘은 아래와 같이 동작을 합니다.

  1. 각 페이지의 랭크를 1.0으로 초기화 한다.
  2. 매번의 반복 주기마다 페이지 p는 공헌치 "랭크(p)/이웃숫자(p)"를 이웃들(자신이 링크하고 있는 페이지)에게 보낸다.
  3. 각 페이지의 랭크를 0.15+ 0.85*(공헌치)로 갱신한다.

PageRank 알고리즘 소스코드

val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()

val ranks = links.mapValues(v => 1.0)

for (i <- 0 until 10) {
	val contributions = links.join(ranks).flatMap {
		case (pageId, (link, rank)) =>
		links.map(dest => (dest, rank / links.size))
	}
	rank = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}

ranks.saveAsTextFile("ranks")
  • 여기서 links와 ranks가 계속 반복적으로 join되는 것을 볼 수 있다. 하지만 links RDD는 고정된 데이터셋이므로 시작점에서 바로 partitionBy()를 적용하였고, 결과적으로는 join을 할때 links는 셔플링을 할 필요가 없어졌다. 또한 persist()를 통해 메모리에 로딩을 하고 사용하기 때문에, 호출을 할때마다 links RDD를 다시 생성할 필요가 없다. 
  • ranks를 생성할때 links RDD의 파티셔닝을 유지하기 위해서 map()이 아닌 mapValues를 사용을 했다.
  • forloop에서 reduceByKey()다음에 mapValues()를 호출했다. reduceByKey()의 결과는 이미 해시 파티셔닝이되어 있으므로 다음 차례의 반복 주기에서 매핑된 결과와 다음 loop에서의 ranks와 links의 조인을 더 효율적으로 만든다. 

(파티셔닝 관련 최적화의 효과를 극대화하려면, 데이터의 키가 변경되어야 하는 경우가 아닐 때 반드시 mapValues()나 flatMapValues()를 사용해야 한다.)

 

사용자 파티셔너 지정

  파티셔닝은 스파크에서 성능에 상당한 영향을 주는 요소이다. 셔플링을 줄이기 위해서는 파티셔닝을 효율적으로 잘 해야 한다. 그럼 잘한다는게 어떤 말일지 풀어말하면 스파크에서는 HashPartitioner와 RangePartitioner를 이용해 다양한 상황에 적합하도록 구현이 되어있습니다. 하지만 위에 페이지 랭크와 같은 경우에는 어떻게 파티셔닝을 하는게 효율적일지 생각해보면 HashPartitioner와 RangePartitioner를 이요하기 보다는 Partitioner 객체를 만들어서 튜닝을 하는게 더 효율적입니다. 물론 스파크에서도 이처럼 사용자가 Partitioner를 생성을 할 수 있습니다. 

  그렇다면 왜 HashPartitioner와 RangePartitioner를 사용하는게 효율적이지 않을까요. 예를 들어서 페이지랭크에서는 key로 사용해야 하는 값이 각 페이지의 ID(URL)입니다. 기존 파티션을 사용한다면 유사한 URL을 갖고 있는 사이트가 다른 노드에 해싱이 되어 버릴 가능성이 있습니다. 하지만 페이지는 동일한 도메인 안에 있으면 당연히 서로 링크가 많이 되어 있을 것을 알고 있습니다. 그렇기 때문에 결과적으로는 같은 도메인을 갖고 있는 페이지는 동일한 노드에 파티셔닝이 되는것이 유리합니다. 이처럼 도메인 네임에 대해서 해싱을 하는 Partitioner를 구현하면 더 효율적으로 파티셔닝을 할 수 있습니다. 

  사용자 지정 파티셔너를 구현하기 위해서는 org.apache.spark.Partitioner 클래스를 상속받고, 세 개의 메소드를 오버라이딩 하면 됩니다. 

  • numParititions: Int, 생성할 파티션의 개수
  • getPartition(key: Any) 주어진 키에 대한 파티션ID를 반환
  • equals(): 동일한 객체인지 확인

소스코드

// 각 URL의 도메인 네임을 해싱해 DomainNamePartitioner를 구현
class DomainNamePartitioner(numParts: Int) extends Partitioner {
	override def numPartitions: Int = numParts
	override def getPartition(key: Any): Int = {
		// 
		val domain = new Java.net.URL(key.toString).getHost()
		val code = (domain.hashCode % numPartitions)
		// 자바의 hashCode()에 기반해서 구현을 한다면, 음수를 리턴할 수 있으므로 getPartition()이 음수를 반환하지 않도록 주의해야 한다. 
		if (code < 0) {
			code + numPartitions
		} else {
			code
		}
	}

	override def equals(other: Any): Boolean  = other match {
		case dnp: DomainNamePartitioner =>
			dnp.numPartitions == numPartitions
		case _ =>
			false
	}
}

[참고] 서적 - 러닝 스파크

 

출처 : ourcstory.tistory.com/157?category=630696