2012年2月4日土曜日

MongoDB で MapReduce 試してみた。

MongoDB で MapReduce を試してみる。
年齢のみを項目としてもつ age コレクションを作成し
  1. ヒストグラム
  2. 各種統計量(件数, 合計, 平均, 分散, 標準偏差)
を算出してみようと思う。
2. の統計量の算出では2段 MapReduce を試してみる。

参考:

0. データの投入

今回の検証用データ(年齢180件)を作成したのでそれを投入。
$ mongoimport -d mydb -c age --type csv --headerline --file mydb.age.csv 
connected to: 127.0.0.1
imported 181 objects

$ mongo
MongoDB shell version: 2.0.2
connecting to: test

> use mydb
switched to db mydb

> db.age.count()
180

1. ヒストグラムの作成
年齢ごとの人数をカウントしてみる。

cnt.age.1.js
// 結果出力用コレクション名
var colName = 'age.cnt';

var map = function() {
  emit(
    this.age,   // 集計用キー
    1           // 集計値
  );  
}

var reduce = function(key, values) {
  var count = 0;

  values.forEach(function(value) {
    count += value;
  }); 

  return count;
}


// MongoDB
mongo = new Mongo('localhost');
mydb = mongo.getDB('mydb');

// out で出力用コレクションを指定
var res = mydb.age.mapReduce(map, reduce, {out: colName});

// 結果をコンソールに出力
shellPrint(res);

mongo コマンドの引数としてファイル名を指定する事で MapReduce 処理を実施可能。
$ mongo cnt.age.1.js 
MongoDB shell version: 2.0.2
connecting to: test
{
 "result" : "age.cnt",
 "timeMillis" : 24,
 "counts" : {
  "input" : 180,
  "emit" : 180,
  "reduce" : 20,
  "output" : 23
 },
 "ok" : 1,
}

// 集計値を確認
$ mongo
MongoDB shell version: 2.0.2
connecting to: test
> use mydb
switched to db mydb
> db.age.cnt.find()
{ "_id" : 18, "value" : 2 }
{ "_id" : 19, "value" : 9 }
{ "_id" : 20, "value" : 9 }
{ "_id" : 21, "value" : 12 }
{ "_id" : 22, "value" : 12 }
{ "_id" : 23, "value" : 16 }
{ "_id" : 24, "value" : 15 }
{ "_id" : 25, "value" : 14 }
{ "_id" : 26, "value" : 15 }
{ "_id" : 27, "value" : 11 }
{ "_id" : 28, "value" : 11 }
{ "_id" : 29, "value" : 9 }
{ "_id" : 30, "value" : 10 }
{ "_id" : 31, "value" : 6 }
{ "_id" : 32, "value" : 5 }
{ "_id" : 33, "value" : 1 }
{ "_id" : 34, "value" : 5 }
{ "_id" : 35, "value" : 2 }
{ "_id" : 36, "value" : 7 }
{ "_id" : 37, "value" : 4 }
has more

ちょっと集計が細かすぎるので10刻みでの集計を実施してみる。

cnt.age.2.js
var colName = 'age.cnt2';

var map = function() {
  // 集計範囲に対応するキーを生成
  var age = this.age - 1;
  var range = ((age - (age % 10)) / 10) * 10; 
  var label = String(range + 1) + "~" + String(range + 10);

  emit(
    label,
    {count: 1}  // json 形式で value を指定する事も可
  );  
}

// 同一キーに対するデータが1件しか存在しない場合は
// この関数は呼ばれないっぽい。知らないとハマる。。。
var reduce = function(key, values) {
  var count = 0;

  values.forEach(function(value) {
    count += value.count;
  }); 

  return count;
}


mongo = new Mongo('localhost');
mydb = mongo.getDB('mydb');

var res = mydb.age.mapReduce(map, reduce, {out: colName});

shellPrint(res);

ソースコードのコメントにも書いたが同一キーに対するレコード数が1件の場合は reduce 処理が実施されていないっぽい。 実施してみると結果がおかしい。
$ mongo cnt.age.2.js 
MongoDB shell version: 2.0.2
connecting to: test
{
 "result" : "age.cnt2",
 "timeMillis" : 23,
 "counts" : {
  "input" : 180,
  "emit" : 180,
  "reduce" : 3,
  "output" : 4
 },
 "ok" : 1,
}

$ mongo
MongoDB shell version: 2.0.2
connecting to: test
> use mydb
switched to db mydb
> db.age.cnt2.find()
{ "_id" : "11~20", "value" : 20 }               // 11~20歳の人が20人
{ "_id" : "21~30", "value" : 125 }              // 21~30歳の人が125人
{ "_id" : "31~40", "value" : 34 }               // 31~40歳の人が34人
{ "_id" : "41~50", "value" : { "count" : 1 } }  // value が json になってしまっている!!
>

修正してみた。

cnt.age.3.js
var colName = 'age.cnt3';

var map = function() {
  var age = this.age - 1;
  var range = ((age - (age % 10)) / 10) * 10; 
  var label = String(range + 1) + "~" + String(range + 10);

  emit(
    label,
    {count: 1}
  );  
}

var reduce = function(key, values) {
  var count = 0;

  values.forEach(function(value) {
    count += value.count;
  }); 

  return {count: count};  // 全ての結果を json で記述するという非本質的な解決策w
}


mongo = new Mongo('localhost');
mydb = mongo.getDB('mydb');

var res = mydb.age.mapReduce(map, reduce, {out: colName});

shellPrint(res);

