[Spark] RDD데이터 파티셔닝 - 이론 및 예제

2021. 5. 6. 01:50 Big Data/Apache Spark

RDD데이터 파티셔닝 - 이론 및 예제 

  이번에 설명한 내용은 스파크에서 노드 간 데이터세트의 파티셔닝을 어떻게 제어할 것인가 하는 것이다. 분산 프로그램에서 통신은 비용이 매우 크므로 네트워크 부하를 최소화할 수 있는 데이터 배치는 프로그램 성능을 비약적으로 향상시킬 수 있습니다. 비분산프로그램이 수많은 데이터 레코드 처리를 위해 올바른 자료 구조를 선택할 피ㄹ요가 있는 것처럼, 스파크의 애플리케이션도 네트워크 비용을 줄이기 위해서는 RDD의 파티셔닝을 제어해야 합니다. 파티셔닝은 조인 같이 키 중심의 연산에서 데이터세트가 여러번 재활용 될 때만 의미가 있습니다. 

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()

// 5분간의 이벤트 로그 파일을 처리하기 위해 주기적으로 불리는 함수
// 여기서 처리하는 시퀀스 파일이 (UserId, LinkInfo) 쌍을 갖고 있다.
def processNewLogs(logFileName: String) {
	val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
	val joined = userData.join(events) // (UserId, (UserInfo, LinkInfo)) RDD

	val offTopicVisits = joined.filter {
		case (userId, (userInfo, linkInfo)) =>
		!userInfo.topics.contains(linkInfo.topic)
	}.count()

	println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

 

  스파크의 파티셔닝은 모든 RDD의 키/값 쌍에 대해 가능하며, 시스템이 각 키에 제공된 함수에 따라 값들을 그룹화하도록 합니다. 스파크에서는 각 키가 어떤 노드로 전달되는지 같은 명시적인 제어를 제공하지는 않지만, 어떤 키의 모음들이 임의의 노드에 함께 모여 있게 되는 것은 보장해 줍니다. 예를 들면 Hash-Partition과 Range-Partition이 있습니다. HashPartition은 100으로 나눈 나머지에 대해 동일한 해시 값을 갖는 키들은 동일한 노드에 오게 됩니다. RangePartition은 같은 범위의 키들이 같은 노드에 모이도록 RDD를 범위별로 파티셔닝을 수행한 것을 말합니다.

  예를들면 메모리에 커다란 사용자 정보 테이블을 가지는 애플리케이션을 생각해보면이 프로그램은 (UserID, UserInfo)의 쌍의 RDD를 쓰며 UserInfo는 사용자가 구독하는 주제 리스트를 갖고 있습니다. 애필리케이션은 주기적으로 이 테이블을 최근 5분간 일어난 이벤트 정보를 갖는 작은 파일과 연동 처리를 하게 되는데, 이 파일은 (UserID, LinkInfo) 쌍의 테이블이며 5분간 각 사용자가 어떤 웹 사이트를 클릭했는지의 정보를 갖고 있습니다. 이제 이 정보로 얼마나 많은 사용자가 직접 구독하는 주제와 상관없는 링크에 방문하는지 알 수 있습니다. 이 작업을 위해서는 join() 연산을 적용해 봅시다.

  이 코드는 기대한 대로 동작은하겠지만 효율적이지 않습니다. 이는 processNewLogs()가 매번 호출될 때마다 불리는 join()이 데이터세트에서 키가 어떻게 파티션되어 있는지에 대해 알지 못하기 때문입니다. 기본적으로 이 연산은 양쪽 데이터세트를 모두 해싱하고 동일 해싱키의 데이터끼리 네트워크로 보내 동일 머신에 모이도록 한 후 해당 머신에서 동일한 키의 데이터끼리 조인을 수행하는게 좋습니다. userData 테이블은 5분마다 갱신되는 정도의 events 테이블의 로그양보다는 매우 클 것이 예상되므로 이는 심각한 리소스의 낭비입니다. 심지어 userData에 아무 변경이 없어도 함수가 호출될 때마다 userData는 해싱 작업을 거쳐 네트워크로 셔플링이 됩니다. 

val sc = new SparkContext(...) 
val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").partitionBy(new HashPartitioner(100)).persist()

 

  이를 개선하기 위한 방법은 간단합니다. 프로그램 시작 때 한번만 해시 파티션을 하도록 userData에 partitionBy()를 사용하는 것입니다. 이렇게 되면 processNewLogs()는 변경을 할 필요가 없습니다. processNewLogs에 있는 eventsRDD는 지역변수의 형태이므로 이 메소드에서 한 번 쓰이고 버려집니다. 그래서 events를 위한 파티셔너(partitioner) 지정은 이득 볼 것이 없습니다. 이제 userData를 만들면서 partitionBy()를 호출 했으므로 스파크는 이미 userData가 해시 파티션되어 있음을 알고 이 정보를 최대한 활용할 것입니다. 좀 더 자세히 보면, userData.join(events)를 호출할 때 스파크는 오직 eventsRDD만 셔플해서 이벤트 데이터를 각각의 UserId와 맞는 userData해시 파티션이 있는 머신으로 전송할 것입니다. 결과적으로 매우 적은 네트워크비용을 쓰지만 속도는 매우 빨라지게 됩니다. (스칼라와 자바에서는 HashPartitioner를 사용이 가능하지만 python에서는 partitionBy()에서 개수만 전달이 가능합니다.

 

RDD 파티셔너 정하기

val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
// pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:28
pairs.partitioner
// res40: Option[org.apache.spark.Partitioner] = None
import org.apache.spark.HashPartitioner
// import org.apache.spark.HashPartitioner  
val partitioned = pairs.partitionBy(new HashPartitioner(2)).persist()
// partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[34] at partitionBy at <console>:31 
partitioned.partitioner
// res41: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

 

  스칼라와 자바에서는 RDD가 어떻게 파티션이 될지 partitioner 속성을 사용해 결정을 할 수 있다. 최초에 pairs를 생성한 뒤에 pairs.partitioner를 통해 파티셔너가 어떻게 설정이 되어있는지 확인을 할 수 있다. None으로 존재하지 않는 것을 볼 수 있다. HashPartitioner를 추가한 뒤에 pairs를 partitionBy와 함께 HashPartitioner의 객체를 파티션의 개수와 함께 전달한다. partitioned의 파티셔너를 확인하면 HashPartitioner로 설정이 되어 있는 것을 알 수 있다. 만약 partitioned를 반복해서 사용한다면 partitionBy의 라인에 persist()를 사용해주면 된다. 

 

[참고] 서적 - 러닝스파크

 

출처 : ourcstory.tistory.com/156?category=630696