[데이터처리] 로그데이터 다루기(2) - 수집 미들웨어 Fluentd란?

2020. 8. 3. 15:42 Big Data/빅데이터

ETL이라는 것은 데이터를 추출, 변확하고 저장소에서 읽게하는 스텝을 뜻한다. 데이터 분석 시스템의 운영에서 가장 중요한 것은 ETL 각각이 정상적으로 동작하는 것이다. Fluentd로 스트림처리에 있어서 ETL을 담당하는 툴인 것이다.

 

Extract

Extract는 데이터 추출의 스텝이다. Fluentd에서는 Input계의 플러그인에 해당한다. 예를 들어 파일이나 HTTP 요청을 소스로 해서 추출할 수 있다. 로그 파일의 경우 파일을 받은 뒤에 각 행이 어떤 데이터를 가지고 있는지를 파악하는게 이 스텝의 역할이다. Nginx의 액세스 로그를 받는 예시에서는 각 필드의 값을 Fluentd가 식별해서 읽는다.

 

Transform

Transform은 데이터 변환을 위한 스텝이다. 추출된 데이터를 저장소에 저장하기 전에 변환한다. Transform을 Extract 단계에서 같이 처리하는 것도 가능하다. Transform에서는 특정의 HTTP 응답의 상태별로 요청을 1분 단위로 카운트해서 저장소에 읽어들이는 케이스도 가능하다. 또한, 로그의 필드에 형태를 부여하거나 특정 조건에서 로그를 필터링하는 등의 처리도 생각해 볼수 있다.

Fluentd에서는 filter 플러그인을 이용함으로써 이 Transform의 사양을 처리하는 것이 가능하다.

 

Load

Load는 데이터를 저장소에 적재하는 스텝이다. Fluentd에서는 out_elasticsearch 플러그인이 이것을 담당하고 있다. Load 스텝에서는 각종 데이터 저장소의 API를 이용해서 적재한다. API가 없는 경우에는 독자적으로 구현해야 할 수도 있다. 스토어에 데이터를 읽어들이게 할 때 네트워크나 디스크의 용량에 문제가 있어 쓰기가 실패할때도 있다. 적절히 retry할 필요가 있다. 또한, 재시도할 경우에 같은 데이터가 두번 써져서 데이터의 중복이 발생하는 경우도 있다. 주의해야하는 점이 여럿 존재하는 스텝이다.

Fluentd는 많은 일을 대신해주는 것을 알 수 있다. 데이터 분석을 지탱해주는 기반을 만들기 위해서는 ETL을 적절히 처리할 수 있어야 한다. Fluentd에서는 Extract는 물론이거니와 풍부한 플러그인으로 Load까지 도와준다.

 

그럼 ETL 처리는 Fluentd만으로 충분할까? Fluentd는 1초당 수만레코드를 넘는 로그라도 문제없이 전송할 수 있다. 다만, 그것은 Load된 저장소가 막히지 않고 쓸 수 있을때의 이야기다. Fluentd에서는 전송 버퍼가 있고, 전송할 곳을 모르는 상태라고 하더라도 Fluentd의 버퍼에 일시적으로 데이터를 보존했다가 나중에 다시 전송하는 것이 가능하다. 하지만, Fluentd에서 보존할 수 있는 버퍼의 양에는 한계가 있다.

 

데이터를 한번에 Load하고 싶은 경우에는 Fluentd가 아니라, 배치처리를 검토하는 편이 좋다. 배치처리에서는 큰 데이터를 한꺼번에 쓰는 데 집중할 수 있다. 배치처리를 하는 경우에도 ETL에 주의하며 해야하는 것은 변함없다. 배치형 벌크 데이터 로더의 Embulk 미들웨어를 살펴보는 것도 좋다.

 

데이터 분석 시스템 가용성 (HA)

데이터 분석 시스템을 계속해서 가동하기 위해서는 어떤식으로 구상을 해야 할까?

 

 

(1) 데이터 멱등성

Fluentd 공식의 고가용성 설정 문서에서도 설명하고 있지만 일반적으로 메시지 송신에는 3가지의 의미가 있다.

 

At most once: 메시지는 바로 송신할 것. 만약 메시지의 송신이 성공했다면 다시는 송신하지 말 것. 다만, 저장소가 꽉 차거나 해서 메시지를 잃어버릴 수가 있음
At least once: 각각의 메시지는 적어도 한번은 송신된다. 실패한 경우에는 메시지는 중복될 수도 있다.
Exactly once: 각각의 메시지를 정확하게 한 번만 전송한다.

서비스에 따라 다를 수 있지만 많은 케이스에서 Exactly once가 요구되는 기능이다. 반드시 메시지가 전송되어야 하고, 중복없이 저장되어야 하기 때문이다. 하지만 Exactly once를 실제로 실현하기 위해서는 많은 비용이 든다. 만약 하나라도 메시지를 잃어버리면 안된다면 동기식 전송을 검토해야 한다. 즉, 예를 들어 로그가 한 줄을 파일에 저장된 것을 확인하면 다음의 메시지의 처리를 시작하고 에러가 발생하면 로그의 입력을 멈추게하는 등의 처리가 필요하다. 하지만 이렇게 하면 throughput(작업시간 당 처리량)이 좋을리가 없다.

 

