Spark - RDD

2021. 5. 6. 01:32 Big Data/Apache Spark

RDD란?

 분산되어 존재하는 데이터 요소들의 모임이라고 생각하시면 됩니다. RDD는 java의 String처럼 변경이 불가능한 객체(immutable)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 됩니다. 변경을 하기 위해서는 새로운 RDD를 만들거나, 존재하는 RDD를 변형, 결과 계산을 위해 RDD에서 연산을 호출하여 생성을 해야 합니다. Spark는 자동으로 RDD에 있는 데이터들을 클러스터에 분배, 수행하는 연산들을 병렬화 합니다. Spark는 RDD를 lazy evaluation으로 액션을 사용하는 시점에서 처리하기 때문에, 구현할때는 transformation과 action의 operation을 확실히 이해하고 있어야 결과를 받아보는데 효율적으로 구현이 가능합니다. 예를 들어 한 라인에 action이 포함된 transformation을 할 경우에는 결과를 출력하기 때문에 불필요한 반환값을 받을 수 있습니다. action을 만나기 전까지 transformation을 처리하지 않습니다. 즉 RDD는 데이터를 갖고 있기 보다는 reference를 가지고 있습니다.

RDD 생성하는 방법

  RDD를 생성하기 위해서는 외부로 부터 데이터를 로딩하거나, 드라이버 프로그램내에 생성된 parallelize의 메소드를 이용해 RDD를 생성할 수 있다.
val lines2 = sc.parallelize(List("banana", "i like banana"))

Apache Key/Value Pairs

RDD에는 어떤 데이터의 형식도 저장이 가능합니다. 그 중에서 Pair RDD라는 RDD가 있는데, 이 RDD는 Key-Value 형태로 데이터를 저장하기 때문에, 병렬 데이터처리 부분에서 그룹핑과 같은 추가적인 기능을 사용할 수 있습니다. key를 기반으로 연산을 할때 유용하게 사용을 할 수 있습니다. (e.g. reduceByKey, groupByKey 등의 연산사용시)

- 단일 pair RDD(집합): reduceBykey, groupBykey, mapValues...
- 두개의 pair RDD(그룹화): subtractBykey, join, cogroup...

Pair RDD를 생성하는 방법은 아래와 같습니다.

lines = sc.textFile("data.txt")
paris = lines.map(lambda s: (s, 1))
count = pairs.reduceByKey(lambda a, b: a + b)

[참고] https://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations

 

Data Partitioning

분산 프로그램에서 통신 비용이 매우 크므로 네트워크 부하를 줄이는 것이 중요합니다. 파티셔닝은 네트워크 부하를 효율적으로 줄이는 방법입니다.

  • 어떤 키의 모음들이 임의의 노드에 함께 모여 있는 것을 보장
  • 키 중심의 연산에서 데이터가 여러번 재 사용될때만 의미가 있음
  • ex) join
  • 스파크에서 파티셔닝
  • Hash기반, Range 기반, 사용자 지정

[참고] http://www.slideshare.net/HyeonSeokChoi/pair-rdd-spark

 

 

RDD Operation

 

  RDD에서 제공하는 연산(operation)은 transformation과 action이 있습니다. RDD의 데이터 로딩 방식은 Lazy 로딩 컨셉을 사용하는데, 예를 들어서 sc.textFile("text.txt")로 파일을 로딩하더라도 실제로 데이터가 로딩이 되지 않습니다. 파일을 실제로 메모리에 올라가는 시점은 action의 operation을 사용할때만 로딩이 됩니다. 그 이유는 filter, map이라는 transformation의 작업을 통해 데이터를 최대한 필요한 부분으로 정재를 한 뒤에 action을 수행하기 위해서 입니다. 정재를 한 후에 메모리에 로딩하는 이유는 처음부터 그 많은 데이터를 업로드 할 수 없고, 할 필요도 없기 때문입니다. 여기서 주의해야할 사항은 action을 수행한 뒤에는 메모리에서 데이터가 바로 지워진다는 점입니다. 이 문제를 해결하기 위해서는 아래 설명할 caching을 보시면 됩니다. 

    • Transformation

기존의 RDD 데이터를 변경하여 새로운 RDD 데이터를 생성해 내는 것, 흔한 케이스는 filter와 같이 특정 데이터를 뽑아 내거나 map 함수처럼, 데이터를 분산 배치 하는 것 등을 말합니다.

    • Action

실제 transformation의 operation을 통해 생성된 RDD의 결과를 확인하기 위해서는 action operation을 사용해야 합니다. count(), first(), saveAsTextFile(path) 등을 통해 데이터를 메모리에 로딩 하거나, RDD객체를 파일의 형태로 저장이 가능합니다.

