DB - MongoDB 맵리듀스(Map Reduce)

2021. 4. 25. 01:22 NoSQL/MongoDB

 

몽고디비 서버에서도 다른 대용량 분산 NoSQL DBMS처럼 맵리듀스 기능을 제공한다. 맵리듀스와 유사한 집계(Aggregations)기능도 제공하지만 더욱 복잡한 패턴의 분석은 맵리듀스 기능이 필요 할 수도 있다. 몽고디비의 맵리듀스는 자바스크립트 언어의 문법을 사용하여 구현한다. 즉, 내부적으로는 자바스크립트 엔진을 이용한다는 뜻이다. 몽고디비가 현재까지 SpiderMonkey, V8 등의 자바스크립트 엔진을 사용했지만 현재는 SpiderMonkey 엔진을 기본으로 사용한다. 두 엔진의 차이가 여기서 이야기할 내용은 아니지만 간단히 아래와 같은 차이를 갖는다.

 

V8 엔진은 멀티 프로세스 방식이며, SpiderMonkey는 단일 프로세스 내에서 멀티 스레드로 작동한다.

 

어찌됬든 몽고디비에서 성능이 더 나은 엔진을 선택했을 것이다. 내부적으로 도큐먼트를 자바스크립트 변수로 매핑하고 자바스크립트로 여러가지 맵리듀스 과정을 거쳐 최종적으로 다시 몽고디비 문서로 생성되는 것이다.

 

아래 이미지는 공식 레퍼런스에 있는 맵리듀스 과정을 도식화한 것이다. 

 

 

결론적으로는 map 과정에서 emit이라는 함수를 호출하는데, 키로 사용될 필드, 값으로 사용될 필드를 인자로 넣어준다. 그러면 내부적으로 같은 키값에 대한 여러개의 값을 배열로 가지고 있게 되고 이후 reduce 과정에서 해당 키와 값배열을 받아 reduce 연산(이밎에서는 Array.sum(values)라는 합산 연산)을 실행한 후 결과 문서를 출력 혹은 별도 컬렉션의 저장등의 작업을 수행한다. 여기서 키와 값은 스칼라 값일 수도 있고, 문서 혹은 서브도큐먼트 일수도 있다.

 

그러면 mapReduce에서 사용하는 각종 옵션에 대해 알아보자.

 

> db.collection.mapReduce(

                                              <map>

                                              ,<reduce>

                                              {

                                                out: <collection>

                                                ,query:<document>

                                                ,sort:<document>

                                                ,limit:<number>

                                                ,finalize:<function>

                                                ,scope:<document>

                                                ,jsMode:<boolean>

                                                ,verbose:<boolean>

                                              }

                                            )

 

옵션 설명
out 맵리듀스 명령의 결과를 저장하거나 출력할 위치를 명시하는데, out 인자에는 문자열 또는 도큐먼트를 설정할 수 있다.
query 맵리듀스 명령을 실행할 대상 문서를 검색하는 조건을 명시한다. query 옵션에는 일반적인 find 명령의 검색 조건과 동일한 오퍼레이터를 활용할 수 있으며, 쿼리의 검색 성능이나 인덱스 활용도 find 명령과 동일한 방식으로 작동한다. 풀스캔보다는 인덱스를 활용하여 검색 성능을 향상시키는 것이 좋다.
sort 맵리듀스 명령을 실행할 대상 문서를 먼저 정렬해서 맵리듀스 엔진으로 전달하고자 할 때에는 sort 옵션을 사용한다. 맵리듀스 엔진은 key 필드의 값으로 정렬을 수행하는데, sort 옵션을 사용하여 인덱스를 활용한 정렬을 수행할 수 있다면 맵리듀스 엔진의 정렬에 드는 부하를 줄일 수 있다.
limit 맵리듀스 엔진으로 전달할 문서의 개수를 설정한다.
finalize 대게 맵리듀스는 map과 reduce 함수로 이루어지는데, reduce함수의 결과를 다시 한번 가공해 최종결과를 만들고 싶다면 finalize 함수를 이용한다.
scope map과 reduce 함수 그리고 finalize 함수에서 접근할 수 있는 글로벌 변수를 정의한다. 보통은 map&reduce에는 인자로 전달된 값만 이용할 수 있으므로 모든 문서가 공유할 변수값등이 필요하면 scope를 이용하면 된다.
jsMode 몽고디비 맵리듀스 엔진은 map이나 reduce 함수로 전달되는 문서를 계속해서 몽고디비서버와 자바스크립트 엔진 사이에서 변환 작업을 한다. 이는 맵리듀스 처리과정에 상당한 성능 저하를 일으키는데, jsMode를 true로 설정하면 중간 과정의 데이터를 메모리에 모두 보관한다. 더 빠른 처리가 가능하지만 그만큼 메모리를 많이 먹는 작업이다. 처리하는 문서가 50만건을 초과하면 jsMode를 true로 설정할 수 없다. jsMode는 기본값이 false이다.
verbose 맵리듀스의 처리 결과에 단계별 처리 과정 및 소요 시간에 대한 정보를 포함할 것인지 결정한다. 기본값은 true이다.

 