그렇기 때문에 Fluentd에서는 At most once와 At least once를 지원하고 있다. 비동기로 데이터를 전송하는 것으로 높은 throughput을 실현할 수 있기 때문이다.

 

(2) Exactly once의 실현은 어렵다.

데이터 분석 기반에 있어서 Exactly once가 필요한 케이스도 있다. 그럴 경우에는 Fluentd이 아니라, 다른 툴을 사용하는 편이 좋은 경우가 있다.

 

정합성 테스트
반드시 저장한 결과의 정합성 테스트를 할 수 있도록 한다. 저장한 레코드 수는 물론이고, 저장한 뒤의 각 레코드가 정확한 형식으로 저장되었는지를 확인한다. 저장이 실패한 것을 알았을 때는 실패한 레코드 또는 레코드 집합을 삭제하고 다시 저장한다.
레코드 삭제
- 레코드의 삭제는 레코드별로 ID를 기준으로 실현한다. 다만 삭제조작이 실패한 경우에는. 실패 현황에 따라 다음 조작을 변경하여야 한다.
- 혹은 동기로 저장하기. 각각의 하나의 메시지가 저장된 것을 확인한다.

비동기이면서 분산시스템에서 Exactly once를 실현하는 것은 매우 어려운 과제로 알려져 있다. 애플리케이션에서 Exactly once가 필요한 경우에는 어떻게든 여러 방법을 동원해서 대응할 필요가 있지만, 데이터 분석 기반으로 어디까지 정확한 데이터를 필요로 하는가는 시스템의 설계에 따라 다른다. 1억행 중에 하나의 행에 틀린 데이터가 존재한다고 치명적인 에러가 되는가 아닌가에 따라 시스템의 설계도 달라지게 된다.

 

(3) retry(재시도)

ETL의 각 단계에 따라 처리가 실패할 수 있다는 것을 반드시 대비해야 한다. 예를 들어 Load의 처리는 다음과 같은 몇가지 요인으로 실패할 가능성이 있다.

  1. 쓰기를 할 곳의 데이터 저장소 용량이 꽉 차버렸다.
  2. 쓰기를 할 곳에 네트워크가 끊어졌다.
  3. Fluentd가 동작하고 있는 머신의 전원이 갑자기 끊어졌다.

이런 경우에는 다시 처리를 실행하고 복구할 필요가 있다. Fluentd의 경우는 프로세스가 떠 있으면 재시도를 하지만 사전에 배치처리를 하는 경우에는 재시도하는 구조를 만들던가, 다시 한번 실행할 필요가 있다.

 

(4) 더욱 가용성을 높이기 위해서: 큐를 사용한다.

Fluentd에서도 멱등성과 재시도에 대해서 시도하고 있지만 Fluentd 노드 자체에 장애가 발생하거나, 데이터를 전부 잃어버렸을 때는 Fluentd의 가용성을 높이는 것이 어렵다.

 

예를 들어 AWS의 경우, 스트림 처리용의 메시징 기반으로 Amazon Kinesis가 있다. 이것은 분산 메시지큐라고 하는 시스템이다. 분산 메시지큐를 이용하면 메시지의 다중화가 가능하기 때문에 하나의 노드에서 데이터를 가지는 것보다 데이터의 안전성을 향상할 수 있다. 분산 메시지큐에 넣은 데이터는 거기에서 데이터를 꺼내는 워커가 정기적으로 추출하고 재이용한다. ETL에서 말하는 Transform과 Load는 워커의 일이 되는 것이다.

 

워커의 작업이 실패한 경우라도 분산 메시지큐에 데이터가 저장되어 있다면 재시도할 수가 있다. 만약 워커노드가 다운되었더라도 재시도하는 것은 간단하다. 다만, 어디까지 메시지를 처리했는지 워커노드와는 다른 장소에 보관해 두어야 한다. 분산 큐 시스템은 데이터 복구가 편리한 반면, 관리를 위한 구조가 늘어난다. 시스템이 필요로하는 처리량(Throughput)과 데이터 크기에 따라서 알맞은 전송기반을 선택해야 한다.

 

Elasticsearch를 사용하면 시계열 데이터의 저장은 간단하다. 또한 검색엔진으로 이용하기에도 Elasticsearch에는 전문 검색 엔진으로서 편리한 기능을 가지고 있다. Elasticsearch는 클러스터를 구성한다고 해도 쓰기 속도와 스토리지에 드는 비용을 같이 생각해보면 비용이 많이 드는 경우가 있다. 이런 경우에는 데이터 저장소를 조합하는 것으로 요건을 만족시킬 수 있다.

 

(1) 관리형 데이터 웨어하우스

