[Spark] How to extract date parts from a timestamp (month, day, hour)
Introduction
The timestamp is probably the field used most often in data analysis. One common task is deriving new features from the raw timestamp value. For example, to compute daily user visit counts, we need to extract the month and day from the timestamp and use them as new feature values.
Source Code
The code below extracts the desired columns from a DataFrame df using select. Because timestamp_milli is in milliseconds, divide it by 1000 to get seconds, and use the functions provided by Spark after importing org.apache.spark.sql.functions. from_unixtime converts the value into a date-formatted String, from which month, dayofmonth, and hour extract the month, day, and hour.
See the available functions in org.apache.spark.sql.functions
import org.apache.spark.sql.functions._
// DataFrame Column | timestamp_milli |
scala> df.select(col("timestamp_milli")).show()
+---------------+
|timestamp_milli|
+---------------+
|  1468335081249|
|  1468335081249|
|  1468335081983|
|  1468335082019|
|  1468335082052|
|  1468335082086|
|  1468335082123|
|  1468335082161|
|  1468335082203|
|  1468335082242|
|  1468335086042|
|  1468335119570|
|  1468335119570|
|  1468335120250|
|  1468335120283|
|  1468335120317|
|  1468335120362|
|  1468335120395|
|  1468335120438|
|  1468335120490|
+---------------+
only showing top 20 rows
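The values above are epoch milliseconds. As a complementary approach (a minimal sketch, not from the original post), the millisecond epoch can also be cast to a proper timestamp column instead of being formatted as a String, which keeps it usable for comparisons and window operations. The column name event_time is an illustrative assumption.

import org.apache.spark.sql.functions._

// Dividing by 1000 converts milliseconds to seconds; casting a numeric
// column to "timestamp" interprets it as seconds since the epoch, so this
// yields a TimestampType column rather than a formatted String.
val withTs = df.withColumn("event_time", ($"timestamp_milli" / 1000).cast("timestamp"))
withTs.printSchema()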
scala> df.select(from_unixtime($"timestamp_milli"/1000)).show()
+----------------------------------------------------------+
|fromunixtime((timestamp_milli / 1000),yyyy-MM-dd HH:mm:ss)|
+----------------------------------------------------------+
|                                       2016-07-12 23:51:21|
|                                       2016-07-12 23:51:21|
|                                       2016-07-12 23:51:21|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:26|
|                                       2016-07-12 23:51:59|
|                                       2016-07-12 23:51:59|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
+----------------------------------------------------------+
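If only the calendar date is needed (for example as a grouping key), from_unixtime also accepts a format pattern as its second argument. A brief sketch against the same column:

// Passing a pattern such as "yyyy-MM-dd" returns only the date portion as a String.
df.select(from_unixtime($"timestamp_milli" / 1000, "yyyy-MM-dd").as("date")).show()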
scala> df.select(month(from_unixtime($"timestamp_milli"/1000)).as("month"), dayofmonth(from_unixtime($"timestamp_milli"/1000)).as("day"), hour(from_unixtime($"timestamp_milli"/1000)).as("hour")).show()
+-----+---+----+
|month|day|hour|
+-----+---+----+
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
+-----+---+----+
only showing top 20 rows
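Tying this back to the daily-visit-count example from the introduction, here is a minimal sketch that materializes the extracted parts as named columns and aggregates per day. The user_id column is an assumption for illustration and is not part of the schema shown above.

val ts = from_unixtime($"timestamp_milli" / 1000)

// Materialize the extracted parts as named columns so they can be reused.
val withParts = df
  .withColumn("month", month(ts))
  .withColumn("day", dayofmonth(ts))
  .withColumn("hour", hour(ts))

// Hypothetical daily visit count: distinct users per (month, day).
// "user_id" is an assumed column name, not shown in the original data.
val dailyVisits = withParts
  .groupBy("month", "day")
  .agg(countDistinct("user_id").as("visits"))

dailyVisits.show()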