밑은 간단하게 out에 적용할 수 있는 옵션에 대한 설명이다.

 

out 옵션 설명

> db.collection.mapReduce(mapFunction, reduceFunction,

  {out: {inline:1}}

);

맵리듀스 결과를 화면 혹은 클라이언트 명령의 결과로 전달한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: "collection"}

);

맵리듀스의 결과를 collection이라는 이름의 컬렉션으로 저장한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {replace:"collection"}}

);

collection이라는 컬렉션이 이미 존재한다면 기존 내용을 모두 삭제하고, 맵리듀스 명령의 결과를 해당 컬렉션에 저장한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {merge:"collection"}}

);

collection이라는 컬렉션이 이미 존재하고, 프라이머리 키가 동일한 문서가 존재한다면 맵리듀스의 결과로 그 문서를 덮어쓰기 한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {reduce:"collection"}}

);

collection이라는 컬렉션이 이미 존재하고, 프라이머리 키가 동일한 문서가 존재한다면 맵리듀스의 결과와 기존 문서를 이용해서 다시 맵리듀스를 실행해서 결과를 저장한다. 즉, reduce 타입의 out은 incremental 맵리듀스를 실행할 때 사용한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {replace:"collection", db: database}}

);

맵리듀스의 결과를 새로운 database라는 데이터베이스의 collection 컬렉션에 저장한다. 

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {replace:"collection", sharded:true}}

);

맵리듀스의 결과를 collection 컬렉션에 저장하고, 이때 collection 컬렉션을 _id 필드를 기준으로 샤딩한 다음 결과를 저장한다. 4.2 버전 기준으로 해당 옵션은 deprecated 됬다. 현재는 샤딩된 컬렉션으로 출력하기 위해서는 미리 먼저 샤딩된 컬렉션을 생성해야한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {merge:"collection", nonAtomic:true}}

);

맵리듀스의 결과를 collection 컬렉션에 저장할 때 원자 단위로 처리한다.

 

참고로 nonAtomic 옵션은 merge나 reduce 방식의 out에만 적용할 수 있는 옵션이며, 기본값은 false이다. 만약 nonAtomic 옵션을 false로 지정하면 맵리듀스 처리결과를 컬렉션에 저장할 때, 데이터베이스 레벨의 잠금을 걸게된다. nonAtomic이 true라면 물론 데이터베이스 잠금이 걸리지만, 적절히 잠금을 해제(Yield)했다가 다시 잠금을 걸면서 처리한다. 해당 옵션은 상황에 따라 적용하면 될듯하다. 만약 잠금이 걸려있다면 다른 커넥션에서 조회 변경 등을 할 수 없다.

 

Map-Reduce And Sharded Collection

 

-Sharded Collection as Input

맵리듀스 작업의 입력으로 샤딩된 컬렉션을 이용할 때, 몽고디비서버는 각 샤드에 맵리듀스 작업을 자동으로 병렬 전송한다. 입력으로 샤딩된 컬렉션을 이용하기 위해 별도의 옵션은 필요하지 않고 몽고디비서버는 모든 작업이 끝날 때까지 기다린다.

 

-Sharded Collection as output

sharded 옵션이 true라면, 몽고디비서버는 _id 필드 값 샤딩키로 사용하여 맵리듀스 결과 출력 컬렉션을 샤딩한다.

 