매일 생성되는 데이터를 저장하면서, 매일 분석태스크를 처리하는 데이터 저장소의 운영은 부담이 크다. 데이터를 분석해서 결과를 내어 놓는 것이 비즈니스라고 한다면 운영/보수에 비용이 드는 것은 주객전도가 된다. 만약 운영에 할당할 리소스와 각 데이터 저장소의 튜닝에 시간을 절약하면서 이 데이터 저장소와 미들웨어를 활용하고 싶을 경우에는 호스팅 서비스를 이용하는 것도 검토해 볼 수 있다.

 

Fluentd를 이용하는 경우라면 Tresure Data가 적합한다. Google Cloud Platform이 제공하는 완전관리형 서비스인 BigQuery는 Dremel이 베이스가 되는 대규모 컬럼지향형 데이터구조에 대해서 쿼리가 가능하다.

 

(2) 스트림 처리

데이터 저장소에도 여러가지 선택지가 있지만 스트림처리에 따라 유연성을 가지도록 해서 해결할 수 있는 문제도 있다. 예를들어 Esper는 스트림처리에 쿼리언어 같은 인터페이스를 제공하고 있다. Esper는 CEP(Complex Event Processing) 엔진이라고 하며, 스트림에 대해서 EPL(Event Processing Language)를 작성하는 것으로 메시지를 처리할 수 있는 소프트웨어이다. CEP 엔진을 Fluentd와 조합해서 사용할땐은 Norika를 사용하는 것이 좋다. 또한 분산 컴퓨팅환경에서의 스트림처리기반으로는 LinkedIn에서 개발한 Apache Kafka를 메시징 기반으로 이용하는 선택지가 있다.

 

AWS를 이용하고 있다면 Apache Kafka 대신에 Amazon Kinesis를 이용할 수도 있다. 또한 스트림처리 중에 이미 가지고 있는 데이터와 조합해서 사용하고 싶은 경우에는 각 노드상의 메모리에서 처리할 수 있게 하던가, 그것도 용도에 맞지 않는다면 Redis나 DynamoDB와 같은 레이턴시가 작은 KVS(Key Value Store)에 처리대상인 데이터를 올리는 것을 검토할 수 있다.

 

데이터 저장소, 서비스, 스트림처리를 나누어 사용하는 것은 다음과 같은 이유에서다.

 

(1) 코스트

첫째는 코스트. 비용때문이다. TCO(Total Cost of Ownership; 총 보유 비용)와 분석에 드는 비용을 모두 말한다. TCO는 데이터의 저장부터 파기할때까지의 필요한 시간과 지출을 의미한다. 데이터가 늘어나면 늘어날수록 일반적으로 데이터의 보유 비용은 늘어난다. 데이터를 유지하기 위한 비용과는 별도로 분석태스크를 위해서 비용이 드는 경우가 있다면, 그것도 비용에 포함된다.

 

(2) 정보의 신선도

두번째는 정보의 신선도이다. 분석 대상이 되는 데이터에서 준실시간으로 결과를 알고싶은지, 어느 정도 시차를 두어도 되는지에 따라 분석방법이 달라진다. 스트림에서의 처리가 필요한 경우도 있다. 또는 배치로 처리해도 좋을지, 아니면 배치가 아니면 처리하지 못할 경우도 있다. 어느 정도로 정보의 신선도가 필요한지에 따라 결정할 수 있다.

 

직전 10초간의 각 IP주소별 요청 수 등을 뽑고 싶을 경우에는 스트림처리가 필요하다. 하지만, 직전 1년간의 고객 상품구매 데이터를 가지고 상품별 관련성을 도출하려는 경우에는 바로 결과를 얻는 것이 어렵다. 분석 방법에 따라 이용되는 데이터를 직전에 이용하기 위해서 준비할 것인가, 아니면 배치처리를 위해서 저장해둘 것인가로 구별하는 것이 여기서 중요하다. 이 논점은 TCO의 관점에서도 유효하다.

 

(3) 확장성(Scalability)

이것은 각각의 스토어에 관한 확장성과 스트림처리의 아키텍처의 확장성 양쪽을 의미한다.

 

(4) 스키마의 유연성

"어떤 타이밍에 데이터의 스키마를 결정할 것인가"라는 점을 다루고 있다. 예를 들어 데이터의 스키마가 처음부터 결정되어 있다면, 쓰기를 할 때 데이터의 스키마가 고정되어 있어도 문제가 없다. 이것은 Schema on write라고 한다. 즉 쓰기를 할때 스키마가 결정되는 방식이다. 관계형 데이터베이스에 데이터를 넣을 때는 테이블이 필요하기 때문에 최초에 테이블을 만들고 데이터를 넣는 것을 말한다.

 

