[Spark] Caching and Serialization

2021. 5. 6. 01:43 Big Data/Apache Spark

Caching and Serialization

아래 내용을 기준으로 학습을 진행했습니다.

  • 어떻게 언제 RDD를 cache를 해야 하는지?
  • Storage level과 사용은 어떻게 하는지?
  • memory 사용을 최적화 하기 위해서 어떻게 하는지? 
  • RDDs를 공유하려면 어떻게 해야 하는지?

 

Persistence

  스파크는 MapReduce와 가장 다른점이 disk I/O가 최소화 되기 때문에 interative한 연산에 적합하다고 말을 합니다. 그러나 추가적으로 성능향상을 얻기 위해서는 RDD를 다루는 방법에 대해서 정확하게 이해를 해야합니다. Spark에서는 memory chaching을 사용하는 것으 알고 있습니다. 예를 들어서 RDD데이터를 생성하고 계속 반복적으로 사용을 한다면, 사용한다는 말은 action의 연산을 반복적으로 한다면 spark에서는 반복적으로 RDD객체를 생성을 합니다. 한마디로 작은 데이터를 읽는 작업을 반복하는것도 비효율적인데, 큰 데이터를 반복적으로 action 연산을 한다면 이 보다 비효율적일수가 없습니다. 그렇기 때문에 spark에서는 persist를 지원합니다. 여기서도 persist를 어떻게 언제하는지가 굉장히 중요합니다. 사용하고자 하는 데이터의 크기가 메모리의 크기보다 더 클 경우에는 disk를 함께 사용이 가능합니다. disk를 사용하면 다시 메모리로 올릴때 reconstitute를 해야합니다. 

  persist를 하는 시기에 대해서 알아보면, 이상적으로 pruning, filtering 등의 downstream 처리를 필요로 하는 transformations을 한 뒤에 하는게 좋습니다. 예를 들어서 file을 로딩하고, parsing을 하고, key를 기반으로 파티셔닝을했을때, 만약 filter, map, 그리고 shuffle을 여러번 반복을하는 작업을 통해 RDD를 재 사용한다면, root RDD를 cache하는 것은 좋은 방법이 아닙니다. paritionBy를 한 뒤에 caching을 하는게 더 좋은 방법이라고 할 수 있습니다.

  더이상 persisted RDD를 사용하지 않을 경우에는, unpersist를 통해서 간단하게 삭제가 가능합니다. Spark에서는 unpersist를 하지 않아도 LRU(Least Recently Used)의 알고리즘을 통해서 새로운 RDDs의 공간을 생성을 합니다. 

 

Where to Persist

  RDD lineage에 하나의 RDD는 filter를 통해 pruning이 되었고, 다른 하나의 RDD와 join을 하고, 그 결과를 reduceByKey와 mapValues를 한다면 언제 어느 시점에 persist를 해야할지 생각을 해봐야 합니다. filter를 한 뒤의 RDD를? 아니면 join을 한뒤에? 아니면 reduceByKey를 한뒤에? 가장 좋은 위치는 RDD가 reduce와 map의 action을 하기 전인, join을 한 뒤에 입니다.

 

Storage Levels

  Storage Levels은 내가 어디의 저장소까지 사용을 할 것인가에 대한 정의를 할 수 있는 것을 의미합니다. Spark에서는 기본으로 MEMORY_ONLY로 동작을 합니다. 그렇기 때문에 cache()의 함수를 통해서 persist를 할 수 있습니다. 

  만약 RDD가 memory에 전부 로딩이 안될 경우에는 DISK옵션을 사용해 디스크와 메모리를 함께 사용을 할 수 있습니다. 하지만 위에서 언급했던것과 같이 디스크를 사용하게 되면 reconstitute를 해야합니다. 

 