샤딩된 컬렉션으로 출력을 하려면 아래와 같은 조건을 지켜줘야한다.

 

  1. 맵리듀스 결과로 출력할 컬렉션이 존재하지 않는다면, 먼저 샤딩된 컬렉션을 생성해야한다. 4.2버전 기준의 몽고디비는 맵리듀스 옵션으로 새로운 샤딩된 컬렉션을 생성하는 것이 deprecated됬고, sharded 옵션 사용도 deprecated됬다. 즉, 샤딩된 컬렉션을 출력 컬렉션으로 사용하고 싶다면 맵리듀스 작업 이전에 먼저 샤딩된 컬렉션을 생성해야한다. 물론 현재 샤딩된 컬레션이 존재하지 않으면, _id 필드값을 기준으로 샤딩된 컬렉션을 생성하긴 하지만 공식적으로 추천하지 않는 방법이다.
  2. 4.2버전 몽고디비서버부터 이미 존재하는 샤딩된 컬렉션을 replacement하는 방식은 deprecated됬다.
  3. 버전 4.0부터 출력 컬렉션이 이미 있지만 샤딩되지 않은 경우 map-reduce가 실패한다.
  4. 새롭게 생성된 샤딩된 컬렉션 또는 비어있는 샤딩된 컬렉션의 경우 MongoDB는 맵리듀스 작업의 첫 단계 결과를 사용하여 샤드에 분산 된 초기 청크를 만듭니다.
  5. 몽고디비서버는 출력 후처리를 모든 샤드에 병렬로 수행한다.

 

Map-Recuce Concurrency

맵리듀스 작업에는 많은 작업들이 있는데, 각 작업마다 동시성을 위한 락이 걸린다. 각 단계에 대한 락은 아래와 같다.

 

  • The read phase takes a read lock. It yields every 100 documents.
  • The insert into the temporary collection takes a write lock for a single write.
  • If the output collection does not exist, the creation of the output collection takes a write lock.
  • If the output collection exists, then the output actions (i.e. merge, replace, reduce) take a write lock. This write lock is global, and blocks all operations on the mongod instance.
  • 읽기 단계는 읽기 잠금을 수행한다. 100개 문서마다 잠금을 해제한다.
  • 임시 콜렉션에 삽입하면 단일 쓰기에 대한 쓰기 잠금이 사용된다.
  • 출력 콜렉션이 존재하지 않으면 출력 콜렉션 작성에 쓰기 잠금이 사용됩니다. 출
  • 력 콜렉션이 존재하면 출력 조치 (즉, 병합, 대체, 리듀스)가 쓰기 잠금을 수행합니다. 이 쓰기 잠금은 전역 적이며 mongod 인스턴스의 모든 작업을 차단합니다.

위 설명은 간단히 전체 문서를 한번에 읽고 작업하는 것이 아니라 일정 크기의 데이터를 잘라서 맵리듀스 작업을 한다라고 생각하면 된다. 작업을 잘라서 수행하기 때문에 일부 문서들을 map하여 reduce한 다음에 결과를 임시 저장소에 보관했다가 나중에 다시 reduce가 수행될때 동일한 key를 가진 중간 결과가 있으면 같이 모아 reduce를 수행한다.

 

최종 출력에서 merge나 reduce 작업은 문서의 양의 따라 상당히 많은 시간이 걸릴 수 있다. 즉, 오랜 시간동안 락이 걸리면 안되기 때문에 nonAtomic 옵션을 true로 주어서 잠금을 해제하고 잠금을 다시 걸고 하는 방식으로 작업을 해줘야한다. 즉, merge&reduce 출력 옵션은 nonAtomic 옵션을 true로 하길 권장한다.

 

Map-Reduce 예제

db.orders.insert({
...      _id: ObjectId("50a8240b927d5d8b5891743c"),
...      cust_id: "abc123",
...      ord_date: new Date("Oct 04, 2012"),
...      status: 'A',
...      price: 25,
...      items: [ { sku: "mmm", qty: 5, price: 2.5 },
...               { sku: "nnn", qty: 5, price: 2.5 } ]
... });

 

위 문서를 삽입한다. 위 문서를 삽입한 후에 소비자별 총액을 구하는 맵리듀스 연산을 해보도록 할 것이다.

