RDD: 9개의 글
들어가며 스파크에서 구현을 하다 보면 각 객체간의 변환(?)이 자유로워야 하는것 같다. 예를 들면 RDD에서 추출한 데이터를 DataFrame으로 생성한다든지, DataFrame에서 여러개의 Row를 추출해서 새로운 RDD를 생성한다는 식의 경우를 말한다. 이번에 당면한 문제는 Json Object의 요소를 저장하고 있는 Seq의 Collection을 갖고 있었고, 이 Collection을 바탕으로 DataFrame의 생성이 필요했다. 이번에는 Seq[org.apache.spark.sql.Row]의 데이터를 RDD와 DataFrame으로 변환하는 방법에 대해서 정리를 해보려고 한다. 소스코드 logData는 org.apache.spark.sql.Row의 값을 갖고 있다. logData에서 field명이 ..
RDD데이터 파티셔닝 - 이론 및 예제 이번에 설명한 내용은 스파크에서 노드 간 데이터세트의 파티셔닝을 어떻게 제어할 것인가 하는 것이다. 분산 프로그램에서 통신은 비용이 매우 크므로 네트워크 부하를 최소화할 수 있는 데이터 배치는 프로그램 성능을 비약적으로 향상시킬 수 있습니다. 비분산프로그램이 수많은 데이터 레코드 처리를 위해 올바른 자료 구조를 선택할 피ㄹ요가 있는 것처럼, 스파크의 애플리케이션도 네트워크 비용을 줄이기 위해서는 RDD의 파티셔닝을 제어해야 합니다. 파티셔닝은 조인 같이 키 중심의 연산에서 데이터세트가 여러번 재활용 될 때만 의미가 있습니다. val sc = new SparkContext(...) val userData = sc.sequenceFile[UserId, UserInfo]..
RDD 영속화(캐싱) - 이론 및 예제 동일한 RDD를 여러 번 사용하고 싶을 때도 있을 것이다. 생각없이 이를 시도한다면 스파크는 RDD와 RDD에서 호출하는 액션들에 대한 모든 의존성을 재연산하게 된다. 이는 데이터를 여러 번 스캔하는 반복 알고리즘들에 대해서는 매우 무거운 작업일 수 있다. RDD를 여러 번 반복 연산하는 것을 피하려면 스파크에 데이터 영속화(persist/persistence)를 요청을 할 수 있다. RDD 영속화에 대한 요청을 하면 RDD를 계산한 노드들은 그 파트션들을 저장하고 있게 된다. 영속화된 데이터를 갖고 있는 노드에 장애가 생기면 스파크는 필요 시 유실되ㄴ 데이터 파티션을 재연산한다. 만약 지연 없이 노드 장애에 대응하고 싶다면 데이터를 복제하는 정책을 선택할 수도 있..
Optimizing Transformations and Actions 아래 내용을 중심으로 학습을 진행했습니다. Use advanced RDD operations Identify what operations cause shuffling Understand how to avoid shuffling when possible Group, combine, reduce key-value pairs Advanced RDD Operations advanced RDD의 연산에 대해서 알아보도록 하겠습니다. Numeric RDDs는 statistical 연산을 할 수 있습니다. 해당 연산을 통해 standard deviation, sum, mean, max, min, 등의 통계적이 연산이 가능합니다. mapPartiti..
RDD Architecture 아래 내용을 기준으로 학습했습니다. 어떻게 RDDs를 생성하는지에 대해서 이해 RDD의 performance를 증가시키기 위해 partitions을 관리 RDD resilient를 만들게 하는게 무엇인지 RDDs가 jobs, stages에서 broken이 되었을때 어떻게 처리하는지 Serialize tasks RDD Review RDD에 대해서는 이전에도 언급한바가 있기 때문에 간단하게 언급하면, Spark = RDD라고 생각하면 된다. 그 만큼 RDD는 Spark를 이해하는데 있어 가장 중요한 요소라고 할 수 있습니다. RDD는 여러개의 파티션들로 구성이 되어있고, 파티션의 개수는 Spark에서 클러스터의 CPU의 코어의 개수를 기반으로 결정이 됩니다. 여기서 언급하는 파..
Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing (2012)에 나온 논문을 읽어보았다. 데이터 중간 결과를 재사용하는 iterative한 연산이 필요한 알고리즘들 machine learning and graph algorithms, PageRank, K-means clustering, logstic regression, data mining이 있다. 위 알고리즘을 MapReduce에서 처리를 하게 되면 Map과 Reduce의 사이에서 data replication, disk I/O, serialization의 overhead가 발생한다. 즉, MapReduce가 iteration에서 수행..
Interactive Analysis with the Spark Shell API를 쉽게 습득하기 위해서는 spark's shell만큼 좋은게 없습니다. python과 scala를 통해서 할 수 있습니다. 이번에는 RDD를 생성하는 방법, MapReduce를 Spark에서 구현하는 방법과, caching하는 방법에 대해서 설명하려고 합니다. 마지막에는 내가 구현한 앱을 사용자들에게 배포하기 위해 sbt build tool을 이용해 배포하는 방법에 대해서 설명합니다. Resilient Distributed Dataset(RDD) 생성하기 위해서는 hadoop hdfs file을 읽거나, 기존에 있는 rdds를 transforming을 통해서 변환이 가능합니다. RDD에서 제공하는 method는 action..
RDD란? 분산되어 존재하는 데이터 요소들의 모임이라고 생각하시면 됩니다. RDD는 java의 String처럼 변경이 불가능한 객체(immutable)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 됩니다. 변경을 하기 위해서는 새로운 RDD를 만들거나, 존재하는 RDD를 변형, 결과 계산을 위해 RDD에서 연산을 호출하여 생성을 해야 합니다. Spark는 자동으로 RDD에 있는 데이터들을 클러스터에 분배, 수행하는 연산들을 병렬화 합니다. Spark는 RDD를 lazy evaluation으로 액션을 사용하는 시점에서 처리하기 때문에, 구현할때는 transformation과 action의 operation을 확실히 이해하고 있어야 결과를 받아보는데 효율적으로 구현이 가능합니다. 예를 들어 한 라인..
분산된 이뮤터블 자바 객체 컬렉션인 RDD(Resilient Disributed Data)는 연산을 매우 빠르게 하며 아파치 스파크의 핵심입니다. 이름에서 알 수 있듯이, 데이터셋은 분산돼 있습니다. 데이터셋은 키를 기반으로 덩어리(Chunk) 단위로 쪼개져 있고 실행 노드(Executor Node)로 분산돼 있습니다. 이렇게 함으로써 데이터셋에 대한 연산 속도를 매우 빠르게 할 수 있습니다. RDD는 각각 덩어리에 적용된 모든 트랜스포메이션을 추적하는데, 이는 연산 속도를 빠르게 하고 어떤 문제로 인해 데이터 손실이 발생했을 때 대비책을 제공하기 위함입니다.(데이터 손실이 발생할 경우 RDD는 데이터를 복구시킬 수 있습니다) 데이터 흐름은 데이터 손실에 대한 다른 방어책으로 데이터 복제가 아닌 다른 방..