spark: 20개의 글
PySpark 에서 NoneType을 Filtering 하는 방법 (any, all, subset) pyspark에서 drop method는 NULL을 가진 행을 제거하는데 가장 간단한 함수다. 기본적으로 NULL 값을 가진 행을 모두 제거를 한다. 모든 컬럼이 NULL인 경우 제거를 하고, 하나의 컬럼이 NULL인 경우 제거를 하고 싶은 경우가 있을것이다. 이런 경우에 어떻게 다르게 진행하는지 "any", "all"을 통해 설명 위 작업을 SQL에서 진행한다면 WHERE 절에 해당 컬림이 NULL인지 체크하는 구문을 넣어야 한다. 만약 모든 컬럼에 대해서 해야 하면? 모든 컬럼을 명시해야하는 단점이 있다. (SELECT * FROM TABLE WHERE COL1 IS NOT NULL) drop 메소드에..
Spark에서 제공하는 라이브러리 외에도 형태소 분석기나, 기존에 우리가 사용하던 라이브러리를 사용하는 방법에 대해서 설명한다. 클러스터의 모든 슬레이브에 /var/lib/의 이하 폴더에 사용할 라이브러리를 추가해 놓은 상태입니다. spark-shell이나 spark-submit을 할때 아래와 같이 --driver-class-path를 통해 라이브러리를 포함시키면 됩니다. 하지만 하나씩 하는건 무리가 있으니 아래와 같이 실행하면 모든 library를 한번에 import 할 수 있습니다. $ spark-shell --driver-class-path $(echo /var/lib/spark/*.jar | tr ' ' ',') 하지만? 위처럼 하면 아래와 같은 Exception이 발생을 합니다. class의 p..
들어가며 데이터 분석에서 가장 많이, 그리고 자주 사용하는 field가 timestamp의 값이 아닐까 싶다. 그 중에서도 timestamp의 raw데이터를 통해서 새로운 feature를 생성하는 방법이 있다. 예를 들어 날짜별 사용자 방문 수 라고 할때, timestamp의 값에서 우리는 month와 day를 추출해서 새로운 feature의 값으로 사용을 해야 한다. 소스코드 DataFrame의 데이터 타입인 df에서 select의 함수를 통해서 원하는 Column을 추출을 하는 과정이다. timestamp_milli의 값은 ms의 단위의 값이기 때문에 1000을 나눈뒤에 spark에서 제공하는 functions의 라이브러리를 import한 뒤에 사용하면 된다. from_unixtime을 통해 date..
들어가며 스파크에서 구현을 하다 보면 각 객체간의 변환(?)이 자유로워야 하는것 같다. 예를 들면 RDD에서 추출한 데이터를 DataFrame으로 생성한다든지, DataFrame에서 여러개의 Row를 추출해서 새로운 RDD를 생성한다는 식의 경우를 말한다. 이번에 당면한 문제는 Json Object의 요소를 저장하고 있는 Seq의 Collection을 갖고 있었고, 이 Collection을 바탕으로 DataFrame의 생성이 필요했다. 이번에는 Seq[org.apache.spark.sql.Row]의 데이터를 RDD와 DataFrame으로 변환하는 방법에 대해서 정리를 해보려고 한다. 소스코드 logData는 org.apache.spark.sql.Row의 값을 갖고 있다. logData에서 field명이 ..
들어가며 스파크에서는 CSV, Json, Protocol Buffer, Hadoop에서 지원하는 데이터 포맷 등 다양한 포맷을 지원을 한다. 이번에는 Json파일을 읽어서 스키마를 확인을 하고, 스키마에 있는 필요한 데이터를 추출하는 방법에 대해서 알아보려고 한다. 데이터는 공개되어 있는 tweet 데이터를 사용하였고, tweet데이터에서 내가 필요한 데이터를 추출하기 위한 과정을 소스코드로 작성해 보았습니다. 설명 소스 코드는 tweet의 데이터를 읽어들인 이후에 schema를 확인을 하여 데이터의 포맷이 어떻게 이루어져 있는지 확인을 합니다. (아래 스키마 그림 첨부 했습니다.) 스파크는 lazy execution을 하기 때문에 결과를 확인하기 위해 .take(N)을 사용했습니다. 처음에 tweets..
파티셔닝 예제 - 페이지랭크(PageRank) 알고리즘 RDD 파티셔닝에 의한 효과를 볼 수 있는 좀 더 복잡한 알고리즘 예제로 페이지 랭크를 생각 할 수 있다. 페이지 랭크 알고리즘은 구글의 공동 창업자 Larry Page의 이름을 인용한 것으로 얼마나 많은 문서들이 해당 문서를 링크하고 있는지 기초하여 각 문서에 대해서 중요도를 매기는 알고리즘을 말한다. 페이지 랭크 알고리즘은 웹페이지의 중요도를 측정하는 척도로도 사용이 되지만, 과학 논문에서 어떤 논문이 중요한지 평가하거나, SNS의 영향력에 있는 허브유저를 찾아내는 데에도 사용이 되고 있다. 페이지 랭크의 단점은 많은 조인을 수행하는 반복알고리즘이다. 그렇기 때문에 많은 조인을 반복적을 효율적으로 처리가 가능한 RDD 파티셔닝을 이용하면 좋은 결..
RDD데이터 파티셔닝 - 이론 및 예제 이번에 설명한 내용은 스파크에서 노드 간 데이터세트의 파티셔닝을 어떻게 제어할 것인가 하는 것이다. 분산 프로그램에서 통신은 비용이 매우 크므로 네트워크 부하를 최소화할 수 있는 데이터 배치는 프로그램 성능을 비약적으로 향상시킬 수 있습니다. 비분산프로그램이 수많은 데이터 레코드 처리를 위해 올바른 자료 구조를 선택할 피ㄹ요가 있는 것처럼, 스파크의 애플리케이션도 네트워크 비용을 줄이기 위해서는 RDD의 파티셔닝을 제어해야 합니다. 파티셔닝은 조인 같이 키 중심의 연산에서 데이터세트가 여러번 재활용 될 때만 의미가 있습니다. val sc = new SparkContext(...) val userData = sc.sequenceFile[UserId, UserInfo]..
RDD 영속화(캐싱) - 이론 및 예제 동일한 RDD를 여러 번 사용하고 싶을 때도 있을 것이다. 생각없이 이를 시도한다면 스파크는 RDD와 RDD에서 호출하는 액션들에 대한 모든 의존성을 재연산하게 된다. 이는 데이터를 여러 번 스캔하는 반복 알고리즘들에 대해서는 매우 무거운 작업일 수 있다. RDD를 여러 번 반복 연산하는 것을 피하려면 스파크에 데이터 영속화(persist/persistence)를 요청을 할 수 있다. RDD 영속화에 대한 요청을 하면 RDD를 계산한 노드들은 그 파트션들을 저장하고 있게 된다. 영속화된 데이터를 갖고 있는 노드에 장애가 생기면 스파크는 필요 시 유실되ㄴ 데이터 파티션을 재연산한다. 만약 지연 없이 노드 장애에 대응하고 싶다면 데이터를 복제하는 정책을 선택할 수도 있..
Caching and Serialization 아래 내용을 기준으로 학습을 진행했습니다. 어떻게 언제 RDD를 cache를 해야 하는지? Storage level과 사용은 어떻게 하는지? memory 사용을 최적화 하기 위해서 어떻게 하는지? RDDs를 공유하려면 어떻게 해야 하는지? Persistence 스파크는 MapReduce와 가장 다른점이 disk I/O가 최소화 되기 때문에 interative한 연산에 적합하다고 말을 합니다. 그러나 추가적으로 성능향상을 얻기 위해서는 RDD를 다루는 방법에 대해서 정확하게 이해를 해야합니다. Spark에서는 memory chaching을 사용하는 것으 알고 있습니다. 예를 들어서 RDD데이터를 생성하고 계속 반복적으로 사용을 한다면, 사용한다는 말은 acti..
Optimizing Transformations and Actions 아래 내용을 중심으로 학습을 진행했습니다. Use advanced RDD operations Identify what operations cause shuffling Understand how to avoid shuffling when possible Group, combine, reduce key-value pairs Advanced RDD Operations advanced RDD의 연산에 대해서 알아보도록 하겠습니다. Numeric RDDs는 statistical 연산을 할 수 있습니다. 해당 연산을 통해 standard deviation, sum, mean, max, min, 등의 통계적이 연산이 가능합니다. mapPartiti..