気をとりなおして再集計。
$ mongo cnt.age.3.js 
MongoDB shell version: 2.0.2
connecting to: test
{
 "result" : "age.cnt3",
 "timeMillis" : 24,
 "counts" : {
  "input" : 180,
  "emit" : 180,
  "reduce" : 3,
  "output" : 4
 },
 "ok" : 1,
}

$ mongo
MongoDB shell version: 2.0.2
connecting to: test
> use mydb
switched to db mydb
> db.age.cnt3.find()
{ "_id" : "11~20", "value" : { "count" : 20 } }
{ "_id" : "21~30", "value" : { "count" : 125 } }
{ "_id" : "31~40", "value" : { "count" : 34 } }
{ "_id" : "41~50", "value" : { "count" : 1 } }
>
全部 json なら問題なしw


2. 統計量の算出
まずは平均値を計算してみる。 全ドキュメントを走査する事により件数および合計値が得られるので、それらを加工する為の finalize 関数を定義して平均値を導出する。 処理の順番としては
  1. map
  2. reduce
  3. finalize
となる。
また、mapReduce 実施の際に scope で map/reduce/finalize から参照可能な値を渡してみた(下記の例では datetime)。

avg.age.js
var colName = 'age.avg'

var map = function() {
  emit(
    datetime,   // scope で指定された変数を参照 *同一のキーが指定された事になる
    {age: this.age, count: 1}
  );  
}

var reduce = function(key, values) {
  var sum = 0;
  var count = 0;

  values.forEach(function(value) {
    sum += value.age;
    count += value.count;
  }); 

  return {sumAge: sum, count: count};
}

// finalize で最終的な出力を作成出来る
var finalize = function(key, value) {
  return {
          sumAge: value.sumAge,
          count: value.count,
          average: value.sumAge / value.count
  };
}


var mongo = new Mongo('localhost');
var mydb = mongo.getDB('mydb');

// scope で map/reduce/finalize から参照できる値を定義可能
var res = mydb.age.mapReduce(
            map,
            reduce,
            {
              finalize: finalize,
              out: {merge: colName},
              scope: {datetime: new Date()}
            }
          );

shellPrint(res);
結果
$ mongo avg.age.js 
MongoDB shell version: 2.0.2
connecting to: test
{
 "result" : "age.avg",
 "timeMillis" : 26,
 "counts" : {
  "input" : 180,
  "emit" : 180,
  "reduce" : 1,
  "output" : 1
 },
 "ok" : 1,
}

$ mongo
MongoDB shell version: 2.0.2
connecting to: test
> use mydb
switched to db mydb
> db.age.avg.find()
{ "_id" : ISODate("2012-02-03T22:27:19.480Z"), "value" : { "sumAge" : 4753, "count" : 180, "average" : 26.405555555555555 } }

これから上記に加えて分散・標準偏差という平均値を用いて算出される統計量を求めてみる。 1段目の MapReduce 処理の結果として前述の mydb.age.avg コレクションを用いて2段目の MapReduce を実施するという流れが自然ではあるが 今回は MapReduce の処理結果をメモリ上に保持する仕組みを用いて中間コレクションを作成せずに2段 MapReduce を実現してみたいと思う。 具体的には MapReduce 実施時の out 引数に "inline" を指定する事で結果をメモリ上に保持する事が可能となる。

stat.age.js
var colName = 'age.stat'

var mapSum = function() {
  emit(
    1,
    {age: this.age, count: 1}
  );
}

var reduceSum = function(key, values) {
  var sum = 0;
  var count = 0;

  values.forEach(function(value) {
    sum += value.age;
    count += value.count;
  });

  return {sumAge: sum, count: count};
}

var map = function() {
  emit(
    datetime,
    this.age
  );
}

var reduce = function(key, values) {
  var sum = 0;
  values.forEach(function(value) {
    sum += (value - mean) * (value - mean);
  }); 

  return sum;
}

var finalize = function(key, value) {
  return {
          count: count,
          sum: sum,
          mean: mean,
          variance: value / count,
          sd: Math.sqrt(value / count)
  };
}

var mongo = new Mongo('localhost');
var mydb = mongo.getDB('mydb');

// get results
var res = mydb.age.mapReduce(
            mapSum,
            reduceSum,
            {out: {inline: 1}} // 結果を戻り値(res)に格納
          );

// res.results はキーの数に一致するサイズの配列
// *今回は1キーのみ
var sumAge  = res.results[0].value.sumAge;
var count   = res.results[0].value.count;
var mean = sumAge / count;

// get variance & sd
var res = mydb.age.mapReduce(
            map,
            reduce,
            {
              finalize: finalize,
              out: {merge: colName},
              scope: {
                datetime: new Date(),
                sum: sumAge,
                count: count,
                mean: mean
              }
            }
          );

shellPrint(res);

実施してみた。
$ mongo stat.age.js 
MongoDB shell version: 2.0.2
connecting to: test
{
 "result" : "age.stat",
 "timeMillis" : 6,
 "counts" : {
  "input" : 180,
  "emit" : 180,
  "reduce" : 1,
  "output" : 1
 },
 "ok" : 1,
}

$ mongo
MongoDB shell version: 2.0.2
connecting to: test
> use mydb
switched to db mydb
> db.age.stat.find()
{ "_id" : ISODate("2012-02-03T22:39:12.608Z"), "value" : { "count" : 180, "sum" : 4753, "mean" : 26.405555555555555, "variance" : 26.885524691358064, "sd" : 5.18512533034237 } }

0 件のコメント:

コメントを投稿