Elasticsearch - Aggregation API(엘라스틱서치 집계,파이프라인(Pipeline Aggregations) 집계) -3
파이프라인 집계(Pipeline Aggregations)는 다른 집계와 달리 쿼리 조건에 부합하는 문서에 대해 집계를 수행하는 것이 아니라, 다른 집계로 생성된 버킷을 참조해서 집계를 수행한다. 집계 또는 중첩된 집계를 통해 생성된 버킷을 사용해 추가적으로 계산을 수행한다고 보면 된다. 파이프라인 집계에는 부모(Parent), 형제(Sibling)라는 두 가지 유형이 있다.
파이프라인 집계를 수행할 때는 buckets_path 파라미터를 사용해 참조할 집계의 경로를 지정함으로써 체인 형식으로 집계 간의 연산이 이뤄진다. 파이프라인 집계는 모든 집계가 완료된 후에 생성된 버킷을 사용하기 때문에 하위 집계를 가질 수는 없지만 다른 파이프라인 집계와는 buckets_path를 통해 참조하도록 지정할 수 있다.
-형제 집계(Sibling)
형제 집계는 동일 선상의 위치에서 수행되는 새 집계를 의미한다. 즉, 형제 집계를 통해 수행되는 집계는 기존 버킷에 추가되는 형태가 아니라 동일 선상의 위치에서 새 집계가 생성되는 파이프라인 집계다. 형제 집계는 다음과 같은 집계들이 있다.
평균 버킷 집계(Avg Bucket Aggregation) |
최대 버킷 집계(Max Bucket Aggregation) |
최소 버킷 집계(Min Bucket Aggregation) |
합계 버킷 집계(Sum Bucket Aggregation) |
통계 버킷 집계(Stats Bucket Aggregation) |
확장 통계 버킷 집계(Extended Stats Bucket Aggregation) |
백분위수 버킷 집계(Percentiles Bucket Aggregation) |
이동 평균 집계(Moving Average Aggregation) |
{
"aggs":{
"histo":{
"date_histogram":{
"field":"timestamp",
"interval":"day"
},
"aggs":{ //중첩 집계
"bytes_sum":{
"sum":{
"field":"bytes"
}
}
}
},
"max_bytes":{ //파이프라인 집계
"max_bucket":{
"buckets_path":"histo>bytes_sum" //버킷 참조
}
}
}
}
->result
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10001,
"max_score": 0.0,
"hits": []
},
"aggregations": {
"histo": {
"buckets": [
{
"key_as_string": "2015-05-17T00:00:00.000Z",
"key": 1431820800000,
"doc_count": 1632,
"bytes_sum": {
"value": 4.14259902E8
}
},
{
"key_as_string": "2015-05-18T00:00:00.000Z",
"key": 1431907200000,
"doc_count": 2893,
"bytes_sum": {
"value": 7.88636158E8
}
},
{
"key_as_string": "2015-05-19T00:00:00.000Z",
"key": 1431993600000,
"doc_count": 2896,
"bytes_sum": {
"value": 6.65827339E8
}
},
{
"key_as_string": "2015-05-20T00:00:00.000Z",
"key": 1432080000000,
"doc_count": 2578,
"bytes_sum": {
"value": 8.78559106E8
}
}
]
},
"max_bytes": {
"value": 8.78559106E8,
"keys": [
"2015-05-20T00:00:00.000Z"
]
}
}
}
위는 간단하게 2개의 집계를 중첩하였고, 형제 레벨로 histo 집계 밑의 bytes_sum의 버킷을 참조하여 최대값을 구하는 파이프라인 집계를 작성한 것이다. 결과값으로는 중첩된 집계결과와 마지막에 파이프라인의 집계가 나온다. 현재는 bytes_sum이 단일 메트릭 집계이기 때문에 집계 이름으로만 참조하고 있지만 stats 같은 다중 메트릭 집계일 경우 메트릭명까지 참조해줘야 한다. histo>bytes_sum.avg
Additional Sibling Aggregations
파이프라인 집계명 | 집계 쿼리 |
최대 버킷 집계 - 최대 값으로 버킷을 식별하고 버킷의 값과 키를 출력하는 형제 파이프 라인 집계입니다. 지정된 메트릭은 숫자 여야하고 형제 집계는 다중 버킷 집계 여야합니다. |
{ "max_bucket":{ "bucket_path":"histo>bytes_sum" } } |
최소 버킷 집계 - 최소값으로 버킷을 식별하고 버킷의 값과 키를 모두 출력하는 형제 파이프 라인 집계이다. 지정된 메트릭은 숫자 여야하고 형제 집계는 다중 버킷 집계 여야한다. |
{ "min_bucket":{ "bucket_path":"histo>bytes_sum" } } |
평균 버킷 집계 - 지정된 메트릭의 평균 값을 계산하는 파이프 라인 집계이다. 지정된 메트릭은 숫자 여야하고 형제 집계는 다중 버킷 집계여야한다. |
{ "avg_bucket":{ "bucket_path":"histo>bytes_sum" } } |
통계 버킷 집계 - 모든 버킷에 대한 다양한 통계를 계산하는 형제 파이프 라인 집계이다. 지정된 메트릭은 숫자 여야하고 형제 집계는 다중 버킷 집계 여야한다. |
{ "stats_bucket":{ "bucket_path":"histo>bytes_sum" } } |
확장 통계 버킷 집계 -
모든 버킷에 대한 다양한 통계를 계산하는 형제 파이프 라인 집계이다. 지정된 메트릭은 숫자 여야하고 형제 집계는 다중 버킷 집계 여야한다. 이 집계는 집계와 비교하여 몇 가지 통계 (제곱합, 표준 편차 등)를 제공한다. |
{ "extended_stats_bucket":{ "bucket_path":"histo>bytes_sum" } } |
백분위수 버킷 집계 - 모든 버킷에서 백분위 수를 계산하는 형제 파이프 라인 집계이다.. 지정된 메트릭은 숫자 여야하고 형제 집계는 다중 버킷 집계 여야한다. |
{ "percentiles_bucket":{ "bucket_path":"histo>bytes_sum" } } |
이동 평균 버킷 집계 - 이동 평균은 순차 데이터를 부드럽게하는 간단한 방법이다. 이동 평균은 일반적으로 주가 또는 서버 메트릭과 같은 시간 기반 데이터에 적용이다. 평활화는 고주파수 변동 또는 랜덤 노이즈를 제거하는 데 사용될 수 있으므로 계절 성과 같이 저주파수 추세를보다 쉽게 시각화 할 수 있다. 다양한 옵션이 있으므로 자세한 사용법은 레퍼런스를 참고하자. >https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline-movavg-aggregation.html |
{ "moving_avg_bucket":{ "bucket_path":"histo>bytes_sum" } } |
이동 평균 버킷 집계는 부모에 histogram 혹은 date_histogram 집계가 있어야 한다.
{
"aggs":{
"histo":{
"date_histogram":{
"field":"timestamp",
"interval":"day"
},
"aggs":{
"bytes_sum":{
"sum":{
"field":"bytes"
}
},
"moving_avg_agg":{
"moving_avg":{
"buckets_path":"bytes_sum"
}
}
}
}
}
}
->result
{
"took": 7,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10001,
"max_score": 0.0,
"hits": []
},
"aggregations": {
"histo": {
"buckets": [
{
"key_as_string": "2015-05-17T00:00:00.000Z",
"key": 1431820800000,
"doc_count": 1632,
"bytes_sum": {
"value": 4.14259902E8
}
},
{
"key_as_string": "2015-05-18T00:00:00.000Z",
"key": 1431907200000,
"doc_count": 2893,
"bytes_sum": {
"value": 7.88636158E8
},
"moving_avg_agg": {
"value": 4.14259902E8
}
},
{
"key_as_string": "2015-05-19T00:00:00.000Z",
"key": 1431993600000,
"doc_count": 2896,
"bytes_sum": {
"value": 6.65827339E8
},
"moving_avg_agg": {
"value": 6.0144803E8
}
},
{
"key_as_string": "2015-05-20T00:00:00.000Z",
"key": 1432080000000,
"doc_count": 2578,
"bytes_sum": {
"value": 8.78559106E8
},
"moving_avg_agg": {
"value": 6.229077996666666E8
}
}
]
}
}
}
-부모 집계(Parent)
부모 집계는 집계를 통해 생성된 버킷을 사용해 계산을 수행하고, 그 결과를 기존 집계 결과에 반영한다. 집계의 종류는 아래와 같다.
파생 집계(Derivative Aggregation) |
누적 집계(Cumulative Sum Aggregation) |
버킷 스크립트 집계(Bucket Script Aggregation) |
버킷 셀렉터 집계(Bucket Selector Aggregation) |
시계열 차분 집계(Serial Differencing Aggregation) |
아파치 웹 로그 등을 통해 수집된 데이터가 시간이 지남에 따라 변화하는 값의 변경폭 추이를 확인하고 싶은 경우 파생 집계를 활용할 수 있다. 파생 집계는 부모 히스토그램 또는 날짜 히스토그램 집계에서 지정된 메트릭의 파생값을 계산하는 상위 파이프라인 집계다. 이는 부모 히스토그램 집계 측정 항목에 대해 작동하고, 히스토그램 집계에 의한 각 버킷의 집계 값을 비교해서 차이를 계산한다. 반드시 지정된 메트릭은 숫자여야 하고, 상위에 해당하는 집계의 min_doc_count가 0보다 큰 값으로 설정되는 경우 일부 간격이 결과에서 생략될 수 있기에 min_doc_count 값을 0으로 설정해야 한다.
파생 집계의 경우에는 이처럼 선행되는 데이터가 존재하지 않으면 집계를 수행할 수 없는데, 실제 데이터를 다루다 보면 종종 노이즈가 포함되기도 하고, 필요한 필드에 값이 존재하지 않을 수 있다. 이러한 부분을 갭(Gap)이라고 할 수 있는데, 쉽게 말해 데이터가 존재하지 않는 부분을 의미한다.
갭(gap)이 발생되는 이유는 아래와 같다.
- 어느 하나의 버킷 안으로 포함되는 문서들에 요청된 필드가 포함되지 않은 경우
- 하나 이상의 버킷에 대한 쿼리와 일치하는 문서가 존재하지 않는 경우
- 다른 종속된 버킷에 값이 누락되어 계산된 메트릭이 값을 생성할 수 없는 경우
이러한 경우에는 파이프라인 집계에 원하는 동작을 알리는 메커니즘이 필요하다. 이 역할을하는 것이 갭 정책(gap_policy)이다. 모든 파이프라인 집계에서는 gap_policy 파라미터를 허용한다.
<갭정책>
skip | 누락된 데이터를 버킷이 존재하지 않는 것으로 간주한다. 버킷을 건너뛰고 다음으로 사용 가능한 값을 사용해 계산을 계속해서 수행한다. |
insert_zeros | 누락된 값을 0으로 대체하며 파이프라인 집계 계산은 정상적으로 진행된다. |
{
"aggs":{
"histo":{
"date_histogram":{
"field":"timestamp",
"interval":"day"
},
"aggs":{
"bytes_sum":{
"sum":{
"field":"bytes"
}
},
"sum_deriv":{
"derivative":{
"buckets_path":"bytes_sum"
}
}
}
}
}
}
->result
{
"took": 5,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10001,
"max_score": 0.0,
"hits": []
},
"aggregations": {
"histo": {
"buckets": [
{
"key_as_string": "2015-05-17T00:00:00.000Z",
"key": 1431820800000,
"doc_count": 1632,
"bytes_sum": {
"value": 4.14259902E8
}
},
{
"key_as_string": "2015-05-18T00:00:00.000Z",
"key": 1431907200000,
"doc_count": 2893,
"bytes_sum": {
"value": 7.88636158E8
},
"sum_deriv": {
"value": 3.74376256E8
}
},
{
"key_as_string": "2015-05-19T00:00:00.000Z",
"key": 1431993600000,
"doc_count": 2896,
"bytes_sum": {
"value": 6.65827339E8
},
"sum_deriv": {
"value": -1.22808819E8
}
},
{
"key_as_string": "2015-05-20T00:00:00.000Z",
"key": 1432080000000,
"doc_count": 2578,
"bytes_sum": {
"value": 8.78559106E8
},
"sum_deriv": {
"value": 2.12731767E8
}
}
]
}
}
}
결과를 보면 파생 집계는 각 버킷 간의 차이를 값으로 보여준다. 첫번째 버킷은 이전 데이터가 존재하지 않으므로 파생 집계 결과가 포함되지 않는다.
Addtional Parent Aggregations
파이프라인 집계명 | 쿼리 |
파생 집계 - 상위 히스토그램 (또는 date_histogram) 집계에서 지정된 메트릭의 미분을 계산하는 상위 파이프 라인 집계이다. 지정된 메트릭은 숫자 여야하며 둘러싸는 막대 그래프는 ( 집계의 기본값 ) 으로 min_doc_count이 0으로 설정되어 있어야한다. |
{ "derivative":{ "buckets_path":"bytes_sum" } } |
누적 집계 - 위 히스토그램 (또는 date_histogram) 집계에서 지정된 지표의 누적 합계를 계산하는 상위 파이프 라인 집계입니다. 지정된 메트릭은 숫자 여야하며 둘러싸는 막대 그래프는 ( 집계의 기본값 ) 으로 min_doc_count이 0으로 설정되어 있어야합니다 . |
{ "cumulative_sum":{ "buckets_path":"bytes_sum" } } |
버킷 스크립트 집계 - 부모 다중 버킷 집계에서 지정된 메트릭에 대해 버킷 당 계산을 수행 할 수있는 스크립트를 실행하는 부모 파이프 라인 집계입니다. 지정된 메트릭은 숫자 여야하며 스크립트는 숫자 값을 반환해야합니다. |
{ "bucket_script":{ "buckets_path":{ "my_var1":"bytes_sum", "my_var2":"total_count" }, "script":"params.my_var1/params.my_var2 } } |
버킷 셀렉터 집계 - 현재 버킷을 상위 멀티 버킷 집계에 유지할지 여부를 결정하는 스크립트를 실행하는 상위 파이프 라인 집계입니다. 지정된 메트릭은 숫자 여야하며 스크립트는 부울 값을 반환해야합니다. 스크립트 언어 인 경우 expression숫자 반환 값이 허용됩니다. 이 경우 0.0은 그대로 평가되고 false 다른 모든 값은 true로 평가됩니다. |
{ "bucket_selector":{ "buckets_path":{ "my_var1":"bytes_sum", "my_var2":"total_count" }, "script":"params.my_var1 > params.my_var2 } } |
시계열 차분 집계 - 시계열의 값을 다른 시차 또는 기간에 차감하는 기술입니다. 예를 들어, 데이터 포인트 f (x) = f (x t )-f (x t-n ), 여기서 n은 사용되는 기간입니다. |
{ "serial_diff":{ "buckets_path":"bytes_sum", "lag":"7" } } |
자세한 설명은 공식 레퍼런스를 참고하시길 바랍니다.
누적 집계
{
"aggs":{
"histo":{
"date_histogram":{
"field":"timestamp",
"interval":"day"
},
"aggs":{
"bytes_sum":{
"sum":{
"field":"bytes"
}
},
"sum_deriv":{
"cumulative_sum":{
"buckets_path":"bytes_sum"
}
}
}
}
}
}
->result
{
"took": 4,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10001,
"max_score": 0.0,
"hits": []
},
"aggregations": {
"histo": {
"buckets": [
{
"key_as_string": "2015-05-17T00:00:00.000Z",
"key": 1431820800000,
"doc_count": 1632,
"bytes_sum": {
"value": 4.14259902E8
},
"sum_deriv": {
"value": 4.14259902E8
}
},
{
"key_as_string": "2015-05-18T00:00:00.000Z",
"key": 1431907200000,
"doc_count": 2893,
"bytes_sum": {
"value": 7.88636158E8
},
"sum_deriv": {
"value": 1.20289606E9
}
},
{
"key_as_string": "2015-05-19T00:00:00.000Z",
"key": 1431993600000,
"doc_count": 2896,
"bytes_sum": {
"value": 6.65827339E8
},
"sum_deriv": {
"value": 1.868723399E9
}
},
{
"key_as_string": "2015-05-20T00:00:00.000Z",
"key": 1432080000000,
"doc_count": 2578,
"bytes_sum": {
"value": 8.78559106E8
},
"sum_deriv": {
"value": 2.747282505E9
}
}
]
}
}
}
집계의 값을 계속 해서 누적하는 집계이다.
버킷 스크립트 집계
{
"aggs":{
"histo":{
"date_histogram":{
"field":"timestamp",
"interval":"day"
},
"aggs":{
"bytes_sum":{
"sum":{
"field":"bytes"
}
},
"sum_deriv":{
"bucket_script":{
"buckets_path":{
"my_var1":"bytes_sum",
"my_var2":"bytes_sum"
},
"script":"params.my_var1/params.my_var2"
}
}
}
}
}
}
->result
{
"took": 106,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10001,
"max_score": 0.0,
"hits": []
},
"aggregations": {
"histo": {
"buckets": [
{
"key_as_string": "2015-05-17T00:00:00.000Z",
"key": 1431820800000,
"doc_count": 1632,
"bytes_sum": {
"value": 4.14259902E8
},
"sum_deriv": {
"value": 1.0
}
},
{
"key_as_string": "2015-05-18T00:00:00.000Z",
"key": 1431907200000,
"doc_count": 2893,
"bytes_sum": {
"value": 7.88636158E8
},
"sum_deriv": {
"value": 1.0
}
},
{
"key_as_string": "2015-05-19T00:00:00.000Z",
"key": 1431993600000,
"doc_count": 2896,
"bytes_sum": {
"value": 6.65827339E8
},
"sum_deriv": {
"value": 1.0
}
},
{
"key_as_string": "2015-05-20T00:00:00.000Z",
"key": 1432080000000,
"doc_count": 2578,
"bytes_sum": {
"value": 8.78559106E8
},
"sum_deriv": {
"value": 1.0
}
}
]
}
}
}
스크립트를 활용해 버킷의 결과 값을 스크립트로 조작가능하다.
버킷 셀렉터
{
"aggs":{
"histo":{
"date_histogram":{
"field":"timestamp",
"interval":"day"
},
"aggs":{
"bytes_sum":{
"sum":{
"field":"bytes"
}
},
"sum_deriv":{
"bucket_selector":{
"buckets_path":{
"my_var1":"bytes_sum",
"my_var2":"bytes_sum"
},
"script":"params.my_var1 < params.my_var2"
}
}
}
}
}
}
->result
{
"took": 7,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10001,
"max_score": 0.0,
"hits": []
},
"aggregations": {
"histo": {
"buckets": []
}
}
}
불린을 결과값으로 하는 script를 작성해 결과에 노출시킬 버킷을 선택한다.
시계열 차분 집계
{
"aggs":{
"histo":{
"date_histogram":{
"field":"timestamp",
"interval":"day"
},
"aggs":{
"bytes_sum":{
"sum":{
"field":"bytes"
}
},
"thirtieth_difference":{
"serial_diff":{
"buckets_path":"bytes_sum",
"lag":2
}
}
}
}
}
}
->result
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 10001,
"max_score": 0.0,
"hits": []
},
"aggregations": {
"histo": {
"buckets": [
{
"key_as_string": "2015-05-17T00:00:00.000Z",
"key": 1431820800000,
"doc_count": 1632,
"bytes_sum": {
"value": 4.14259902E8
}
},
{
"key_as_string": "2015-05-18T00:00:00.000Z",
"key": 1431907200000,
"doc_count": 2893,
"bytes_sum": {
"value": 7.88636158E8
}
},
{
"key_as_string": "2015-05-19T00:00:00.000Z",
"key": 1431993600000,
"doc_count": 2896,
"bytes_sum": {
"value": 6.65827339E8
},
"thirtieth_difference": {
"value": 2.51567437E8
}
},
{
"key_as_string": "2015-05-20T00:00:00.000Z",
"key": 1432080000000,
"doc_count": 2578,
"bytes_sum": {
"value": 8.78559106E8
},
"thirtieth_difference": {
"value": 8.9922948E7
}
}
]
}
}
}
lag 값을 2로 주어 2번째 전 버킷과 현재 버킷의 차분을 계산하여 결과값에 포함시킨다.
여기까지 간단히 파이프라인 집계에 대해 다루어보았다. 사실 집계의 모든 것을 다 다루지는 못했다. 부족한 것은 공식레퍼런스와 서적을 더 참고해야겠다.
출처: https://coding-start.tistory.com/292?category=757916 [코딩스타트]
'Elastic Stack > ElasticSearch' 카테고리의 다른 글
색인 종류 (색인, 역색인) (2) | 2023.02.18 |
---|---|
Elasticsearch - 클러스터, 샤드, 인덱스 상태 확인하기 (0) | 2021.04.19 |
Elasticsearch - Elasticsearch custom docker image 빌드(엘라스틱서치 커스텀 도커 이미지 생성) (0) | 2021.04.19 |
Elasticsearch - 한글 자동완성(Nori Analyzer, Ngram, Edge Ngram) (0) | 2021.04.19 |
Elasticsearch - Aggregation API(엘라스틱서치 집계,버킷(Bucket Aggregations) 집계) -2 (0) | 2021.04.19 |
Elasticsearch - Aggregation API(엘라스틱서치 집계,메트릭(Metric Aggregations) 집계) -1 (0) | 2021.04.19 |
Elasticsearch - Rest High Level Client를 이용한 Index Template 생성 (0) | 2021.04.19 |
ELK Stack - Logstash(로그스태시)를 이용한 로그 수집 (0) | 2021.04.19 |