또 다른 방법은 데이터를 읽을때 스키마를 결정하는 방법이다. 이것은 Schema on read라고 한다. Schema on read 방법에서는 데이터를 저장할 때에 스키마를 결정할 필요가 없다. 데이터의 스키마가 자주 변할때는 이 방법이 다루기 좋다. 고객의 행동을 나타내는 로그에 새로운 파라미터가 추가되는 경우는 날마다 기능이 추가되는 제품에서는 자주 있는 일이다. 이런 경우, 로그데이터의 형식을 변경하기 위해 테이블의 형식을 변경하지 않으면 안된다고 한다면 유연하게 로그데이터에 파라미터를 추가하는 것이 불가능하게 된다. 데이터를 읽을 때 데이터의 스키마를 정의할 수 있다면 유연하게 대응할 수 있다.

 

Schema on write에서는 스키마를 변경하기 위해 시간이 걸리지만, 인터렉티브한 쿼리에 대해서 고속으로 답할 수 있는 이점이 있다. 어느쪽의 스키마방식을 선택할 것인가는 데이터의 용도에 따라서 결정하면 좋다.

 

(5) 중간데이터의 유지

데이터를 분석하는 과정에서 한번의 데이터 처리로 최종적으로 필요한 분석 결과를 내는 경우는 거의 없다. 그 전에 분석하기 편하도록 전처리된 데이터를 집계하는 등의 처리를 하는 경우가 많다.

 

예를 들면 어떤 고객 클러스터에 대한 행동의 사전분포를 구축하려는 경우 일단 각각의 행동 로그 데이터를 고객 클러스터별로 어떤 행동을 하는지 뭉쳐서 집계하고 분포구축을 위한 힌트를 얻는다. 행동 로그의 경우에는 시계열 데이터이기 때문에 어떤 기간으로 데이터를 다룰 것인가에 따라서도 생성되는 중간 데이터가 달라지게 된다. 1분 단위로 데이터가 필요한 경우도 있고, 1일 단위로 집계한 데이터를 다루어야 하는 경우도 있다. 그리고, 최종적으로는 행동모델을 도출하기 전에 전단계에서 필요한 데이터를 산출하거나 집계하는 것으로도 데이터는 생겨난다. 이 데이터를 중간 데이터라고 한다. 데이터의 분석처리에서 이 중간 데이터를 어디에 배치하는 가에 따라 처리시간을 단축하고 효율적으로 분석할 수 있다.

 

저장소를 검토할 때는 이런 관점들을 조합해서 판단해야 한다. 만약 장기간 데이터를 유지해야할 필요가 있고, 10테라바이트 정도의 데이터를 매시간 분석대상으로 할 필요가 있다면 Hadoop이나 BigQuery 또는 몇가지의 MPP(Massively Parallel Processing) 데이터베이스가 선택지가 된다. 만약 다음과 같은 요건이 있다면 Elasticsearch가 유효한 선택지이다.

  • 데이터를 어느 정도 기간만 유지해도 된다.
  • 스트림에서 데이터를 처리하면서, 직전 1시간의 1분단위 데이터를 집계할 필요가 있다.
  • 어느정도 확장성을 필요로 한다.

스트림 처리를 한 뒤에 집계데이터만 유지할 필요가 있고, 매초의 집계가 필요한 경우라면 CEP(Complex Event Processing) 엔진과 같은 스트림처리를 하는 아키텍처가 맞다. 실제로는 복수의 데이터베이스와 미들웨어를 조합해서 분석 기반을 구축하는 것이 코스트 효율이 좋고 효과적으로 분석할 수 있는 기반을 만드는 방법이다. 이런 배치처리와 스트림처리를 병용하는 아키텍처, 그중에서도 분산 컴퓨팅 환경에서의 제안은 람다 아키텍처라고 한다.

 

거대한 데이터를 하나의 데이터베이스에 저장하고 거기에서 모든 분석쿼리를 실행하는 것도 가능하지만, 이런 방법에서는 많은 비용이 들게된다. 데이터를 저장하고 처리하기 위한 애플리케이션에는 각각의 특징이 있다. 모든 처리를 하나의 미들웨어에서 하면 비용은 비약적으로 높아지게 된다. 한번에 모든 처리를 하자고 할 것이 아니라, 현재 가지고 있는 데이터에서 비즈니스에 필요한 정보자산의 우선순위가 높은 데이터를 선별할 것, 그리고 어떤 분석수단을 이용할 것인가를 나열한 뒤에 필요한 아키텍처를 하나하나 생각해 보는 것이 데이터 분석 기반을 만드는 첫걸음이 될것이다.

 

데이터는 최종적으로 의사결정에 이용되어야 가치를 가진다. 앞으로도 데이터 저장소와 시각화의 방법은 다양화되고 더욱더 좋은 성능의 소프트웨어가 나올 것이다.

  • 데이터 저장소의 변화. 시계열 데이터 전용 DB의 진화. 분석DB의 고성능화, MPP의 일용품화
  • 데이터 오더의 변화. 기가바이트 → 테라바이트 → 페타바이트. 사내 데이터만이 아니라 퍼블릭데이터를 시작으로 하는 데이터를 조합한 횡단분석
  • 데이터타입의 변화. 비즈니스상 취급하는 로그의 종류가 늘어나고 어떤 로그를 해석할 것인가 판단해야 한다.