> var mapFunction1 = function() {
...                        emit(this.cust_id, this.price);
...                    };
> var reduceFunction1 = function(keyCustId, valuesPrices) {
...                           return Array.sum(valuesPrices);
...                       };
> db.orders.mapReduce(
...                      mapFunction1,
...                      reduceFunction1,
...                      { out: "map_reduce_example" }
...                    )
{
    "result" : "map_reduce_example",
    "timeMillis" : 76,
    "counts" : {
        "input" : 1,
        "emit" : 1,
        "reduce" : 0,
        "output" : 1
    },
    "ok" : 1
}
 
->result
 
> db.map_reduce_example.find()
{ "_id" : "abc123", "value" : 25 }

 

위는 소비자별 금액 총액을 구하는 맵리듀스 연산이고 결과를 map_reduce_example이라는 새로운 컬렉션에 저장하는 예제이다. 우선 각 단계별로 사용법을 살펴보자.

 

-Map

mapFunction1이라는 변수에 함수를 정의한다. emit이라는 함수는 특정 키값별로 값을 그룹핑하는 함수이다. 여기서 this 연산자는 map 작업에 사용될 문서를 참조하는 키워드이다. 즉, cust_id 별로 price를 배열로 그룹핑하고 있다.

 

-Reduce

reduceFunction1이라는 변수에 함수를 정의한다. 이 함수는 그룹핑된 문서들의 값을 조작하는 함수이다. 즉, 결과값을 가공하는 단계라고 보면 된다. 인자의 keyCustId와 valuesPrices는 Map 단계에서 그룹핑된 키와 값의 배열을 인자로 받고 있다.

 

-output

db.orders.mapReduce 명령으로 우리가 정의한 Map함수와 Reduce함수를 전달하여 작업을 수행한 후 map_reduce_example이라는 컬렉션을 새로 생성하여 결과값을 저장한다.

 

문서하나로는 부족하니 3개의 문서를 더 삽입하여 다시 실행해보자.

> db.orders.insertMany([{
...      cust_id: "abc123",
...      ord_date: new Date("Oct 04, 2012"),
...      status: 'A',
...      price: 40,
...      items: [ { sku: "mmm", qty: 5, price: 2.5 },
...               { sku: "nnn", qty: 5, price: 2.5 } ]
... },
... {
...      cust_id: "abc123",
...      ord_date: new Date("Oct 04, 2012"),
...      status: 'A',
...      price: 25,
...      items: [ { sku: "mmm", qty: 5, price: 2.5 },
...               { sku: "nnn", qty: 5, price: 2.5 } ]
... },
... {
...      cust_id: "abc123",
...      ord_date: new Date("Oct 04, 2012"),
...      status: 'A',
...      price: 10,
...      items: [ { sku: "mmm", qty: 5, price: 2.5 },
...               { sku: "nnn", qty: 5, price: 2.5 } ]
... }]);
 
> var mapFunction1 = function() {emit(this.cust_id, this.price)};
> var reduceFunction1 = function(keyCustId, valuesPrices) {return Array.sum(valuesPrices);};
> db.orders.mapReduce(mapFunction1,reduceFunction1,{ out: "map_reduce_example"});
{
    "result" : "map_reduce_example",
    "timeMillis" : 47,
    "counts" : {
        "input" : 4,
        "emit" : 4,
        "reduce" : 1,
        "output" : 1
    },
    "ok" : 1
}
> db.map_reduce_example.find()
{ "_id" : "abc123", "value" : 100 }

 

4개의 문서의 price를 모두 더하고 있다. 다음은 replace 출력이다. 기존 맵리듀스 결과를 저장하는 컬렉션에 더미 데이터를 하나 삽입하고 맵리듀스를 다시 실행시켰다.

> db.map_reduce_example.insert({description:"dummy data"});
WriteResult({ "nInserted" : 1 })
> db.map_reduce_example.find()
{ "_id" : "abc123", "value" : 100 }
{ "_id" : ObjectId("5d859b0b9ca9d5ca23a3447e"), "description" : "dummy data" }
> var mapFunction1 = function() {                        emit(this.cust_id, this.price);                    };
> var reduceFunction1 = function(keyCustId, valuesPrices) {                           return Array.sum(valuesPrices);                       };
> db.orders.mapReduce(mapFunction1,reduceFunction1,{out:{replace:"map_reduce_example"}});
{
    "result" : "map_reduce_example",
    "timeMillis" : 44,
    "counts" : {
        "input" : 4,
        "emit" : 4,
        "reduce" : 1,
        "output" : 1
    },
    "ok" : 1
}
> db.map_reduce_example.find()
{ "_id" : "abc123", "value" : 100 }

 

