[Spark] How to extract date parts from a timestamp (month, day, hour)

2021. 5. 6. 02:03 Big Data/Apache Spark

Introduction

  In data analysis, the field we reach for most often is probably the timestamp. A common task is generating new features from the raw timestamp value. For example, to compute the number of user visits per day, we need to extract the month and day from the timestamp and use them as new feature values.
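As a sketch of that idea (the DataFrame name `visits` is hypothetical; only the `timestamp_milli` column comes from this post), daily visit counts could be derived by extracting month and day and grouping on them:

```scala
import org.apache.spark.sql.functions._

// Hypothetical DataFrame `visits` with a millisecond `timestamp_milli` column.
// Convert ms -> seconds, format with from_unixtime, then extract month and day
// as new feature columns and count rows per (month, day).
val visitsPerDay = visits
  .select(
    month(from_unixtime($"timestamp_milli" / 1000)).as("month"),
    dayofmonth(from_unixtime($"timestamp_milli" / 1000)).as("day"))
  .groupBy("month", "day")
  .count()

visitsPerDay.show()
```

The `count` column of `visitsPerDay` is exactly the "visits per day" feature described above.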

Source Code

  Starting from a DataFrame df, we extract the desired columns with the select function. Since timestamp_milli is in milliseconds, divide it by 1000 first, then import Spark's functions library and apply from_unixtime to convert the value into a date-formatted String. From that, month, dayofmonth, and hour extract the month, day, and hour.
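As a quick sanity check outside Spark, the same conversion can be reproduced with plain java.time. Note that from_unixtime formats in the JVM's session time zone; the snippet below assumes Asia/Seoul (UTC+9), which is what the shell output further down reflects:

```scala
import java.time.{Instant, ZoneId, ZonedDateTime}

// First timestamp from the sample data below, in milliseconds.
val millis = 1468335081249L

// from_unixtime takes seconds and formats in the session time zone;
// here we assume Asia/Seoul to match the shell output in this post.
val zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.of("Asia/Seoul"))

println(zdt.getMonthValue)  // 7
println(zdt.getDayOfMonth)  // 12
println(zdt.getHour)        // 23
```

This matches the first row of the from_unixtime output below (2016-07-12 23:51:21).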

Check the available functions in org.apache.spark.sql.functions

import org.apache.spark.sql.functions._

// DataFrame Column | timestamp_milli |
scala> df.select(col("timestamp_milli")).show()
+---------------+
|timestamp_milli|
+---------------+
|  1468335081249|
|  1468335081249|
|  1468335081983|
|  1468335082019|
|  1468335082052|
|  1468335082086|
|  1468335082123|
|  1468335082161|
|  1468335082203|
|  1468335082242|
|  1468335086042|
|  1468335119570|
|  1468335119570|
|  1468335120250|
|  1468335120283|
|  1468335120317|
|  1468335120362|
|  1468335120395|
|  1468335120438|
|  1468335120490|
+---------------+
only showing top 20 rows

scala> df.select(from_unixtime($"timestamp_milli"/1000)).show()
+----------------------------------------------------------+
|fromunixtime((timestamp_milli / 1000),yyyy-MM-dd HH:mm:ss)|
+----------------------------------------------------------+
|                                       2016-07-12 23:51:21|
|                                       2016-07-12 23:51:21|
|                                       2016-07-12 23:51:21|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:22|
|                                       2016-07-12 23:51:26|
|                                       2016-07-12 23:51:59|
|                                       2016-07-12 23:51:59|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
|                                       2016-07-12 23:52:00|
+----------------------------------------------------------+


scala> df.select(month(from_unixtime($"timestamp_milli"/1000)).as("month"), dayofmonth(from_unixtime($"timestamp_milli"/1000)).as("day"), hour(from_unixtime($"timestamp_milli"/1000)).as("hour")).show()
+-----+---+----+
|month|day|hour|
+-----+---+----+
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
|    7| 12|  23|
+-----+---+----+
only showing top 20 rows
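Since from_unixtime($"timestamp_milli"/1000) is repeated for every extracted field above, a small refactor (a sketch using the same df) is to compute the converted timestamp once with withColumn and reuse it:

```scala
import org.apache.spark.sql.functions._

// Convert ms -> formatted timestamp once, then extract each field
// from the shared "ts" column instead of repeating the conversion.
val features = df
  .withColumn("ts", from_unixtime($"timestamp_milli" / 1000))
  .select(
    month($"ts").as("month"),
    dayofmonth($"ts").as("day"),
    hour($"ts").as("hour"))

features.show()
```

This produces the same month/day/hour table as the select above, but the conversion expression appears only once, which makes it easier to change (e.g., to switch the format string) later.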


Source: ourcstory.tistory.com/166?category=630696