이런 경우에도 변함없이 무엇을 가치로 제공할 것인가가 과제가 된다. 나중에 로그 분석이라는 문맥에 대해서도 방법이 다양하게 된다.

 

데이터의 양이 늘어나고 다양화되며 데이터를 다루는 수단도 더욱 큰 데이터를 다루기 쉽도록 진화할 것이다. 이런 경우에도 로그를 다루는 엔지니어링의 기본은 변하지 않는다. 지금 가지고 있는 데이터를 다룰 수 있도록 해서 현황을 파악하고 변화를 읽으며 분석할 수 있도록 환경을 만드는 것이 중요하다.

 

Fluentd는 무슨 미들웨어인가?

Fluentd는 로그 수집 미들웨어이다. 저장 장소가 분산되어 있는 데이터와 로그의 수집을 간단하고 스마트하게 해결해 줌으로써 데이터로부터 가치를 창출하기 위한 비용을 최소화할 수 있다. 기존의 구조로는 해결하기 어려웠던 기능을 Fluentd에서는 많이 제공하고 있다. 데이터의 필터가공, 각종 미들웨어나 API호출, 다양한 파일로 출력을 가능하게 해주는 플러그인으로 생태계가 갖추어져 있다. 로그/메시지 수집 미들웨어인 Fluentd와 다른 미들웨어와의 큰 차이는 플러그인 방식으로인한 범용성에 있다.

 

iPhone/Android 단말의 앱로그를 수집하고 싶다거나 웹애플리케이션에서 로그를 수집하고 싶다거나 라인공장 기계의 센서데이터를 모으고 싶을때, 이런 여러가지 요구에도 Fluentd는 대응할 수 있다. 그 외에도 로그 수집의 목적인 "데이터에서 가치를 만든다"라는 점에서도 각종 비용을 아낄 수 있는 특징을 많이 갖고 있다.

 

소셜게임이나 광고기술 등의 웹서비스 관련기업과 클라우드 기반의 백엔드 등을 중심으로 수천개 이상의 회사에서 도입하고 있고, 선진적인 제조업계의 공장에서 센서 데이터 수집을 하는 장비에 활용하는 등 활용처는 다양한다.

 

(1) 데이터 구조

Fluentd에서 하나의 메시지는 [tag, time, record]라는 3개의 요소로 구성되어 있다. tag는 레코드의 라우팅에 사용하는 문자열, time은 UNIX 타임스탬프로 저장하고, record는 객체형으로 Key-Value 형식의 연관 배열을 저장한다.

 

record는 중첩 구조도 다루긴합니다만 최종적으로 데이터를 저장하는 곳이 지원하지 않는 경우는 플랫 Key-Value형식으로 메시지를 구성하든지 필터처리로 변환할 필요가 있다. 또한, 편의상 각각의 메시지와 그것을 여러개 청크로 구분해야하는 경우를 제외하고는 레코드라고 표현한다. 다음은 Fluentd의 데이터 구조 샘플이다.

[
    "apache.access",
    1402903141,
    {
        "host": "64.233.160.120",
        "user": "-",
        "method": "GET",
        "path": "/article/2000",
        ... 이하 생략
    }
]

 

(2) 아키텍처

Fluentd는 로그/메시지에 임의의 태그를 붙여서 순차수집을 하면서 필터/버퍼/태그 또는 라벨을 사용한 라우팅을 거쳐서 각종 데이터 출력 장소에 보존하는 것을 안정적으로 비동기 처리할 수 있는 아키텍처로 되어 있다. 다음은 기본적인 데이터의 흐름이다.

  • 각종 데이터 소스에서 로그/메시지에 태그를 붙여서 수집한다(Input 플러그인)
  • 필터라고 하는 데이터 가공과 집계 처리를 필요에 따라 실행한다(Filter 플러그인, Filter계 Output 플러그인)
  • 각종 데이터 저장소로 출력한다(Output 플러그인)

전형적인 데이터 흐름으로는 Input 플러그인으로 schemaless한 구조화 메시지를 받는다(혹은 데이터를 추출한다). 필요에 따라 Filter 플러그인과 Filter계 Output 플러그인을 여러개 조합해서 데이터의 가공 처리를 한다. 여기까지 무엇인가 에러가 발생하면 적절한 재시도 처리를 하면서 Output 플러그인을 사용해서 최종적인 데이터 저장소에 보낸다.

 

이 재시도관련 예외처리 기능은 훌륭하다. 예를 들어 파일버퍼 기능을 사용하면 데이터의 저장소에 통신 에러가 발생하더라도 버퍼에 쌓고 재시도하는 구조로 되어 있기 때문에 디스크 용량이나 예외처리 관련 기능을 생략할 수 있다. 또한 높은 성능을 내기 위해서 Fluentd의 코어부분에는 C언어 네이티브로 작성된 cool.io와 MessagePack을 사용하고 있다.

 

이렇게 함으로써 Fluentd는 다양한 로그/메시지를 통합적으로 수집하고 그것을 사용하기 편한 형태로 원하는 장소에 저장하는 기능을 간단하고 스마트하게 구현하고 있다.

 