기존 컬렉션의 데이터를 모두 지우고 새로운 맵리듀스 결과를 삽입하였다. 다음은 merge 출력이다. orders 컬렉션에 하나의 문서를 추가했고 map_reduce_example에 역시 더미 데이터를 하나 삽입하였다.

> db.map_reduce_example.insert({description:"dummy data"});
WriteResult({ "nInserted" : 1 })
> db.orders.insertOne({cust_id:"abc123",price:20});
{
    "acknowledged" : true,
    "insertedId" : ObjectId("5d859bdd9ca9d5ca23a34480")
}
> var mapFunction1 = function() {                        emit(this.cust_id, this.price);                    };
> var reduceFunction1 = function(keyCustId, valuesPrices) {                           return Array.sum(valuesPrices);                       };
> db.orders.mapReduce(mapFunction1,reduceFunction1,{out:{merge:"map_reduce_example"}});
{
    "result" : "map_reduce_example",
    "timeMillis" : 47,
    "counts" : {
        "input" : 5,
        "emit" : 5,
        "reduce" : 1,
        "output" : 2
    },
    "ok" : 1
}
> db.map_reduce_example.find()
{ "_id" : "abc123", "value" : 120 }
{ "_id" : ObjectId("5d859bbf9ca9d5ca23a3447f"), "description" : "dummy data" }

 

기존의 더미 데이터는 그대로 남아있고 새로운 결과를 기존 문서에 병합하였다. 마지막은 reduce 출력이다. orders 컬렉션에 새로운 문서하나를 삽입하였다.

> db.orders.insertOne({cust_id:"abc123",price:30});
{
    "acknowledged" : true,
    "insertedId" : ObjectId("5d859c8c9ca9d5ca23a34481")
}
> var mapFunction1 = function() {                        emit(this.cust_id, this.price);                    };
> var reduceFunction1 = function(keyCustId, valuesPrices) {                           return Array.sum(valuesPrices);                       };
> db.orders.mapReduce(mapFunction1,reduceFunction1,{out:{reduce:"map_reduce_example"}});
{
    "result" : "map_reduce_example",
    "timeMillis" : 39,
    "counts" : {
        "input" : 6,
        "emit" : 6,
        "reduce" : 1,
        "output" : 2
    },
    "ok" : 1
}
> db.map_reduce_example.find()
{ "_id" : "abc123", "value" : 270 }
{ "_id" : ObjectId("5d859bbf9ca9d5ca23a3447f"), "description" : "dummy data" }

 

내가 생각했던 결과와는 조금 다르다. 내가 생각했던 결과를 실제 결과를 보고 다시 생각하니 당연히 내가 생각한 결과가 나올 수가없다. 내가 생각한 결과는 기존에 120이 합산되어있고 price가 30인 문서를 새로 삽입했으니 150이 되겠지 했는데, 결과는 당연히 아니다 맵리듀스결과는 150으로 나오고 기존 120에 150이 합해져 270이라는 결과가 나오는 것이다. 당연하다..맵리듀스 결과 컬렉션에 문서단위로 들어간게 아니니..당연히 120에 150이 더해진다. 보통 incremental 맵리듀스는 날짜등을 기준으로 증분시키면서 사용하면 될듯하다. 역시 바보였다 나는..

 

다음은 finalize를 통해 맵리듀스 결과를 다시 한번 가공하는 예제이다.