Storage Cost

  RDD를 persist하게 되면 모든 객체를 메모리에 유지를 해야합니다. 그렇게 되면 RDD를 저장하는 메모리의양이 거의 대부분을 차지할 수도 있습니다. RDD의 사용 공간을 절약하기 위해 serialize를 하게 됩니다. RDD의 records는 large byte array의 형태로 저장이 되고, data를 deserialize하기 위해서는 CPU를 사용하게 됩니다. 그러나 각각의 record들의 작고 많은 데이터를 저장하는 것보다 하나의 객체(byte array)로 저장을 함으로써 garbage collection에 도움을 주게 됩니다.

  spark에서는 Java serializer를 기본 serializer로 사용을 하고 있고, python에서는 pickle을 사용하고 있습니다. 다른 옵션으로 Java와 Scala에서는 Kryo serializer를 사용할 수 있습니다. kryo는 기존 Java serializer보다 potential compatibility의 코스트 측면에서 더 효율적입니다. 

  또하나 Storage cost를 절약할 수 있는 방법은 java와 scala의 collections을 사용하기 보다는 primitive types을 사용하면 최적화가 가능합니다. 또한 작은 objects을 포함한 nested classes의 사용을 줄여야 합니다. 그렇지 않으면 garbage collection의 overhead가 발생하게 됩니다.

 

Storage Level Comparison

  Storage Cost가 Serialization의 수행 여부에 따라 달라지는 것을 확인하기 위해, 대략 36MB의 파일을 읽게 되면 파일을 Java objects의 사이즈로 변환이 되기 때문에 persisted dataset의 크기는 기존의 파일보다 더 큰 162.8MB가 되게 됩니다. 반대로 Java serializer를 사용해 Serialized data는 52.3MB로 기존 파일의 크기보다 약간 큰 것을 확인할 수 있습니다. Kryo Serializer를 사용하면, 기본 Java Serializer보다 더 작고, 기존 파일의 크기보다 작은 34.1MB의 크기를 보여주는 것을 확인할 수 있습니다.

 

Using Kryo

  Kryo Serializer를 사용하는 방법은 간단합니다. 첫번째로 SparkConf의 객체를 생성하고, spark.serializer의 파라미터에 KryoSerializer로 세팅을 하고, serialize를 할 class를 각각 등록을 해주면 됩니다. (여기서는 Trip과 Station의 class를 KryoClasses로 등록을 했습니다.) 마지막으로 SparkContext의 객체를 위에서 세팅하고, 등록한 SparkConf를 이용해 생성하면 됩니다. Kryo에 의해 Serialization을 하기 위해서는 storage level을 MEMORY_ONLY_SER로 사용을 하면 됩니다.

 

Sharing RDDs

  Spark에서 여러개의 Application을 제출(submit)을 하게 되면, RDD를 Appilcation사이에 공유가 필요할때가 생ㄱ비니다. 하지만 RDDs는 SparkContext에 속해있기 때문에 사실상 불가능합니다. 하지만 Tachyon Proeject를 사용하면 RDD를 효율적으로 공유가 가능합니다. Tachyon Project는 high-speed memory-based Distributed File System을 말합니다. 

 

Memory Configuration

  • Spark.rdd.compress는 serialized RDDs를 compress를 할지 안할지에 대한 설정값입니다.
  • spark.suffle.consolidateFiles는 shuffle 동안 생성된 중간 파일들을 통합하여, 기존 작고 많은 파일을 큰 파일의 형태로 생성을 합니다. 이렇게 하게 되면 disk I/O에 잠재적으로 성능 향상을 가져다 줄 수 있습니다. 문서를 보면 ex4또는 xfs filesystem에서는 true의 값으로, ext3에서는 false의 값을 설정하는게 좋다고 언급을 합니다. ext3은 8개의 코어를 갖은 machines에서는 성능 저하를 가져올 수 있기 때문입니다. 
  • spark.shuffle.spill은 disk로 data를 채우(spill)는 reduces의 작업을 하는동안 사용하는 memory의 양을 제한합니다. spill threshold는 spark.shuffle.memoryFraction에 의해 설정이 됩니다. 
  • spark.storage.memoryFraction은 in-memory persistence를 storage에 얼마나 사용을 할 것인가를 나타냅니다.

(여기는 사용해보지 않으니 정확히 어떤값을 의미하는지 잘모르겠다..... 한번 확인을 전체적으로 해볼 필요는 있을것 같구나.)

[참고] Big Data University - Spark Fundamentals II

 

출처 : ourcstory.tistory.com/149?category=630696