[Spark] 데이터 모델링 준비하기
모든 데이터는 보통 지저분하고 데이터가 의도한 것에 대한 충분한 신뢰성을 가지고 있지 않습니다. 데이터가 깨끗한 상태에 있다는 것을 스스로 증명하거나 테스트하기 전까지는 데이터를 모델링에 사용하거나 지나치게 신뢰하면 안됩니다.
데이터는 중복 데이터나 관찰되지 않은 값, 아웃라이어, 존재하지 않는 주소, 잘못된 폰번호 또는 지역 코드, 올바르지 않은 지역 좌표, 잘못된 데이터나 레이블, 대소문자 구분, 공백 관련 문제를 가지고 있습니다. 데이터 과학자나 데이터 엔지니어는 통계 모델 또는 머신러닝 모델을 빌드하기 위해 이러한 데이터를 깨끗하게 만들어야 합니다.
데이터는 앞에서 말한 문제점들이 없을 경우 기술적으로 깨끗하다고 말할 수 있습니다. 그러나 모델링을 목적으로 데이터셋을 깨끗하게 하기 위해서는 피처의 분포를 확인해야 하고 사전에 정의된 조건들을 만족하는지 검증해야 합니다. 데이터 과학자는 80~90%의 시간을 데이터를 다루거나 피처에 익숙해지는 데 쓰게 됩니다.
중복, 미관찰 값, 아웃라이어 확인하기
데이터를 완전히 검증하거나 검증 결과에 만족하기 전까지는 데이터를 신뢰하거나 사용해서는 안됩니다. 중복, 미관찰 값, 아웃라이어를 어떻게 다룰지 이야기 해보겠습니다.
* 중복 값
중복 값은 데이터셋에서 다른 행이지만 같은 값을 가지는 데이터를 말합니다. 즉 값을 하나 하나 봤을 때, 모든 피처들이 정확히 같은 값을 갖는 서로 다른 행을 의미합니다.
반면에 데이터가 행끼리 구분할 수 있는 ID 같은 것을 가졌다면, 최초에 중복 값으로 여겨졌던 값들이 중복 값이 아닐 수도 있습니다. 가끔 시스템이 오작동해 ID에 에러가 생기는 경우도 있습니다. 그럴 경우에 같은 ID가 실제로 중복이어도 되는 값인지 확인해야 하고, 그렇지 않을 경우 새로운 ID 시스템을 생각해봐야 합니다.
df = spark.createDataFrame( [ (1, 144.5, 5.9, 33, 'M'), (2, 167.2, 5.4, 45, 'M'), (3, 124.1, 5.2, 23, 'F'), (4, 144.5, 5.9, 33, 'M'), (5, 133.2, 5.7, 54, 'F'), (3, 124.1, 5.2, 23, 'F'), (5, 129.2, 5.3, 42, 'M') ], ['id', 'weight', 'height', 'age', 'gender'] )
위 데이터프레임에는 몇가지 이슈가 있습니다.
- ID가 같은 행이 두 개 있고, 그 두 행의 값은 정확히 일치합니다.
- ID가 1과 4인 행은 같습니다. ID만 다른데, 이 경우 우리는 두 사람이 같은 사람이라는 것을 조심스럽게 추측할 수 있습니다.
- ID가 5인 행을 두개 찾을 수 있으나 그 값이 달라서 같은 사람으로 보기에는 힘듭니다.
이 데이터셋은 일곱 줄밖에 안되는 매우 쉬운 예제입니다. 몇 백만줄이 있을 경우에는 어떻게 할 것인가? 처음에 수행해야 하는 것은 중복 값을 가지고 있는지 확인하는 것입니다. 주로 데이터셋 개수와 같은 데이터셋에 대해 distinct() 함수를 실행한 것을 비교해 확인할 수 있습니다.
print('Count of rows: {0}'.format(df.count()) print('Count of distinct rows: {0}'.format(df.distinct().count()))
위에서 두 행의 결과값이 다르면, 순수 중복값이 있다는 것을 알 수 있습니다. 이러한 값들을 dropDuplicates() 함수를 이용해 제거할 수 있습니다.
df = df.dropDuplicates()
ID가 3인 데이터 중 하나를 삭제했습니다. 이제 ID에 대해 다른 중복 값이 있는지 확인해보겠습니다. 이전에 했던 것을 ID가 아닌 다른행들을 사용해 다시 반복할 수 있습니다.
print('Count of ids: {0}'.format(df.count())) print('Count of distinct ids: {0}'.format( df.select([c for c in df.columns if c != 'id']).distinct().count()) )
dropDuplicates() 함수를 사용해도 됩니다. 그러나 ID 컬럼을 제외한 나머지 칼럼을 명시한 서브셋 파라미터를 추가해야 합니다.
df = df.dropDuplicates( subset=[c for c in df.columns if c != 'id'] )
dropDuplicates() 함수는 서브셋 파라미터를 통해 명시된 칼럼들을 사용해서 중복된 행을 찾습니다. 이제 중복된 데이터가 없다는 것을 알았기 때문에 중복된 id가 있는지 확인하는 작업을 수행하면 됩니다. 전체의 고유한 ID 개수를 한번에 계산하기 위해 .agg() 함수를 사용할 수 있습니다.
import pyspark.sql.functions as fn df.agg( fn.count('id').alias('count'), fn.countDistinct('id').alias('distinct') ).show()
그다음에 데이터프레임에 대해 행 개수를 카운트하기 위해 고유 ID의 개수를 카운트하고자 count() 함수와 countDistinct() 함수를 사용했습니다. alias() 함수는 리턴된 칼럼에 대해 이름을 명시하기 위해 사용되었습니다. 위의 코드에서는 각각 count, distinct라는 이름을 부여합니다.
위에서 볼 수 있듯이, 전체 다섯 개의 행이 있으나 고유한 id는 네 개입니다. 그런데 중복된 행은 모두 제거했기 때문에 각각의 행에 고유한 새 id를 줄 수 있습니다.
df.withColumn('new_id', fn.monotonically_increasing_id()).show()
monotonically_increasing_id() 함수는 각 행에 고유한 값을 부여하면서 그 값을 증가시킵니다. 문서에 따르면, 각각의 파티션에 800억 개의 데이터가 있고 파티션의 개수가 약 10억 개 미만인 데이터들에 대해서 monotonically_increasing_id() 함수가 고유한 ID값을 부여한다고 합니다.
여기서 한가지 주의할 점이 있습니다. 스파크 이전 버전에서 monotonically_increasing_id() 함수는 같은 데이터프레임에서 여러 번 작업이 이뤄졌을 때마다 ID 값이 바뀌었는데, 이는 스파크 2.0에서 고정된 ID값을 부여합니다.
* 관찰되지 않은 데이터
많은 데이터를 보다 보면 종종 아무 값도 없는 미관찰 값을 포함한 데이터를 자주 보게될 것입니다. 이는 시스템 장애, 휴먼 에러, 데이터 스키마 변경 등과 같은 여러 가지 이유 때문일 것입니다.
미관찰 값들을 다룰 수 있는 가장 간단한 방법은 미관찰 값을 가지고 있는 모든 데이터를 제거하는 것입니다. 단, 너무 많은 데이터를 제거하지 않도록 조심해야 합니다. 데이터셋 전체에서 미관찰 값의 분포에 따라 데이터셋 전체의 사용 가능성에 큰 영향을 미칠 수 있습니다. 데이터를 제거한 후 아주 작은 데이터만 남았다거나 데이터가 절반 이상으로 줄어들었다면, 어떤 피처가 빈칸을 가장 많이 가지고 있는지 확인하고 그 피처를 제거하는 것이 더 좋습니다. 하나의 피처가 거의 대부분 빈칸이면 그 피처는 거의 쓸모없다고 봐도 되기 때문입니다.
미관찰 값을 다루는 또 다른 방법은 None이라는 값으로 채우는 것입니다. 이럴 경우, 데이터 필드 타입에 따라 몇몇 다양한 값으로 대체해 채워 넣을 수 있습니다.
- 데이터가 참/거짓으로 구분되면, Missing이라는 세 번째 카테고리를 넣으면 됩니다.
- 데이터가 이미 카테고리를 가지고 있다면 Missing 카테고리를 집어넣으면 됩니다.
- 순서 혹은 숫자 데이터를 가지고 있을 경우에는 평균, 중간값 또는 미리 정의된 다른 값으로 바꿀수 있습니다.
df_miss = spark.createDataFrame( [ (1, 144.5, 5.6, 28, 'M', 100000), (2, 167.2, 5.4, 45, 'M', None), (3, None , 5.2, None, None, None), (4, 144.5, 5.9, 33, 'M', None), (5, 133.2, 5.7, 54, 'F', None), (3, 124.1, 5.2, None, 'F', None), (5, 129.2, 5.3, 42, 'M', 76000) ], ['id', 'weight', 'height', 'age', 'gender', 'income'] )
행을 분석하면 다음과 같은 사실을 알 수 있습니다.
- ID가 3인 행은 오직 하나의 유용한 정보(신장)을 갖습니다.
- ID가 6인 행은 오직 하나의 미관찰 값(나이)를 갖습니다.
칼럼을 분석하면 다음과 같은 사실을 알 수 있습니다.
- 수입(income) 칼럼은 매우 개인적인 정보이므로 대부분 미관찰되었습니다.
- 몸무게(weight)와 성(gender) 칼럼에서 미관찰 값이 존재한 데이터는 하나뿐입니다.(ID가 3인 데이터)
- 나이(age) 컬럼은 두 개의 미관찰 값을 갖습니다.
각 행의 미관찰 값 개수를 알기 위해 다음 코드를 이용할 수 있습니다.
df_miss.rdd.map( lambda row: (row['id'], sum([c == None for c in row])) ).collect()
이는 다음 결과를 출력합니다.
각 행에서 미관찰 값들에 대해 어떻게 처리할지(컬럼 전체를 제거할지 또는 어떤 값으로 바꿔 넣을지) 결정하기 위해 어떤 값이 미관찰됐는지 확인해 보겠습니다.
df_miss.where('id == 3').show()
이제 각 컬럼에서 미관찰 값의 비율을 확인해보겠습니다.
df_miss.agg(*[ (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns ]).show()
count() 함수에 있는 * 아규먼트(argument)를 칼럼 이름 부분에 추가하면 모든 행의 개수를 셀 수 있게 됩니다. 반면에 리스트 앞에 *가 오면, agg() 함수는 그 리스트의 각 엘리먼트를 함수에 전달할 파라미터로 취급합니다. 14%의 미관찰 값이 몸무게와 성 칼럼에 있는 것을(키 칼럼의 두 배) 알 수 있습니다. 수입 칼럼은 무려 72%에 해당합니다. 이제 미관찰 값들에 대해 어떻게 처리해야 할지 감이 오기 시작합니다.
우선 대부분의 값이 미관찰 수업 피처를 제거할 것입니다.
df_miss_no_income = df_miss.select( [c for c in df_miss.columns if c != 'income'] )
이제 ID가 3인 행을 제거할 필요가 없다는 것을 알 수 있습니다. 몸무게와 나이 칼럼이 평균을 계산하거나 미관찰 값을 대체하기에 충분한 수의 관찰된 값을 가지고 있기 때문입니다.
그러나 관찰된 값을 제거하기로 결정했다면 다음과 같이 dropna() 함수를 사용해도 됩니다. 여기서 thresh 파라미터를 사용할 것입니다. 이 파라미터를 통해 각 행에서 제거할 수 있는 최소의 미관찰 값 개수를 임계치로 설정할 수 있습니다. 이 값은 데이터셋이 수십 혹은 수백개의 피처들을 가지고 있거나 미관찰 값에 대한 임계치를 넘은 행들을 제거하고 싶을 경우 유용하게 사용할 수 있습니다.
df_miss_no_income.dropna(thresh=3).show()
한편으로, 미관찰 값을 추정해 채우려면 fillna() 함수를 사용할 수 있습니다. 이 함수는 단일 integer, float, long, string 타입을 지원합니다. 전체 데이터셋에서 미관찰 값은 그 값으로 채워질 것입니다. 또한 딕셔너리 형태(예: {'<colName>':<value_to_impute>})도 지원합니다. 딕셔너리 또한 <value_to_impute>와 같이 integer, float, string만을 지원한다는 제한이 있습니다.
평균, 중간값 또는 다른 계산된 값으로 채우려면 우선 그 값을 계산하고 그러한 값을 가지는 딕셔너리를 만든 후 fillna() 함수에 전달해야 합니다.
means = df_miss_no_income.agg( *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender'] ).toPandas().to_dict('records')[0] means['gender'] = 'missing' df_miss_no_income.fillna(means).show()
카테고리 값들에 대해서는 당연히 평균을 구할 수 없기 때문에 성 칼럼은 생략했습니다.
여기서 데이터 타입이 두번 변환됩니다. agg() 함수의 결과를 취해 pandas의 데이터프레임으로 변환한 후 딕셔너리 형태로 한번 더 변환했습니다.
toPandas() 함수는 RDD에서 collect() 함수와 같은 방식으로 동작하기 때문에 문제를 일으킬 수 있다는 사실을 염두해두어야 합니다. 이 함수는 모든 정보를 워커 노드로부터 수집한 후, 드라이버 노드로 옮깁니다. 이는 수천 개의 피처를 다루지 않는 한, 앞에서 다뤘던 데이터셋에서는 문제가 되지 않습니다.
pandas의 to_dict() 함수에 대한 파라미터는 다음 딕셔너리를 생성하라고 지시합니다.
{'age': 40.399999999999999, 'height': 5.4714285714285706, 'id': 4.0, 'weight': 140.283333333333333 }
성별 피처에 대한 평균 또는 그 어떤 수치도 계산도 할 수 없기 때문에 missing이라는 카테고리를 성별 피처에 추가했습니다. 또한 나이 칼럼의 평균이 40.40이더라도 값을 대체할 때는 df_miss_no_income.age 칼럼의 타입이 유지된다는 것을 명심해야 합니다.
아웃라이어
아웃라이어는 대부분의 데이터와는 매우 다른 분포를 띄는 데이터를 의미합니다. 매우 다르다는 것에 대한 정의는 조금씩 다를 수 있으나 일반적으로 모든 값들이 대체적으로 Q1-Q1.5IQR 사이에 있거나 Q3+1.5IQR 사이에 있으면 아웃라이어가 없다고 말할 수 있습니다. IQR은 상위 쿼타일(75%)과 하위 쿼타일(25%)의 차로 정의 됩니다.
df_outliers = spark.createDataFrame([ (1, 143.5, 5.3, 28), (2, 154.2, 5.5, 45), (3, 342.3, 5.1, 99), (4, 144.5, 5.5, 33), (5, 133.2, 5.4, 54), (6, 124.1, 5.1, 21), (7, 129.2, 5.3, 42) ], ['id', 'weight', 'height', 'age'])
위의 데이터를 기반으로 아웃라이어를 정의해보겠습니다.
우선 하위/상위 제한선을 각각의 피처에 대해 계산합니다. 이를 위해 approxQuantile() 함수를 사용할 것입니다. 맨 처음 파라미터는 칼럼명이고, 두 번째 파라미터는 0과 1사이거나(0.5는 중간값을 의미) 리스트입니다. 세번째 파라미터는 각 피처에 대한 허용 가능한 수준의 에러를 명시합니다. 이 값이 0일 경우 피처에 대한 정확한 값을 계산할 수 있으나 이는 매우 많은 연산량을 요구합니다.
cols = ['weight', 'height', 'age'] bounds = {} for col in cols: quantiles = df_outliers.approxQuantile( col, [0.25, 0.75], 0.05 ) IQR = quantiles[1] - quantiles[0] bounds[col] = [ quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR ]
위의 코드에서 boundary 딕셔너리는 각 피처에 대해 하위/상위 범위를 포함합니다.
Out[17]: { 'age': [9.0, 51.0],
'height': [4.8999999999999995, 5.6],
'weight': [115.0, 146.84999999999997]
}
이제 아웃라이어를 확인하기 위해 이 값을 사용해 보겠습니다.
outliers = df_outliers.select( *['id'] + [((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1])).alias(c + '_o') for c in cols] ) outliers.show()
몸무게와 나이 피처에 각각 두 개의 아웃라이어가 있습니다. 예전에는 아웃라이어를 판단하는 공식을 따로 알아야만 했습니다. 하지만 아웃라이어를 찾아내기 위해 위의 코드만 이용하면 됩니다.
df_outliers = df_outliers.join(outliers, on='id') df_outliers.filter('weight_o').select('id', 'weight').show() df_outliers.filter('age_o').select('id', 'age').show()
데이터에 친숙해치기
추천하는 방법은 아니지만, 데이터에 대해 제대로 알지 못하더라도 모델을 만들 수는 있습니다. 이럴 경우 시간은 더 걸릴 것이며 결과도 좋지 않을 것입니다. 그러므로 능력 있는 데이터 과학자나 데이터 모델링 엔지니어는 모델링을 시작하기 전에 데이터를 어느 정도 숙지하고 있습니다. 데이터들에 대한 간단한 기술 통계(descriptive statistics)부터 시작해보겠습니다.
* 기술 통계
간단히 말해, 기술 통계는 데이터셋에서의 관찰 값 개수, 각 컬럼의 평균과 표준 편차 또는 최댓값/최솟값 등의 기본적인 정보를 제공합니다. 우선 데이터를 로드하고 스파크 데이터프레임으로 변환해 보겠습니다.
import pyspark.sql.types as typ
우선 필요한 모듈을 로드해보겠습니다. pyspark.sql.types는 IntegerType()이나 FloatType() 같은 사용 가능한 모든 데이터 타입을 로드합니다. 그다음에는 데이터를 읽고 헤더 라인을 filter() 함수로 제거합니다. 다음 작업은 행을 콤마로 분리하고 각각의 엘리먼트를 정수형으로 변환하는 작업입니다.
fraud = sc.textFile('ccFraud.csv.gz') header = fraud.first() fraud = fraud.filter(lambda row: row != header).map(lambda row: [int(elem) from elem in row.split(',')]) # 데이터프레임 스키마 생성 fields = [ *[typ.StructField(h[1:-1], type.IntegerType(), True) fro h in header.split(',')] ] schema = typ.StructType(fields) # 데이터프레임 생성 fraud_df = spark.createDataFrame(fraud, schema)
카테고리 컬럼을 더 자세히 이해하기 위해, groupby() 함수를 이용해 그 값들의 빈도를 셀 수 있습니다. 성별 칼럼에 대한 빈도를 세보겠습니다.
fraud_df.groupby('gender').count().show()
결과값에 차이가 있다면 다소 불균형한 데이터셋임을 확인할 수 있습니다. 실제 숫자 피처에 대해서는 describe() 함수를 사용할 수 있습니다.
numberical = ['balance', 'numTrans', 'numIntlTrans'] desc = fraud_df.describe(numerical) desc.show()
위의 소스를 출력하여 얻을 수 있는 적은 정보(count, mean, stddev, min, max)들을 통해서도 상당히 많은 것을 알 수 있습니다.
- 모든 피처는 양의 방향으로 왜곡되었다. 최댓값이 평균값보다 몇배 더 크다.
- 변동 계수:coefficient variabion(평균과 표준 편차의 비율)가 매우 크다(값이 1과 가깝거나 크다. 이는 넓게 퍼진 데이터를 의미한다.)
다음 코드도 비대칭도를 확인할 수 있습니다(밸런스 피처)
fraud_df.agg({'balance': 'skewness'}).show()
합계 함수 리스트에는 avg(), count(), countDistinct(), first(), kurtosis(), max(), mean(), min(), skewness(), stddev(), stddev_pop(), stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp(), variance() 등이 있습니다.(각 함수의 기능은 함수명으로부터 쉽게 추측할 수 있습니다)
* 상관 계수
서로 다른 피처들 간의 상호 관계를 나타내주는 측정 값은 상관 계수입니다. 모델은 일반적으로 타킷 데이터에 상당히 연관돼 있는 피처만을 포함합니다. 그러나 피처 간의 상관 계수를 확인하는 것도 상당히 중요합니다. 서로 상관 계수가 높은 피처들을 포함하는 모델은 예상치 못하는 결과를 초래하거나 불필요하기 때문입니다.
파이스파크에서 상관 계수를 계산하는 것은 데이터셋이 데이터프레임 형태로 있으면 매우 쉽습니다. 조금 복잡한 부분이 있다면 corr() 함수가 오직 두 쌍의 상관 계수만 계산할 수 있는 피어슨 상관 계수만을 지원한다는 점입니다. 오직 두 쌍의 상관 계수만 구할 수 있기 때문에 각각의 피처에 대해 corr() 함수를 실행해야 한다는 번거로움이 있습니다.
fraud_dv.corr('balance', 'numTrans')
상관 계수 행렬을 구하기 위해서는 다음의 스크립트를 사용합니다.
n_numerical = len(numerical) corr = [] for i in range(0, n_numerical): temp = [None] * i for j in range(i, n_numerical): temp.append(fraud_df.corr(numerical[i], numerical[j])) corr.append(temp)
시각화
* 히스토그램
히스토그램은 피처의 분포를 시각화하는 가장 쉬운 방법입니다. 파이스파크에서 히스토그램을 만드는 방법은 세가지입니다.
- 데이터를 워커 노드에서 집계해 워커 노드가 bin 리스트를 드라이버 노드에게 리턴하고 각 bin의 개수를 드라이버 노드가 센다
- 데이터를 모두 드라이버 노드에 리턴하고, 시각화 라이브러리 함수를 사용해 히스토그램을 만든다.
- 데이터를 샘플링해 드라이버 노드에 리턴한다. 드라이버 노드는 리턴된 데이터를 이용해 데이터를 시각화 한다.
데이터셋에서 행의 개수가 너무 많으면, 두번째 작업은 불가능합니다. 따라서 데이터를 먼저 집계해야 합니다.
* 피처 사이의 상호작용
Scatter 차트로 동시에 세 개 변수 간의 상호작용을 시각화할 수 있습니다. 임시 데이터를 다루거나 변화를 지속적으로 관찰하고 싶은 경우가 아닌 이상 세 개의 피처에 대한 시각화는 다루지 않을 것입니다. 설령 그런 경우가 있다고 하더라도 시간 데이터를 불연속적으로 구분해 2D 형태로 시각화할 것입니다. 3D 형태로 차트를 해석하면 대부분의 경우 더욱 복잡하고 헷갈리는 작업이 됩니다.
파이스파크는 서버 쪽에서는 어떠한 시각화 모듈도 제공하지 않기 때문에 수십억개의 데이터를 동시에 시각화하는 것은 매우 비현실 적입니다. 데이터셋은 0.02%로 샘플링(약 2,000개 데이터)하여 구현하면 훨씬 작업이 간단해질 수 있습니다.
계층화된 샘플링을 하지 않는 이상, 정해진 샘플링 비율로 적어도 세 개에서 다섯 개의 샘플링을 준비해야 합니다. 이렇게 함으로써 샘플링된 데이터셋이 전체를 대표할 수 있는지 체크할 수 있습니다. 다음은 성별을 기준으로 0.02% Fraud 데이터셋을 샘플링하는 예제입니다.
data_sample = fraud_df.sampleBy( 'gender', {1:0.0002, 2:0.0002} ).select(numerical) # 2D 차트 생성 코드 data_multi = dic( [(elem, data_sample.select(elem).rdd.flatMap(lambda row: row).collect()) for elem in numerical] ) sctr = chrt.Scatter(data_multi, x='balance', y='numTrans') chrt.show(sctr)
출처: https://12bme.tistory.com/309?category=737765 [길은 가면, 뒤에 있다.]
'Big Data > 빅데이터' 카테고리의 다른 글
[ELK] 키바나 5.0 배우기 (0) | 2020.08.03 |
---|---|
[R] 데이터 개념 이해하기 (0) | 2020.08.03 |
[빅데이터] 검색시스템 간단 요약 (0) | 2020.08.03 |
[Spark] 스파크 ML 패키지 (0) | 2020.08.03 |
[Spark] 데이터프레임 (0) | 2020.08.03 |
[Spark] 스파크 RDD란? (0) | 2020.08.03 |
[Spark] 스파크 이해하기 (0) | 2020.08.03 |
[ELK] 엘라스틱서치(ElasticSearch) 시작하기 (0) | 2020.08.03 |