> var mapFunction2 = function() {
...                        for (var idx = 0; idx < this.items.length; idx++) {
...                            var key = this.items[idx].sku;
...                            var value = {
...                                          count: 1,
...                                          qty: this.items[idx].qty
...                                        };
...                            emit(key, value);
...                        }
...                     };
> var reduceFunction2 = function(keySKU, countObjVals) {
...                      reducedVal = { count: 0, qty: 0 };
... 
...                      for (var idx = 0; idx < countObjVals.length; idx++) {
...                          reducedVal.count += countObjVals[idx].count;
...                          reducedVal.qty += countObjVals[idx].qty;
...                      }
... 
...                      return reducedVal;
...                   };
> var finalizeFunction2 = function (key, reducedVal) {
... 
...                        reducedVal.avg = reducedVal.qty/reducedVal.count;
... 
...                        return reducedVal;
... 
...                     };
> db.orders.mapReduce( mapFunction2,
...                      reduceFunction2,
...                      {
...                        out: { merge: "map_reduce_example" },
...                        query: { ord_date:
...                                   { $gt: new Date('01/01/2012') }
...                               },
...                        finalize: finalizeFunction2
...                      }
...                    )
{
    "result" : "map_reduce_example",
    "timeMillis" : 47,
    "counts" : {
        "input" : 4,
        "emit" : 8,
        "reduce" : 2,
        "output" : 4
    },
    "ok" : 1
}
> db.map_reduce_example.find()
{ "_id" : "abc123", "value" : 270 }
{ "_id" : ObjectId("5d859bbf9ca9d5ca23a3447f"), "description" : "dummy data" }
{ "_id" : "mmm", "value" : { "count" : 4, "qty" : 20, "avg" : 5 } }
{ "_id" : "nnn", "value" : { "count" : 4, "qty" : 20, "avg" : 5 } }

 

-input

입력값을 전체 문서 대상이 아닌 ord_date 값이 2012년1월1일 이후인 문서를 입력값으로 받고 있다.

 

-Map

item 배열을 하나씩 순회하면서 값을 채워주고 있다. 여기서 value는 하나의 문서로 들어가고 있다. 처음에 설명했듯이 key나 value에는 스칼라 값뿐만 아니라 문서도 들어갈 수 있다.

 

-Reduce

하나의 key 값에 대해 값을 누적시키고 있다. 각 배열의 요소에는 count가 1과 각자의 qty를 가지고 있고, 해당 값들을 누적시키기 위해서 reducedVal 변수를 초기화했다.

 

-finalize

맵리듀스의 결과를 키/값으로 받고 해당 값들을 가공하고 있다. 여기서는 평균 수량을 구하기 위한 연산을 수행하고 있고 인자로 들어온 reducedVal에 새로운 필드를 생성하고 평균값을 넣었다.

 

-output

map_reduce_example 컬렉션으로 결과를 merge한다.

 

Incremental MapReduce

증분 맵리듀스에 대한 간단한 예제이다. 각 map/reduce/finalize에 대한 설명은 하지 않는다.

db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
 
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

 

위 문서들을 삽입해준다.

> db.sessions.insertMany([db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } ),
... db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } ),
... db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } ),
... db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } ),
... db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } ),
... db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } ),
... db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } ),
... db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } )]);
{
    "acknowledged" : true,
    "insertedIds" : [
        ObjectId("5d85a1f29ca9d5ca23a3448a"),
        ObjectId("5d85a1f29ca9d5ca23a3448b"),
        ObjectId("5d85a1f29ca9d5ca23a3448c"),
        ObjectId("5d85a1f29ca9d5ca23a3448d"),
        ObjectId("5d85a1f29ca9d5ca23a3448e"),
        ObjectId("5d85a1f29ca9d5ca23a3448f"),
        ObjectId("5d85a1f29ca9d5ca23a34490"),
        ObjectId("5d85a1f29ca9d5ca23a34491")
    ]
}
> var mapFunction = function() {
...                       var key = this.userid;
...                       var value = {
...                                     userid: this.userid,
...                                     total_time: this.length,
...                                     count: 1,
...                                     avg_time: 0
...                                    };
... 
...                       emit( key, value );
...                   };
> var reduceFunction = function(key, values) {
... 
...                         var reducedObject = {
...                                               userid: key,
...                                               total_time: 0,
...                                               count:0,
...                                               avg_time:0
...                                             };
... 
...                         values.forEach( function(value) {
...                                               reducedObject.total_time += value.total_time;
...                                               reducedObject.count += value.count;
...                                         }
...                                       );
...                         return reducedObject;
...                      };
> var finalizeFunction = function (key, reducedValue) {
... 
...                           if (reducedValue.count > 0)
...                               reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
... 
...                           return reducedValue;
...                        };
> db.sessions.mapReduce( mapFunction,
...                        reduceFunction,
...                        {
...                          out: "session_stat",
...                          finalize: finalizeFunction
...                        }
...                      )
{
    "result" : "session_stat",
    "timeMillis" : 44,
    "counts" : {
        "input" : 16,
        "emit" : 16,
        "reduce" : 5,
        "output" : 5
    },
    "ok" : 1
}
> db.session_stat.find()
{ "_id" : null, "value" : { "userid" : null, "total_time" : NaN, "count" : 8, "avg_time" : NaN } }
{ "_id" : "a", "value" : { "userid" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } }
{ "_id" : "b", "value" : { "userid" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "userid" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } }
{ "_id" : "d", "value" : { "userid" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } }

 