(3) 배치처리와 다른점

배치를 이용한 데이터 처리가 Fluentd의 스트리밍 데이터 처리와 크게 다른 점은 수집한 데이터를 활용할 수 있을때까지 시간이 오래 걸린다는 점이다. 아래는 각각의 데이터 플로우를 보여준다. 

 

 

짧은 시간 단위로 실행하고 싶어도 로그 로테이트 등의 타이밍이 아니면 구현이 까다로워지기 때문에 부득이하게 하루에 한번 내지는 몇번 정도로 Crontab에서 실행하기도 한다. 배치 처리 중에 큰 단위시간의 데이터를 한 개 이상의 서버에서 수집하기 위해서 로그를 이용할 수 있을때 까지 큰폭의 지연이 있었다. 뿐만 아니라, 취급하는 데이터양이 점점 늘어날수록 네트워크 대역도 부족하게 되고, 때에 따라서 폭주하기도 한다. 이런 배치 처리에 Fluentd를 도입하면 데이터의 실시간 이용이 가능하게 된다.

 

범용성이 높은 Fluentd이지만, CPU 처리 성능이 필요한 비교적 큰 단위의 데이터 처리에는 추천하지 않는다. 예를 들어 FTP에 연결된 대용량 CSV를 마이크로배치화하는 경우에는 Fluentd의 배치 처리만 특화한 embulk라든가 스케줄 관리와 네트워크 플로우 툴인 digdag도 존재한다. Fluentd와 embulk 모두 입력, 필터 처리, 출력의 데이터 파이프라인 처리를 설정파일로 적용할 수 있기 때문에 개념이 비슷하고 범용적으로 채택하는 경우가 늘고 있다.

 

만약 파일에 출력한 로그를 정기적으로 모아서 처리하는 배치 처리가 이미 동작하고 있더라도 그 배치 처리를 건드리지 않고 Fluentd의 병행으로 가동할 수 있는 것은 강력한 장점이다. Fluentd를 적용하는 것으로 데이터 수집의 귀찮음을 최소화하고, 순차 수집을 가능하게 하는 것이 신선도가 높은 데이터로 비즈니스의 개선에 집중할 수 있도록 해준다.

 

또한 로그의 데이터 처리와 집게 이전 단계에서 로그를 모아서 안전한 장소에 준 실시간으로 파일을 저장하기 위한 간단한 용도로 Fluentd를 사용할 수도 있다.

 

(4) 플러그인의 에코시스템

Fluentd 자체의 기능과 역할은 매우 단순하지만 그 확장성과 플러그인의 에코시스템은 매우 강력하다. 다음은 Fluentd 플러그인의 종류다. 이런 플러그인들을 조합하면 여러가지 요구 조건을 충족 시킬수 있다.

  • 각종 소스로부터 데이터를 입력하는 Input 플러그인
  • 데이터를 가공하는 Filter 플러그인
  • 데이터를 출력하는 Output 플러그인

이렇게 일찍이 많은 사람들이 개발한 플러그인을 조합해서 사용하는 것으로 여러가지 로그 수집, 데이터가공, 데이터 출력의 설정을 간단하고 안정적으로 구혀할 수 있다. 필요한 플러그인이 있다면 이런 자산을 이용해서 손쉽게 플러그인을 만들 수 있고, 또한 그로 인해 서드파티에서 다수의 플러그인을 오픈소스로 공개하고 있다. 이용자가 많은 플러그인에 대해서는 Fluentd의 공식사이트에서 카테고리별로 정리된 내용을 확인할 수 있고, https://rubygems.org/에서 서 fluent-plugin이라는 키워드로 검색하면 모든 플러그인흘 확인할 수도 있다.

 

Perl로 작성된 "Plagger"와 이미지는 비슷하면서 데이터 입력이 있고 필터와 가공 그리고 Output하는 Perl로 만들어진 Plagger를 엔터프라이즈급의 로그, 이벤트 수집 미들웨어로 강화한 제품이라고 이해해도 좋다.

 

Fluentd는 RubyGems.org에 공개된 gem이므로 공식사트를 통해 직접 시스템에 설치를 할 수도 있고, 공식적으로 배포된 td-agent라는 올인원 패키지를 사용해서 설치할 수도 있다.

 

Linux계 배포판에서는 설치스크립트에서 treasuredata의 레파지토리를 시스템에 추가한다. 이것을 이용해서 td-agent가 설치된다. 이 Ruby 바이너리를 포함하는 Fluentd 실행 환경은 Linux계 배포판이라면 /opt/td-agent/ 디렉토리에 설치된다.

 