예제

  days.log의 파일을 읽어서, file이라는 RDD를 생성합니다. RDD에서 "ERROR"의 문자열을 가진 행을 가지고 와서, 새로운 lines라는 RDD를 생성했습니다. 결과적으로 Driver에서 결과를 확인하기 위해 collect()함수를 이용해 화면에 출력하는 예제입니다. transformation의 operation을 했을때는 메모리에 올리는 작업을 하지 않습니다. 그렇기 때문에 filter를 하고 나서의 결과만을 collect라는 함수를 호출했을때 메모리에 올라갑니다. 여기서 collect()는 drvier의 메모리로 가져오기 때문에 많은 양의 데이터를 가져오는 경우 주의해야 합니다.

file = sc.textFile("day.log")
lines = file.filter(lambda x : "ERROR" in x)
for line in lines.collect():
	print l

[참고] https://spark.apache.org/docs/latest/programming-guide.html#transformations

[참고] https://spark.apache.org/docs/latest/programming-guide.html#actions

 

Cache

  앞서 언급한것 처럼 Transformation과 Action의 Operation을 통해 데이터를 처리하고, 메모리에 로딩하는 작업을 합니다. 하지만 Action의 operation을 사용하고나면, 데이터는 결과를 반환하고 메모리에서 사라지게 됩니다. 즉, 한번 action을 할때마다 데이터를 처리하고 로딩하고 반환하는 작업을 반복적으로 하게 됩니다. 그렇다면 효율적으로 구현하기 위해서는 RDD객체를 메모리에 올린 상태에서 재활용(?)해서 사용할 수 있는 방법이 필요합니다. Persist vs Cache cache는 persist에서 저장 옵션을 MEMORY_ONLY로 한 옵션과 동일하다. LRU (Least Recently Used) 알고리즘에 의해 삭제되거나, RDD.unpersist()함수를 호출하면 명시적으로 메모리나 디스크에서 삭제가 가능하다. 당연히 가장 좋은 옵션은 디폴트 옵션 MEMORY_ONLY옵션이고, 메모리가 부족한 경우 MEMORY_ONLY_SER 옵션을 이용하면, Serialized 형태로 저장하기 때문에 메모리 공간은 줄일 수 있으나 Serialize-Deserialize 의 오버로드가 증가해 CPU사용률이 올라갑니다. 데이터의 양이 많을 경우에는 DISK에 저장하는 옵션보다는 차라리 persist하지 않고, 필요할 때마다 재 계산하는 것이 더 빠를 수 있습니다. 만약 동일한 작업을 두번 이상하기 위해서는 RDD.persist()라는 메소드를 이용해, RDD를 메모리에 상주 시킬 수 있습니다. 

  RDD를 메모리에 로딩을 하고 사용하는 함수로 persist()와 cache()두 가지 operation을 지원합니다. 이때 저장하는 옵션을 선택이 가능합니다. 내가 메모리에 로딩하고자 하는 데이터의 크기가 보유한 메모리의 크기보다 크면 로딩이 어렵기 때문에 disk와 memory를 동시에 사용할 수 있습니다. memory만 사용하는게 물론 성능면에서는 우수합니다. 기본 default값은 memory에 저장하고, 옵션을 통해 변경이 가능합니다. 자세한 옵션은 아래 사이트를 참고하세요.

[참고] https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

 

RDD객체 저장

  만약 내가 처리중이거나, 처리가 완료된 RDD객체를 저장 할 수 있습니다. 저장을 하는 방법은 RDD객체의 그대로 raw형태로 저장하거나, serialized의 형태로 저장이 가능합니다. 아무래도 raw형태로 저장하면, serialize/deserialize하는 비용이 발생하지 않기 때문에 저장 속도가 빠를 수 있지만, 메모리의 사용량이 많습니다. 그에 반해 serialized형태로 저장하면 serialize와 deserialize를 하기 때문에 메모리의 사용량은 줄일 수 있지만, 쓰기/읽기 하는 과정에서 오버로드가 증가하는 단점이 있습니다. cf. 내가 처리해야 하는 양의 데이터가 메모리에 비해 너무 클때는 Serialize-Deserialize를 수행하고, 메모리가 충분하면 RDD형태 그대로 저장을 하면 됩니다.

  스파크는 Java로 구현되어 있기 때문에, JVM상에서 도작을 합니다. 그렇다면 JVM의 특징을 그대로 갖고 있겠지요? JVM의 장점이자 단점인, garbage collection에 의한 성능 제약을 받을 수 있습니다. 우리가 평소에 java로 코딩을 할때 garbage collector의 역할로 메모리를 free하는 작업을 안해서 코딩할때 메모리를 크게 생각을 안하고 했었는데, spark에서는 우리에게 이렇게 도움을 주는 garbage collector에 의해서 성능저하를 가져올 수 있습니다.

Spark에서는 OFF_HEAP이라는 옵션이 있는데, 이 옵션은 위 문제를 해결하기 위해 JVM내에 데이터를 저장하는게 아니라, 별도의 메모리 공간에 데이터를 저장하는 방식이 OFF_HEAP옵션입니다. 그러면 garbage collector가 동작하지 않기때문에 성능저하가 일어나지 않습니다.

 

[참고] https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
[참고] https://dzone.com/refcardz/apache-spark

 

출처 : ourcstory.tistory.com/127?category=630696