[Spark] Row의 Seq Collection으로 RDD, DataFrame 생성하기

2021. 5. 6. 01:59 Big Data/Apache Spark

들어가며

  스파크에서 구현을 하다 보면 각 객체간의 변환(?)이 자유로워야 하는것 같다. 예를 들면 RDD에서 추출한 데이터를 DataFrame으로 생성한다든지, DataFrame에서 여러개의 Row를 추출해서 새로운 RDD를 생성한다는 식의 경우를 말한다. 이번에 당면한 문제는 Json Object의 요소를 저장하고 있는 Seq의 Collection을 갖고 있었고, 이 Collection을 바탕으로 DataFrame의 생성이 필요했다. 이번에는 Seq[org.apache.spark.sql.Row]의 데이터를 RDD와 DataFrame으로 변환하는 방법에 대해서 정리를 해보려고 한다. 

 

소스코드

  logData는 org.apache.spark.sql.Row의 값을 갖고 있다. logData에서 field명이 object인 값을 추출을 하려고 한다. 이때 field의 값 형태가 Json Object의 List로 이루어져 있다. 앞서 글에서 언급한바와 같이 getSeq[Row]를 통해서 Json Object의 리스트의 값을 가지고 왔다. (자세한 내용은 아래 포스팅을 참고하시면 됩니다.) 

[Spark] Json 포맷 처리하기 - Json Object List

  이때 getSeq[Row]를 통해 얻어낸 객체는 Seq[org.apache.spark.sql.Row]의 형태를 갖고 있습니다. 그럼 이 데이터를 어떻게 RDD로 생성하고, DataFrame의 객체로 변환하기 위해서 sc.parallelize로 위에 얻어낸 Row의 Seq를 넘겨 rdd를 생성하고, 생성된 rdd와 함께, Row의 schema를 함께 넘겨주시면 DataFrame의 객체를 얻을 수 있습니다. 이때 data.apply(0)을 넘긴 이유는 Seq에서 첫번째 Row의 정보를 가져왔고, 나머지와 동일하기 때문에 첫번째 Row의 StructType를 이용했습니다. 마지막으로 DataFrame의 show()를 호출하면 테이블을 확인하실 수 있습니다.

 

import org.apache.spark.sql.Row

// Seq[org.apache.spark.sql.Row]
data = logData.getSeq[Row](logData.fieldIndex("object"));

val rdd = sc.parallelize(data)
val df = sqlContext.createDataFrame(rdd, data.apply(0).schema)
df.show()

 

출처 : ourcstory.tistory.com/165?category=630696