종류디렉토리설명
설정 디렉토리/etc/td-agent/ 
설정 파일/etc/td-agent/td-agent.conf기동 스크립트가 이 설정을 로드한다.
기동 스크립트/etc/init.d/td-agent 
로그 디렉토리/var/log/td-agent/ 
플러그인 디렉토리/etc/td-agent/plugin/독자적인 플러그인은 in_abc.rb, filter_abc.rb, out_abc.rb와 같은 형태로 설치한다.
Ruby 바이너리/opt/td-agent/embedded/bin/rubyRuby2.1이 번들로 포함되어 있다.
Gem 커맨드/usr/sbin/td-gent-gemtd-agent-gem 커맨드를 gem 커맨드처럼 이용한다.
Gem 설치장소/opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/미리 설치된 Fluentd 플러그인과 추가 플러그인이 설치되는 장소
jemalloc/opt/td-agent/embedded/lib/libjemalloc.so메모리의 파편화를 막기 위한 jemalloc은 Fluentd 서비스가 실행될 때 로드한다.

 

유연한 설정을 활용해서 데이터 파이프라인을 만들 수 있다. Fluentd의 설정 파일은 <system>, @include, <source>, <label>, <filter>, <match>의 6가지의 디렉티브를 조합하여 기술할 수 있다. 다음이 기본적인 데이터 흐름이다.

  1. <system> 디렉티브가 있으면 거기서 지정된 설정으로 코어 부분의 동작이 바뀐다.
  2. @include 디렉티브가 있으면 로컬 파일이나 HTTP로 받은 설정을 가지고 Fluentd 프로세스를 작동한다.
  3. <source> 디렉티브에서 지정한 플러그인을 경유해서 로그 수집을 시작한다.
  4. 지정한 태그 패턴의 <label> 디렉티브 안에 있는 <filter>나 <match>의 내용을 처리한다.
  5. 필요에 따라 중복해서 기술된 <filter> 디렉티브의 대상인 태그의 레코드를 가공한다.
  6. 정한 태그 패턴의 <match> 디렉티브에서 태그의 변환과 외부로의 데이터 출력을 처리한다.

이 데이터 흐름의 제어에는 dot(.)으로 구별된 태그와 라벨을 사용하고, 더해서 플러그인을 연결해서 다양한 필터 가공을 할 수 있다. 그 장점을 충분히 활용하고 안정적으로 운영하기 위해서는 태그와 라벨 설계의 최적화, 설정 파일의 간소화, 역할별로 서버의 분리를 해야 한다.

 

 

(1) <system> 디렉티브

Fluentd 코어 부분의 동작을 결정하는 디렉티브로 Fluentd 자체의 동작 및 로그 출력 부분의 상세 설정을 커스터마이즈할 수 있다. 이 설정들은 실행될 때 커맨드라인의 파라미터로 /etc/sysconfig/td-agent를 커스터마이즈하는 것과 같은 효과를 낼 수 있는 부분도 많다.

 

<system> 디렉티브를 사용하면 커맨드라인 파라미터를 사용하는 것보다 간단하게 다음과 같이 코어 부분의 동작을 변경할 수 있다. 또한 <system> 디렉티브는 실행 시의 커맨드라인 파라미터보다 우선적으로 설정된다.

 

(2) @include 디렉티브

설정을 외부로부터 읽어들이기 위한 디렉티브이다. 로컬 파일 뿐 아니라 설정 배포 서버에서 http 프로토콜로 받는 것도 가능하다. 분할되어 있는 설정 파일을 읽고 싶은 경우에도 include 디렉티브를 이용한다.

 

다만, Fluentd가 실행되는 타이밍에 매번 호출을 하기 때문에 로딩이 실패하면 실행이 실패하게 된다. 설정의 캐시 파일은 만들어지지 않기 때문에 어떤 설정을 읽었는지 실행시에 td-agent.log에 남긴 내용을 확인하거나, Fluentd의 RPC 엔드포인트를 설정해서 config.dump API를 호출해서 수동으로 td-agent.log에 남겨서 확인할 수도 있다.

 

(3) <source> 디렉티브

<source> 디렉티브는 메시지의 입력 부분이 되는 Input 플러그인을 지정하고, Input 플러그인에게 전달할 설정을 하는 디렉티브이다. 포트를 열고 메시지를 기다리거나 로그 파일의 내용이 추가되는 것을 감시하거나 프로그램을 일정 간격으로 실행해서 메시지를 읽거나 하는 플러그인 설정을 하고 싶을때 사용한다. 이 Input 플러그인에서는 tag, time, record의 3개의 요소로 구성된 메시지가 Fluentd로 보내져, 태그와 라벨을 사용해서 라우팅한다.

 

@type과 tag는 필수 항목이다. 각각 호출하는 플러그인과 출력할 때 분배하기 위한 태그를 지정한다. @id는 Fluentd 자체의 로그 출력과 감시할때 plugin_id로 이용할 별명으로 중복이 되지 않도록 각 디렉티브에 정의되어야 한다. 

 

Input 플러그인 중에 forward와 tail이 많이 사용된다. forward는 Fluentd 클라이언트로부터 메시지를 받거나, 다른 Fluentd 인스턴스로부터 메시지를 받을 때 사용하는 플러그인이다. 그리고 받은 메시지를 다른 Fluentd 인스턴스로 전달하고 싶을때는 Output 플러그인의 forward를 사용한다.

 