첫 맵리듀스의 수행 결과이다. 유저 아이디당 총 세션시간과 접속 횟수, 평균 세션 시간을 넣고 있다. 추가적으로 문서를 추가한다.

db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } ),
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } ),
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } ),
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } )

 

맵리듀스를 2011년11월5일 이후에 새로 들어온 문서를 기준으로 결과를 내고, 이전의 맵리듀스 결과에 증분(누적)시킨다.

> db.sessions.insertMany([db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } ),
... db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } ),
... db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } ),
... db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } )]);
{
    "acknowledged" : true,
    "insertedIds" : [
        ObjectId("5d85a29d9ca9d5ca23a34496"),
        ObjectId("5d85a29d9ca9d5ca23a34497"),
        ObjectId("5d85a29d9ca9d5ca23a34498"),
        ObjectId("5d85a29d9ca9d5ca23a34499")
    ]
}
> var mapFunction = function() {
...                       var key = this.userid;
...                       var value = {
...                                     userid: this.userid,
...                                     total_time: this.length,
...                                     count: 1,
...                                     avg_time: 0
...                                    };
... 
...                       emit( key, value );
...                   };
> var reduceFunction = function(key, values) {
... 
...                         var reducedObject = {
...                                               userid: key,
...                                               total_time: 0,
...                                               count:0,
...                                               avg_time:0
...                                             };
... 
...                         values.forEach( function(value) {
...                                               reducedObject.total_time += value.total_time;
...                                               reducedObject.count += value.count;
...                                         }
...                                       );
...                         return reducedObject;
...                      };
> var finalizeFunction = function (key, reducedValue) {
... 
...                           if (reducedValue.count > 0)
...                               reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
... 
...                           return reducedValue;
...                        };
> db.sessions.mapReduce( mapFunction,
...                        reduceFunction,
...                        {
...                          query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
...                          out: { reduce: "session_stat" },
...                          finalize: finalizeFunction
...                        }
...                      );
{
    "result" : "session_stat",
    "timeMillis" : 43,
    "counts" : {
        "input" : 4,
        "emit" : 4,
        "reduce" : 0,
        "output" : 5
    },
    "ok" : 1
}
> db.session_stat.find()
{ "_id" : null, "value" : { "userid" : null, "total_time" : NaN, "count" : 8, "avg_time" : NaN } }
{ "_id" : "a", "value" : { "userid" : "a", "total_time" : 300, "count" : 3, "avg_time" : 100 } }
{ "_id" : "b", "value" : { "userid" : "b", "total_time" : 345, "count" : 3, "avg_time" : 115 } }
{ "_id" : "c", "value" : { "userid" : "c", "total_time" : 375, "count" : 3, "avg_time" : 125 } }
{ "_id" : "d", "value" : { "userid" : "d", "total_time" : 165, "count" : 3, "avg_time" : 55 } }

 

결과가 누적된 것을 볼수 있다. 이런식으로 날짜를 기준으로 증분 맵리듀스를 사용하면 가장 활용도가 좋을 듯 싶다. 즉, 시점을 기준으로 하자는 것이다.

 

마지막으로 맵리듀스를 사용하면서 꼭 알아야할 내용이다.

 

  • Map 함수에서 호출하는 emit() 함수의 두번째 인자와 Reduce 함수의 리턴 값은 같은 포맷이어야 한다.
  • Reduce 함수의 연산 작업은 멱등(Idempotent)이어야 한다. 즉, Reduce 함수 작업이 멱등하지 않다면 정확한 결과가 나오지 않을 수 있다.

 

여기까지 간단하게 몽고디비의 맵리듀스를 다루어보았다. 사실 영어로된 레퍼런스가 해석이 애매모호한 부분도 많고 아직 다루어보지 않은 맵리듀스 관련 내용도 있다. 해당 내용들을 보완해 추후에 다루어볼 것이다. 이번 포스팅은 맵리듀스가 뭐고 간단히 어떻게 사용해볼 수 있냐를 다루어봤다.



출처: https://coding-start.tistory.com/293?category=815805 [코딩스타트]