Личный сайт Алексея Григорьева

Map/Reduce

MapReduce — модель распределённых вычислений, используемая для параллельных вычислений над большими наборами данных в компьютерных кластерах. [wikipedia]

MapReduce

Работа алгоритма MapReduce состоит из трех шагов: Map, Group и Reduce.

Операция Map реализуется пользователем, и преобразует входную пару {ключ: значение} в набор промежуточных пар. Название эта операция получила от одноименной функции высшего порядка Map для обработки списков, однако имеет ряд различий — в отличие от классической функции возвращающей единственное значение, данная функция возвращает список пар. Назовем эту функцию Mapper, чтобы избежать путаницы между этими двумя функциями. Так же Mapper играет роль фильтра — если для данной пары никаких промежуточных значений возвращать не нужно, функция возвращается пустой список.

Операция группирования выполняется внутри алгоритма MapReduce и пользователем не задается. На этом шаге происходит объединение всех значений для одного и того же ключа и результатом является пара {ключ: список значений}. Например, добавление всех промежуточных значений в коллекцию Multimap из Google Guava может выполнить подобную группировку.

Заключительный этап работы алгоритма выполняется функцией Reduce, так же заданной пользователем. Она принимает на вход пару {ключ: список значений}, и сворачивает список значений в одно, так же, как это делает классическая функция Reduce (folding, accumulate и т.п.). Так же есть возможность реализовать шаг Reduce так, чтобы он не отличался от классической функции Reduce — в этом случае заботу о добавлении ключа к результату свертки берет на себя алгоритм.

Результатом работы алгоритма MapReduce являются пары {ключ: свернутое значение}. Все эти операции легко параллелятся на кластерах из любого числа машин, так как не имеют состояния и не зависят друг от друга.

Пример

В качестве примера рассмотрим функцию подсчета слов в книге. Алгоритм принимает книгу, которая состоит из пар {номер главы: список слов из главы}. Функция Mapper применяется для каждой пары входных данных и возвращает список пар {слово_i: 1} (1 в данном случае — количество упоминаний слова на данном этапе).

(define (word-count-mapper key values)
    (map (lambda (word) (cons word 1)) values) 
)

При группировке все значения для каждого ключа попадают в одну и ту же группу. Например, ({слово1: (1 1 1)} {слово2: (1 1)} … {словоn: (1))}.

На последнем этапе функция Reduce получает список единичек и складывает их. Т.е. выполняет

(foldr + 0 (cdr pair))

Для всех промежуточных пар. Результатом работы MapReduce является список пар {слово: количество упоминаний}.

Реализация

Реализуем часть Map. Функция должна принимать заданную пользователем функцию mapper и применять ее для всех пар входящей последовательности. Т.к. mapper каждый раз возвращает список, необходимо «выравнивать» результат с помощью функции flat:

(define (*map mapper sequence)
    (map (lambda (pair) (mapper (car pair) (cdr pair))) sequence)
)
 
(define (flat sequence)
    (foldr append null sequence)
)
 
(define (flat-map mapper sequence)
    (flat (*map mapper sequence))
)

Далее, реализуем группировку. Список, полученный после предыдущего этапа, содержит пары {ключ: значение}. Объединим все пары с одинаковыми ключами в группы (({ключ1: значение11}, {ключ1: значение12}, …), …, ({ключn: значениеn1}, …)}

Функция add-to-bucket добавляет пару в существующий список, просматривая, есть ли уже группа с данным ключом в списке. Если есть, то пара добавляется в существующую группу, иначе создается новая группа, и пара добавляется туда.

(define (add-to-bucket buckets pair)
    (cond
        ((null? buckets) 
            (list (list pair)))
        ((eq? (caaar buckets) (car pair)) 
            (cons (cons pair (car buckets)) (cdr buckets)))
        (else
            (cons (car buckets) (add-to-bucket (cdr buckets) pair)))
    )
)

Функция sort-into-buckets принимает список пар, и формирует из них группы

(define (sort-into-buckers seq)
    (define (buckets-inner buckets tail)
        (if (null? tail)
            buckets
            (buckets-inner (add-to-bucket buckets (car tail)) (cdr tail))
        )
    )
 
    (buckets-inner null seq)
)

Этап Reduce является заключительным этапом. Функция groupreduce принимает сгруппированный список пар-значений, делает свертку всех значений для каждой группы, и возвращает пару {ключ: свернутое значение}

(define (groupreduce reducer start-value buckets)   
    (define (reduce-bucket bucket)
        (cons (caar bucket)
              (foldr reducer start-value (map cdr bucket)))
    )
 
    (map reduce-bucket buckets)
)

Далее, объединим все эти функции в функции mapreduce :

(define (mapreduce mapper reducer start-value sequence)
    (groupreduce reducer start-value (sort-into-buckers (flat-map mapper sequence)))
)

Использование:

(define lorem-ipsum '(lorem ipsum dolor sit amet sed
    dolor elit lorem ipsum lorem sed vel
    ipsum sit amet dolor sed purus))
 
(define lorem-file (cons 'lorem-ipsum lorem-ipsum))
 
(check-equal?
    (mapreduce word-count-mapper + 0 (list lorem-file))
    '((lorem . 3) (ipsum . 3) (dolor . 3) (sit . 2) (amet . 2) 
        (sed . 3) (elit . 1) (vel . 1) (purus . 1)))

Исходный код
Другие реализации: раз два

Источники и дополнительный материал:

http://ru.wikipedia.org/wiki/MapReduce
Berkley CS 61A Lecture 34: Mapreduce I Mapreduce II

Google Code University, MapReduce

http://habrahabr.ru/post/103467/
http://habrahabr.ru/post/103490/
MapReduce Patterns, Algorithms, and Use Cases
MapReduce for Idiots
http://samolisov.blogspot.com/2010/04/reduce-java.html

Simplest kind of MapReduce in Python

Далее:

Суббота, 21 Июл 2012 в 13:01. Вы можете следить за комментариями к этой статье через RSS 2.0.