tail은 로그 파일에 추가되는 내용을 차례대로 읽는 플러그인이다. log rotate가 되더라도, 적절히 감지하여 Linux의 tail -f 커맨드에 가까운 동작을 한다. 또한 Fluentd를 재싱행하더라도 중복해서 읽지 않고 이어서 읽어들인다. pos_file 옵션을 지정해서 파일의 inode 번호와 offset 위치를 파일에 저장해두도록 설정해두기를 권장한다.

 

그리고, format 옵션을 사용해서 key, value의 구조화 데이터로 만들면 데이터를 취급하기가 쉬워진다.

 

(4) <filter> 디렉티브

특정 태그에 Filter 플러그인을 호출해서 데이터 처리를 하도록 설정하는 디렉티브이다. 초기의 Fluentd는 Input과 Output 플러그인 밖에 없었기 때문에 복수의 가공처리를 하기위해서는 Output 플러그인을 여러개 사용할 필요가 있었기 때문에 태그를 변환하는 작업이 필요했다. 따라서 설정 파일의 유지관리가 어려워지고 복잡성이 올라가는 문제가 생겼고, 그 문제를 해결하기 위해 추가된 기능이 태그의 변환이 필요없는 Filter 플러그인 이다.

 

내부적으로는 파이프라인 처리를 사용하기 때문에 체인이 늘어나더라도 시간당 처리 성능에 영향은 크지 않다. 대표적인 플러그인으로는 레코드를 취사 선택하는 grep 플러그인과 호스트명을 추가하는 등의 레코드 내용을 편집하는 record_feformer 플러그인, IP 주소를 이용하여 지역정보를 부여하는 geoip 플러그인이 있다.

 

Filter 플러그인에서는 태그의 변경이 불가능하다. 태그의 변경을 위해서는 <match> 디렉티브에서 Output 플러그인의 rewrite_tag_filter를 이용하면 된다.

 

(5) <match> 디렉티브

로그의 출력 장소를 정하는 디렉티브이다. 임의의 태그에 해당하는 로그를 type 파라미터로 지정한 Output 플러그인으로 출력한다.

 

플러그인의 수가 가장 많은 장르로 각종 클라우드 스토리지에 저장하거나, 데이터베이스, Key-value store, Queueing 시스템에 연계하는 써드파티 플러그인이 많이 있다. 대표적인 플러그인으로 레코드별로 Fluentd 노드에 전송하는 forward 플러그인과 여러 개의 Output 플러그인에 전달하는 copy 플러그인, 로컬 파일에 저장하는 file 플러그인, AWS의 S3에 보존하는 s3 플러그인이 있다. 그 외에 임의의 정규표현에 매칭되는 레코드의 태그를 가공해서 별도의 <match> 디렉티브에 전달하는 rewrite_tag_filter 플러그인이 있다.

 

<match> 디렉티브에서 정의하는 태그의 패턴은 매우 유연하여 다양한 표현이 가능하다. Fluentd에서는 태그를 점(.)으로 요소를 구분하고, *를 사용해서 와일드카드로 지정하거나 {}를 사용하여 OR 조건으로 지정할 수 있다.

 

file과 forward, s3 플러그인 등의 저장 및 외부 통신을 하는 Output 플러그인에서는 순차적 전송 처리를 하지 않고, 일단 버퍼에 누적해두고, 한꺼번에 송신한다. 문제가 발생해서 처리가 정상적으로 되지 않았을 경우 Fluentd는 송신을 재시도한다. 이런 제어 버퍼관련 설정 파라미터도 존재한다. 초기값은 플러그인별로 조금씩 다를수 있기 때문에 각 플러그인의 설정을 확인하면 된다.

 

  • in_exec: 일정 간격별로 지정된 경로에 있는 스크립트를 커맨드 실행해서 그 결과를 수집한다.
  • out_exec_filter: 수집한 레코드의 가공을 외부 스크립트에 맡기고 임의의 태그에 체인한다.
  • out_exec: 수집한 레코드를 외부 스크립트에 전달하여 처리한다.

 

out_exec_filter와 out_exec로부터 TSV(Tab Separated Values: 탭 구분)과 MessagePack, JSON 등의 데이터 형식을 이용할 수 있는 언어에서 무엇이든 호출할 수 있다. 1행에 1레코드를 표준입력 경우로 외부 프로그램에 레코드 내용을 전달해서 어떤 처리를 하고, 종료상태가 에러면 Fluentd가 적절히 재시도를 하고, out_exec_filter이면 표준출력에 보내고 그 결과를 임의의 태그로 Fluentd 안에 송신한다.

 

각 플러그인의 flush_interval로 지정된 시간에 버퍼링한 메시지를 한꺼번에 전달하는 것으로 CPU 비용이 드는 프로세스의 생성도 막고 있다.



출처: https://12bme.tistory.com/476?category=737765 [길은 가면, 뒤에 있다.]