[Spark] Json 포맷 처리하기 - Json Object List
들어가며
스파크에서는 CSV, Json, Protocol Buffer, Hadoop에서 지원하는 데이터 포맷 등 다양한 포맷을 지원을 한다. 이번에는 Json파일을 읽어서 스키마를 확인을 하고, 스키마에 있는 필요한 데이터를 추출하는 방법에 대해서 알아보려고 한다. 데이터는 공개되어 있는 tweet 데이터를 사용하였고, tweet데이터에서 내가 필요한 데이터를 추출하기 위한 과정을 소스코드로 작성해 보았습니다.
설명
소스 코드는 tweet의 데이터를 읽어들인 이후에 schema를 확인을 하여 데이터의 포맷이 어떻게 이루어져 있는지 확인을 합니다. (아래 스키마 그림 첨부 했습니다.) 스파크는 lazy execution을 하기 때문에 결과를 확인하기 위해 .take(N)을 사용했습니다. 처음에 tweets의 DataFrame의 객체를 생성을 한 뒤에, map의 연산을 통해서 DataFrame의 각 Row의 객체를 얻어옵니다. 여기서 tweet은 하나의 트윗을 나타내고, Row의 객체 형태를 갖고 있습니다.
하나의 Row에서 내가 원하는 field를 가져오기 위해서는 field의 인덱스를 알아야 합니다. 확인하는 방법은 tweet.fieldIndex("columnName")을 통해 확인이 가능합니다. 하나의 tweet의 id를 가져오기 위해서는 아래 스키마에서도 확인이 가능하듯이 long의 데이터 타입을 갖고 있기 때문에 tweet.getLong(tweet.fieldIndex("id"))를 통해서 가져올 수 있습니다.
그렇다면 primitive의 형태의 데이터외의 데이터는 어떻게 가져와야 할까요. getList를 통해서 List객체를 반환을 받을 수 있습니다. 여기서 contributorsIDs를 가져오려고 한다면, array의 타입이기 때문에 tweet.getList(tweet.fieldIndex("contributorsIDs"))를 통해 가지고 올 수 있습니다.
구조체로 표현되어있는 필드의 값을 가져오기 위해서는 getStruct를 통해 가져올 수 있습니다. tweet.getStruct(tweet.fieldIndex("user"))를 통해 user의 구조체를 가져올 수 있고, 각각의 필드의 값은 다시 user의 필드인덱스를 통해서 접근이 가늫압니다. 예를 들어 이름을 가져오고 싶으면 tweet.getStruct(tweet.fieldIndex("user"))의 값을 val user로 할당을 한 뒤에, user.getString(user.fieldIndex("name"))을 통해 가져올 수 있습니다.
항상 모든 예제는 내가 원하는 범위만큼은 제공하지 않지요. 그래서 제가 삽질을 한 내용을 말씀드리면, 아래와 같이 user가 만약에 Array형태의 Struct면 어떻게 가져와야 할까요? 만약 getList()를 통해 가지고 오게 되면, List[Nothing]의 에러가 발생합니다. 그래서 List에서 각 항목 예를 들면 user의 name을 가져와줘 라고 user.getString(user.fieldIndex("name"))을 하게 되면 'error: value getString is not a member of Nothing' 의 에러가 발생하게 됩니다. 만약 getSeq()를 쓰면 어떻게 될까요. 같은 문제가 발생합니다. 이 문제는 getSeq()를 하게 되면 List 내부에 있는 데이터의 타입을 명시적으로 작성을 해줘야 합니다. getSeq[Row]()로 Row의 DataType을 함께 넘겨주면 문제는 해결이 됩니다. Row...라고 알려줘야 필드에 들어있는 리스트의 구조체의 내부에 있는 값을 가져올 수 있습니다.
진짜 위 문제를 해결하기 위해서 반나절은 소비한것 같네요. 이 문제 때문에 해결하는 절차를 보면, 각 데이터를 추출 할 때마다 schema를 확인했습니다. 예를 들면 tweet에서도 user의 구조체를 추출을 하고 user.schema를 하면 schema의 값을 확인을 할 수 있었습니다. 하지만 만약에 getSeq()를 통해 추출한 List는 Nothing이라는 말만 내보내더군요. 아 그러니까 shcema가 없다는 것은 지금 이 List에 담겨있는 객체들이 어떤 객체를 나타내는지 모르는구나... 그래서 명시적으로 Row를 해주자 해서 해결을 했습니다.
소스코드
// https://github.com/databricks/learning-spark/blob/master/files/testweet.json
val tweets = sqlContext.jsonFile("/home/lee/learning-spark/learning-spark/files/testweet.json")
// verify schema
tweets.printSchema()
tweets.schema
// show table
tweets.show()
// parse json
tweets.take(10).map{
case tweet =>
val id = tweet.getLong(tweet.fieldIndex("id"))
val createdAt = tweet.getString(tweet.fieldIndex("createdAt"))
val contributorIDs = tweet.getList(tweet.fieldIndex("contributorsIDs"))
println("contributorIds = " + contributorIDs)
val i = 0
for (i <- 0 to contributorIDs.size()-1) {
println("contributor = " + contributorIDs.get(i))
}
println("id = " + id)
println("createdat" + createdAt)
val user = tweet.getStruct(tweet.fieldIndex("user"))
println("user = " + user)
val name = user.getString(user.fieldIndex("name"))
println("username = " + name)
}
// 실행결과
/*
contributorIds = [banana, lee, hong]
contributor = banana
contributor = lee
contributor = hong
id = 529799371026485248
createdatNov 4, 2014 4:56:59 PM
user = [Aug 5, 2011 9:42:44 AM,,WrappedArray(),1095,1231,600,15594928,false,false,true,false,false,en,0,,Holden Karau,C0DEED,,,true,,http://pbs.twimg.com/profile_images/3005696115/2036374bbadbed85249cdd50aac6e170_normal.jpeg,https://pbs.twimg.com/profile_images/3005696115/2036374bbadbed85249cdd50aac6e170_normal.jpeg,0084B4,FFFFFF,DDEEF6,333333,true,holdenkarau,false,6234,false,-3]
username = Holden Karau
res38: Array[Unit] = Array(())
*/
Tweet Schema
'Big Data > Apache Spark' 카테고리의 다른 글
[Spark] Pyspark NoneType, null, blank, empty string 필터링 하는 방법 (0) | 2021.05.06 |
---|---|
[Spark] 스파크에서 외부 라이브러리 사용하는 방법 (0) | 2021.05.06 |
[Spark] timestamp에서 날짜 추출하는 방법 (month, day, hour) (0) | 2021.05.06 |
[Spark] Row의 Seq Collection으로 RDD, DataFrame 생성하기 (0) | 2021.05.06 |
[Spark] 파티셔닝 예제 - 페이지랭크(PageRank) 알고리즘 (0) | 2021.05.06 |
[Spark] RDD데이터 파티셔닝 - 이론 및 예제 (0) | 2021.05.06 |
[Spark] RDD 영속화(캐싱) - 이론 및 예제 (0) | 2021.05.06 |
[Spark] Caching and Serialization (0) | 2021.05.06 |