[Spark] 스파크 RDD란?
분산된 이뮤터블 자바 객체 컬렉션인 RDD(Resilient Disributed Data)는 연산을 매우 빠르게 하며 아파치 스파크의 핵심입니다.
이름에서 알 수 있듯이, 데이터셋은 분산돼 있습니다. 데이터셋은 키를 기반으로 덩어리(Chunk) 단위로 쪼개져 있고 실행 노드(Executor Node)로 분산돼 있습니다. 이렇게 함으로써 데이터셋에 대한 연산 속도를 매우 빠르게 할 수 있습니다. RDD는 각각 덩어리에 적용된 모든 트랜스포메이션을 추적하는데, 이는 연산 속도를 빠르게 하고 어떤 문제로 인해 데이터 손실이 발생했을 때 대비책을 제공하기 위함입니다.(데이터 손실이 발생할 경우 RDD는 데이터를 복구시킬 수 있습니다) 데이터 흐름은 데이터 손실에 대한 다른 방어책으로 데이터 복제가 아닌 다른 방법을 이용해 데이터를 복구합니다.
RDD의 내부 작동 원리
RDD는 병렬도 동작하고, 이는 스파크의 가장 큰 장점입니다. 각 트랜스포메이션은 속도를 비약적으로 향상시키기 위해 실행됩니다.
데이터셋에 대한 트랜스포케이션은 게으릅니다. 이 말은 모든 트랜스포메이션은 데이터셋에 대한 액션이 호출됐을 때 실행된다는 뜻입니다. 이로 인해 스파크의 실행이 최적화됩니다. 예를 들어, 데이터 분석가가 데이터셋에 익숙해지기 위해 수행하는 다음 일반적인 과정을 보겠습니다.
1. 특정 칼럼의 고유한 값의 개수를 세기
2. A로 시작하는 것들 찾기
3. 스크린에 결과 출력하기
위의 언급된 과정과 같이 간단하게, A로 시작하는 단어들이 궁금하다면, 다른 아이템의 고유 값 개수를 세는 것은 의미없는 작업입니다. 그러므로 스파크는 위의 실행 과정을 순서대로 따르지 않으며, A로 시작하는 단어를 세고 결과를 출력하는 작업만 수행합니다.
이 과정을 코드로 나눠보겠습니다. 첫번째로는 .map(lambda v: (v, 1))를 이용해 스파크가 A를 포함하는 단어를 모으게 하고, .filter(lambda val: val.startswith('A')) method) 를 사용해 A로 시작하는 단어를 필터링합니다. 여기서 .reduceByKey(operator.add)를 호출하면, 각 키마다 출현한 횟수를 모두 더해 데이터셋이 줄어들게 됩니다. 이 모든 단계들이 데이터셋을 트랜스폼(transform)합니다.
두번째로는 .collect() 함수를 호출해 단계를 수행합니다. 이 단계가 우리의 데이터셋에 대한 액션입니다. (이 과정으로 데이터셋에서의 고유 데이터 갯수를 세게 됩니다.)
결과적으로 액션은 트랜스포케이션의 순서를 역순으로 해서, 매핑을 먼저하고 데이터를 필터링합니다. 이로 인해 더 적은 데이터가 리듀서(Reducer)에 전달 됩니다.
RDD 생성하기
파이스파크에서 RDD를 생성하는 방법은 두 가지입니다.
data = sc.parallelize([('Amber', 22), ('Alfred', 23), ('Skye', 4), ('Albert', 12), ('Amber', 9)])
위와 같이 컬렉션에 대해 parallelize() 함수를 수행하거나, 외부 어딘가 또는 특정 위치에 저장된 파일을 읽을 수 있습니다.
data_from_file = sc.textFile('/User/drabast/Documents/PySpark_Data/VS14MORT.txt.gz', 4)
sc.textFile(..., n)의 마지막 파라미터 n은 데이터셋이 나눠진 파티션의 개수를 의미합니다. 경험상으로 한 클러스터에서는 두 개에서 네 개 정도의 파티션으로 데이터셋을 나누는 것이 가장 좋습니다. 스파크는 NTFS, FAT, Mac OS Extended(HFS+) 또는 HDFS, S3, 카산드라 등의 분산 파일시스템과 같은 여러 개의 파일시스템으로부터 데이터를 읽을 수 있습니다. 데이터셋을 읽고 저장하는 곳에 주의해야 합니다. 경로는 특수문자를 포함할 수 없습니다. 이는 아마존 S3나 마이크로소프트 Azure 데이터 저장소에 저장된 경로에도 적용됩니다.
여러가지 데이터 포맷이 지원됩니다. 텍스트, 파케이, JSON, 하이브 테이블, 그리고 관계형 데이터베이스에서의 데이터는 JDBC 드라이버로 읽을 수 있습니다. 스파크는 압축된 데이터셋도 읽어들여서 작업할 수 있습니다.
데이터가 어떻게 읽히느냐에 따라서, 데이터를 지니고 있는 객체는 조금씩 다르게 표현됩니다. 데이터가 읽히는 파일은 MapPartitionsRDD로 표현됩니다. 컬렉션에 대해 .parallelize() 함수를 돌릴 때처럼 ParallelCollectionRDD를 사용하지 않습니다.
* 스키마
다음 장에서 설명할 데이터프레임과 달리 RDD는 스키마리스(schema-less) 데이터 구조입니다. 그러므로 RDD를 사용할 때 데이터셋을 병렬 처리하는 것은 전혀 문제가 되지 않습니다.
data_heterogenous = sc.parallelize([ ('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504] ]).collect()
그래서 튜플(tuple), 사전(dict), 리스트(list)와 같은 거의 모든 데이터 구조를 섞을 수 있고, 이는 스파크에서 전혀 문제가 되지 않습니다.
데이터셋에 대해 .collect() 함수를 수행하면(즉, 데이터셋을 드라이버에 다시 가져오기 위해 액션을 수행하면) 파이썬에서 일반적으로 했던 방식대로 객체 내의 데이터에 접근할 수 있습니다.
data_heterogenous[1]['Porsche']
이 코드는 100000을 출력합니다.
.collect() 함수는 RDD의 모든 엘리먼트(element)를 드라이버에 리턴하고, 드라이버에서 엘리먼트들은 리스트로 나열됩니다.
* 파일로부터 데이터 읽기
텍스트 파일로부터 데이터를 읽을 때, 파일의 각 행이 RDD의 한 엘리먼트를 이룹니다. data_from_file.take(1) 함수로 결과를 직접 출력해보면 출력결과가 읽기 편하지는 않습니다. 이를 더욱 일기 편하게 하기 위해 각각의 값들이 리스트로 표현될 수 있도록 만들어보겠습니다.
* 람다 표현
이 예제에서는 data_from_file에서처럼 알아보기 힘든 데이터로부터 유용한 정보를 추출할 것입니다. 먼저, 행을 읽을 수 있는 형태로 파생해주는 다음 코드를 보고 함수를 정의해보겠습니다.
def extractInformation(row): import re import numpy as np selected indices = [2,4,5,6,7,9,10,11,12,13,14,15,16,17,18, ... 77,78,79,81,82,83,84,85,87,89 ] record_split = re.compile(r'([\s]{19})([0-9]{1})([\s]{40}) ... ([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1}) try: rs = np.array(record_split.split(row))[selected_indices] except: rs = np.array(['-99']*len(selected_indeces)) return rs
여기서 주의할 점이 있습니다. 일반적인 파이썬 함수를 선언하면 스파크가 파이썬 인터프리터와 JVM을 지속적으로 스위치해야 하기 때문에 애플리케이션이 느려질 수 있습니다. 가능하다면, 반드시 스파크에서 제공하는 내장(built-in) 함수를 사용해야 합니다.
다음에 할 일은 모듈을 임포트(import)하는 것입니다. (정규 표현식을 파싱하기 위한 re 모듈과 여러 엘리먼트를 선택하기 편하게 할 numpy 모듈)
마지막으로, 정보를 추출하기 위해 그리고 각 행에 대해 파싱하기 위해 정규 표현식 객체를 생성합니다. ( 정규식 레퍼런스 )
하나의 데이터가 파싱되면, 각각의 리스트를 Numpy 배열로 바꾸고 리턴합니다. 이 과정에서 무언가 오류가 발생하면 -99로 초기화된 리스트를 리턴합니다. 이를 통해 데이터가 제대로 파싱되지 않았음을 알 수 있습니다. -99 대신에 .flatMap()을 사용해 오류가 있는 데이터를 암묵적으로 필터링하고 비어있는 리스트(empty list)를 리턴할 수 있습니다.
이제 데이터셋을 쪼개고 변형하기 위해 extractinformation() 함수를 사용할 것입니다. map() 함수에 오로지 함수 시그니처만 전달한다는 것을 잊지말하야 합니다. (이 함수는 각 파티션에서 RDD 내의 한 데이터만 extractinformation() 함수에 전달합니다.)
data_from_file_conv = data_from_file.map(extractInformation)
전역 범위 vs. 지역 범위
파이스파크 사용자로서 익숙해져야 할 것 중 하나는 스파크에서의 병렬 처리입니다. 파이썬에 능숙하더라도 파이스파크에서 스크립트를 실행하는 것에 약간 어려움을 느낄 수 있습니다.
스파크는 두 가지 모드로 동작할 수 있습니다(로컬 모드와 클러스터 모드). 스파크가 로컬 모드로 동작할 때는 파이썬을 실행시키는 것과 다르지 않을 수도 있습니다.(바뀐 것은 대부분 구문상의 것들과 데이터와 코드가 분리된 워커 프로세스 사이에서 복사될 수 있다는 약간의 구조적인 차이뿐입니다)
그러나 특별한 주의없이 같은 코드를 클러스터 모드에서 실행하면 골치 아픈 일들이 더 많이 생깁니다. 이것을 하기 위해서는 스파크가 클러스터에서 잡을 어떻게 실행시키는지 이해해야 합니다.
클러스터 모드에서 잡이 실행되면, 그 잡은 드라이버 노드(또는 마스터 노드)에 보내집니다.
드라이버 노드는 잡을 위해 DAG(Directed Acyclic Graph))을 생성하고 어떤 실행 노드(또는 워커 노드)가 특정 태스크를 실행할지 결정합니다. 그리고 드라이버는 각 태스크를 마칠 준비를 한 후, 워커 노드가 각자의 태스크를 수행하고 작업을 마치면 그 결과를 드라이버 노드에 리턴하도록 합니다(워커 노드가 RDD에서 태스크를 수행할 수 있도록 드라이버 노드에 보여진 변수와 함수의 집합)
이 변수와 함수는 내부적으로 실행 노드의 문맥상에서 정적입니다. 즉, 각자의 실행 노드가 드라이버 노드에서 사용되는 변수와 함수를 복사해 사용합니다. 태스크를 실행할 때, 실행 노드가 이 변수나 함수를 수정할 경우 다른 실행 노드들의 변수나 함수에는 영향을 주지 않는다는 것입니다. 이로 인해 런타임 버그나 몇몇 이상 행위를 유발할 수 있는데, 보통 이러한 오류들은 굉장히 추적하기가 힘듭니다.
트랜스포메이션
트랜스포메이션은 데이터셋의 형태를 만듭니다. 이것들은 필터링, 조인, 데이터셋 내의 값들에 대한 트랜스코딩 등을 포함합니다. 이번 절에서는 RDD에서 가능한 몇몇 트랜스포메이션을 보여줄 예정입니다.
RDD에는 스키마가 없기 때문에 이번 절에서는 생산된 데이터셋에서의 스키마를 잘 알고 있다고 가정합니다.
* map() 트랜스포메이션 (분류)
map() 함수는 가장 많이 쓰이는 함수가 될 것입니다. 이 함수는 RDD의 각 엘리먼트에 적용됩니다(data_from_file_conv 데이터셋에서 이 함수를 각각의 행에 대한 트랜스포메이션으로 볼 수 있습니다)
이번 예제에서는 사망 날짜를 숫자 값으로 변형하는 새로운 데이터셋을 생성할 것입니다.
data_201 = data_from_file_conv.map(lambda row: int(row[16]))
data_2014.take(10)을 실행한 결과는 다음과 같습니다.
당연히 더 많은 열들을 가져올 수 있으나 가져온 것들을 튜플, 사전 또는 리스트로 감싸야 합니다. map() 함수가 의도된 대로 동작하는 것을 확인하기 위해 17번째 행 엘리먼트를 추가해보겠습니다.
data_2014_2 = data_from_file_conv.map( lambda row: (row[16], int(row[16]):) data_2014_2.take(5)
* .filter() 트랜스포메이션 (where문)
자주 사용되는 또 다른 트랜스포메이션은 .filter() 함수입니다. 이 함수를 이용해서 데이터셋으로부터 특정 조건에 맞는 엘리먼트를 선택할 수 있습니다. 예를 들어, data_from_file_conv 데이터셋에서 2014년에 사고로 몇명이 죽었는지를 카운트해보겠습니다.
data_filtered = data_from_file_conv.filter( lambda row: row[16] == '2014' and row[21] == '0') data_filtered.count()
위의 명령어는 실행하는 컴퓨터의 성능에 따라 다소 시간이 걸릴 수 있습니다.
* .flatMap(...) 트랜스포메이션
.flatMap() 함수는 .map() 함수와 비슷하게 동작합니다. 그러나 이는 리스트가 아닌 평면화된 결과(flattened result)를 리턴합니다. 다음 코드를 실행해보겠습니다.
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1)) data_2014_flat.take(10)
위의 코드는 다음과 같은 결과를 출력합니다.
이 결과를 이전에 data_2014_2에서 생성된 것과 비교해보겠습니다. 이미 언급했듯이, .flatMap() 함수는 입력 데이터를 파싱할 때 올바르지 않은 형태의 데이터를 제거하기 위해 사용됩니다. flatMap() 함수는 각 행을 리스트로 읽어서 추가합니다. 이때 올바르지 않은 형태의 데이터는 아무것도 포함하지 않은 리스트를 전달해 제거합니다.
* .distinct() 트랜스포메이션
이 함수는 특정 칼럼에서의 중복된 값을 제거해, 고유한 값을 리스트로 리턴합니다. 이는 데이터셋을 알아보거나 검증할 때 특히 유용합니다. 다음의 코드에서 성별 컬럼(row[5])이 남자나 여자만을 포함하는지 알아보면, 데이터셋을 제대로 파싱했다는 것을 검증하게 됩니다. 다음의 코드를 실행해보겠습니다.
distinct_gender = data_from_file_conv.map( lambda row: row[5]).distinct() distinct_gender.collect()
이 코드는 다음의 결과를 생성합니다.
우선 성별을 포함하는 칼럼을 추출하고, .distinct() 함수를 사용해 고유한 값으로 어떤 값들이 있는지 확인해보겠습니다. 마지막, .collect() 함수를 사용해 그 고유한 값들을 출력해보겠습니다. 이 함수는 많은 자원을 사용하는 함수고, 데이터를 섞은 연산을 포함하기 때문에 정말 필요할 경우에만 사용해야 합니다.
* .sample(...) 트랜스포메이션
.sample() 함수는 데이터셋으로부터 임의로 추출된 샘플을 리턴합니다. 첫 번째 파라미터는 중복 허용 여부를 명시하고, 두 번째 파라미터는 리턴할 데이터셋과 전체 데이터셋 간의 크기 비율을 명시하며, 세번째 파라미터는 임의의 숫자를 생성하기 위한 시드입니다.
fraction = 0.1 data_sample = data_from_file_conv.sample(False, fraction, 666)
이 예제에서 원본 데이터의 10%에 해당하는 임의의 샘플을 얻었습니다. 이를 확인하기 위해 데이터셋의 크기를 출력해보겠습니다.
print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))
위의 명령어는 다음 결과를 출력합니다.
해당하는 RDD의 개수를 카운트하는 .count() 함수를 액션으로 사용합니다.
* .leftOuterJoin(...) 트랜스포메이션
SQL에서처럼 .leftOuterJoin(...)는 두 개의 RDD를 두 개의 데이터셋에서 찾은 값에 기반해 조인하고, 두 개의 RDD가 매치되는 데이터에 대해 왼쪽 RDD에 오른쪽 RDD가 추가된 결과가 리턴됩니다.
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c', 10)]) rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)]) rdd3 = rdd1.leftOuterJoin(rdd2)
RDD3에서 .collection() 함수를 수행하면 다음 결과가 나옵니다.
이 함수는 데이터를 섞는 과정을 포함해 성능을 저하시킬 수 있습니다. 따라서 필요할 때만 가끔 사용해야 합니다.
여기서 볼 수 있는 것은 RDD1에서의 모든 엘리먼트와 그것에 상응하는 RDD2에서의 값입니다. 여기서 볼 수 있듯이, 'a'는 RDD3에서 두 번 나타나고 'a'는 RDD2에서 두번 나타납니다. RDD1에서 'b'의 값은 한 번만 표시되고 RDD2에서의 값인 6과 조인됩니다. 무시된 것이 두개 있습니다. RDD1에서의 'c' 값은 RDD2에서 상응하는 키를 가지고 있지 않습니다. 그래서 리턴된 튜플에서의 값이 None입니다. 그리고 레프트 조인을 구했기 때문에 RDD2에서의 d값은 무시됩니다.
.join() 함수를 사용했다면 'a'와 'b'가 두 RDD의 교집합이기 때문에 'a'와 'b'의 값만 갖게 되었을 것입니다. 다음 코드를 실행해보도록 합시다.
rdd4 = rdd1.join(rdd2) rdd4.collect()
두 RDD의 교집합 엘리먼트를 리턴하는 .intersection() 함수도 유용한 함수입니다. 다음 코드를 실행해보겠습니다.
rdd5 = rdd1.intersection(rdd2) rdd5.collect()
* .repartition(...) 트랜스포메이션
데이터셋을 재파티션하면 데이터가 나눠지는 파티션의 개수가 바뀝니다. 이 기능은 데이터를 섞어서 성능을 크게 저하시키는 요인이 되기 때문에 정말 필요할 때만 가끔 사용돼야 합니다.
rdd1 = rdd1.repartition(4) len(rdd1.glom().collect())
위의 코드는 새로운 파티션 개수인 4를 출력합니다.
.collect() 함수와는 다르게 .glom() 함수는 하나의 리스트를 생성하는데, 그 리스트의 각 엘리먼트는 명시된 파티션에 존재하는 데이터셋의 모든 엘리먼트에 대한 리스트입니다. .glom()이 생성하는 리스트는 파티션 개수만큼의 엘리먼트를 가지고 있습니다.
액션
트랜스포메이션과는 다르게, 액션은 데이터셋에서 스케줄된 태스크를 실행합니다. 트랜스포메이션 같은 경우, 데이터 트랜스포메이션이 끝난 후 트랜스포메이션을 실행할 수 있습니다. 이것은 어떠한 트랜스포메이션도 포함하지 않거나(예를 들어, .take(n)은 어떠한 트랜스포메이션이 수행되지 않아도 RDD에서 n개의 데이터를 리턴합니다) 또는 어떠한 트랜스포메이션도 될 수 있습니다.
* .take(...) 함수
map() 함수처럼, .take() 함수 역시 가장 유용한 함수 중 하나입니다. 이 함수는 하나의 파티션에서 가장 위에 있는 n행을 리턴하기 때문에 RDD 전체를 리턴하는 .collect()보다 더 자주 쓰입니다. 큰 데이터셋일수록 이것이 특히 중요합니다.
data_first = data_from_file_conv.take(1)
데이터로부터 임의의 샘플을 얻고 싶으면 .takeSample() 함수를 사용하면 됩니다. 이 함수는 세 개의 파라미터를 갖습니다. 첫번째 파라미터는 샘플링이 재선택되는 경우를 허용할지 말지를 결정하고, 두번째 파라미터는 리턴되는 데이터의 개수를 명시합니다. 마지막으로 세번째 파라미터는 랜덤 숫자를 생성하기 위한 시드입니다.
* .collect(...) 함수
이 함수는 RDD의 모든 엘리먼트를 드라이버로 리턴합니다. 이것에 대한 문제점은 위에서 설명했기 때문에 설명을 반복하지는 않으려고 합니다.
* reduce(...) 함수
.reduce() 함수는 특정 함수를 사용해 RDD의 개수를 줄입니다. RDD의 총합을 구하기 위해 이 함수를 사용할 수 있습니다.
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x+y)
우리는 RDD1의 값 리스트를 map() 트랜스포메이션을 이용해 생성했고, 그 결과를 처리하기 위해 .reduce() 함수를 이용했습니다. .reduce() 함수는 각각의 파티션에서 합계 함수를 수행하고(위에서는 lambda로 표기), 마지막 집계가 수행되는 드라이버 노드에 그 합계를 리턴합니다.
여기서 주의할 점이 있습니다. 리듀서로 전달되는 함수는 결합 법칙이 성립해야 하고(즉, 엘리먼트의 순서가 바뀌어도 결과에 영향을 주지 않아야 합니다). 교환 법칙이 성립해야 합니다. 즉, 피연산자의 순서가 바뀌어도 결과는 같아야 합니다. 그러므로 어떤 함수를 리듀셔로 정할지 신중하게 결정해야 합니다.
이전의 법칙들을 무시하면, 문제가 발생할 것입니다. 예를 들어, 다음 RDD(하나의 파티션만 사용하는)가 있다고 가정해보겠습니다.
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)
데이터를 현재 결과와 다음 결과를 나누는 방법으로 리듀스하면, 예상되는 결과는 10이 나옵니다.
works = data_reduce.reduce(lambda x, y: x / y)
그러나 파티션을 세 개로 나누면 결과는 달라질 것입니다. 아래 결과는 0.004입니다.
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3) data_reduce.reduce(lambda x, y: x/y)
.reduceByKey(...) 함수는 .reducer() 함수와 비슷하게 동작하나 이 함수는 키 값을 기반으로 리듀스를 수행합니다.
data_key = sc.parallelize( [('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1). ('d', 3)], 4) data_key.reduceByKey(lambda x, y: x + y).collect()
* .count(...) 함수
.count() 함수는 RDD의 엘리먼트 개수를 셉니다. 다음 코드를 사용해 보겠습니다.
data_reduce.count()
이 코드는 data_reduce의 정확한 엘리먼트 개수인 6을 출력할 것입니다.
.count() 함수는 다음 함수와 같은 결과를 출력하나 .count() 함수는 전체 데이터셋을 드라이버로 옮기지 않습니다.
len(data_reduce.collect()) # 잘못된 방법 - 실행하지 말 것
데이터셋이 키-값 형태로 있으면, 고유한 키의 수를 구하기 위해 .countByKey() 함수를 사용할 수도 있습니다. 다음 코드를 실행해보겠습니다.
data_key.countByKey().items()
이 코드는 다음 결과를 출력합니다.
* .saveAsTextFile(...) 함수
이름으로부터 추측되듯이, .saveAsTextFile() 함수는 RDD를 텍스트 파일로 저장합니다. (각 파티션을 분리된 파일에)
data_key.saveAsTextFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')
모든 행이 스트링으로 표시되기 때문에 뒤쪽부터 읽고 싶으면 뒤쪽으로 파싱해야 합니다.
def parseInput(row): import re pattern = re.compile(r'\(\'([a-z]_\', ([0-9])\)') row_split = pattern.split(row) return (row_split[1], int(row_split[2])) data_key_reread = sc.textFile('/Users/drabast/Documents/PySpark_Data/data_key.txt').map(parseInput) data_key_reread.collect()
* .foreach(...) 함수
이 함수는 같은 함수를 RDD의 각 엘리먼트에 반복적으로 적용하는 함수입니다. .map() 함수와는 달리, foreach() 함수는 정의된 함수를 하나하나 각각의 데이터에 적용합니다. 이는 파이스파크에서 지원하지 않는 데이터베이스에 데이터를 저장하고 싶을 때 유용합니다.
이 함수를 이용해 data_key RDD에 저장돼 있는 모든 데이터를 출력해보겠습니다. (주피터 노트북이 아니라 CLI에 출력)
def f(x): print(x) data_key.foreach(f)
CLI를 보면, 모든 데이터들이 출력된 것을 볼 수 있습니다. 출력되는 데이터의 순서는 매번 다릅니다.
요약
RDD는 스파크의 핵심입니다. RDD와 같은 스키마리스 데이터 구조는 스파크에서 다룰 가장 기본적인 데이터 구조입니다.
이번 장에서는 .parallelize() 함수나 텍스트 파일로부터 데이터를 읽는 방법으로 RDD를 텍스트 파일로부터 생성하는 방법을 알아봤습니다. 또한 비구조적인 데이터를 처리하는 방법도 다뤘습니다.
스파크에서 트랜스포메이션은 게으릅니다(액션이 호출됐을 때만 적용된다). 이번 포스팅에서는 가장 자주 쓰이는 트랜스포메이션과 액션에 대해 알아보았습니다. 파이스파크 문서는 더 많은 것들을 포함하고 있습니다.
스칼라 RDD와 파이썬 RDD의 가장 큰 차이는 스피드입니다. 파이썬 RDD는 스칼라 RDD보다 훨씬 느릴 수 있습니다.
출처: https://12bme.tistory.com/306?category=737765 [길은 가면, 뒤에 있다.]
'Big Data > 빅데이터' 카테고리의 다른 글
[빅데이터] 검색시스템 간단 요약 (0) | 2020.08.03 |
---|---|
[Spark] 스파크 ML 패키지 (0) | 2020.08.03 |
[Spark] 데이터 모델링 준비하기 (0) | 2020.08.03 |
[Spark] 데이터프레임 (0) | 2020.08.03 |
[Spark] 스파크 이해하기 (0) | 2020.08.03 |
[ELK] 엘라스틱서치(ElasticSearch) 시작하기 (0) | 2020.08.03 |
[ELK] ELK 스택(ElasticSearch, Logstash, Kibana)으로 데이터 분석 (0) | 2020.08.03 |
[하둡] 맵리듀스(MapReduce) 이해하기 (0) | 2020.08.03 |