Вычисление среднего по группам с использованием MapReduce

В этом примере показано, как вычислить среднее значение по группам в наборе данных с помощью mapreduce. Он демонстрирует, как выполнять расчеты на подгруппах данных.

Подготовка данных

Создайте datastore с помощью airlinesmall.csv набор данных. Этот 12-мегабайтный набор данных содержит 29 столбцов информации о рейсе для нескольких авиаперевозчиков, включая время прибытия и вылета. В этом примере выберите DayOfWeek и ArrDelay(задержка прибытия рейса) как интересующие переменные.

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = {'ArrDelay', 'DayOfWeek'};

datastore лечит 'NA' значения как отсутствующие и заменяет отсутствующие значения на NaN значения по умолчанию. Кроме того, SelectedVariableNames свойство позволяет работать только с выбранными интересующими вас переменными, которые можно проверить используя preview.

preview(ds)
ans=8×2 table
    ArrDelay    DayOfWeek
    ________    _________

        8           3    
        8           1    
       21           5    
       13           5    
        4           4    
       59           3    
        3           4    
       11           6    

Запуск MapReduce

The mapreduce функция требует функции map и функции reduce в качестве входов. mapper получает блоки данных и выводит промежуточные результаты. Редуктор считывает промежуточные результаты и дает конечный результат.

В этом примере mapper вычисляет количество и сумму задержек по дням недели в каждом блоке данных, а затем сохраняет результаты как промежуточные пары "ключ-значение". Ключи являются целыми числами (от 1 до 7), представляющими дни недели, и значения являются двухэлементными векторами, представляющими количество и сумму задержки каждого дня.

Отобразите файл функции map.

function meanArrivalDelayByDayMapper(data, ~, intermKVStore)
  % Data is an n-by-2 table: first column is the DayOfWeek and the second
  % is the ArrDelay. Remove missing values first.
  delays = data.ArrDelay;
  day = data.DayOfWeek;
  notNaN = ~isnan(delays);
  day = day(notNaN);
  delays = delays(notNaN);

  % find the unique days in this chunk
  [intermKeys,~,idx] = unique(day, 'stable');

  % group delays by idx and apply @grpstatsfun function to each group
  intermVals = accumarray(idx,delays,size(intermKeys),@countsum);
  addmulti(intermKVStore,intermKeys,intermVals);

  function out = countsum(x)
    n = length(x); % count
    s = sum(x); % mean
    out = {[n, s]};
  end
end

После фазы Map, mapreduce группирует промежуточные пары "ключ-значение" по уникальному ключу (в данном случае, день недели). Таким образом, каждый вызов редуктора работает по значениям, связанным с одним днем недели. Редуктор получает список промежуточного счета и суммы задержек за день, заданный входом ключом (intermKey) и суммирует значения в общее количество, n и общая сумма s. Затем редуктор вычисляет общее среднее и добавляет к выходу одну последнюю пару "ключ-значение". Эта пара "ключ-значение" представляет среднюю задержку прибытия рейса на один день недели.

Отобразите файл функции сокращения.

function meanArrivalDelayByDayReducer(intermKey, intermValIter, outKVStore)
  n = 0;
  s = 0;

  % get all sets of intermediate results
  while hasnext(intermValIter)
    intermValue = getnext(intermValIter);
    n = n + intermValue(1);
    s = s + intermValue(2);
  end

  % accumulate the sum and count
  mean = s/n;
  % add results to the output datastore
  add(outKVStore,intermKey,mean);
end

Использование mapreduce для применения карты и сокращения функций к datastore, ds.

meanDelayByDay = mapreduce(ds, @meanArrivalDelayByDayMapper, ...
                               @meanArrivalDelayByDayReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  14%
Map 100% Reduce  29%
Map 100% Reduce  43%
Map 100% Reduce  57%
Map 100% Reduce  71%
Map 100% Reduce  86%
Map 100% Reduce 100%

mapreduce возвращает datastore, meanDelayByDay, с файлами в текущей папке.

Считайте конечный результат из выхода datastore, meanDelayByDay.

result = readall(meanDelayByDay)
result=7×2 table
    Key      Value   
    ___    __________

     3     {[7.0038]}
     1     {[7.0833]}
     5     {[9.4193]}
     4     {[9.3185]}
     6     {[4.2095]}
     2     {[5.8569]}
     7     {[6.5241]}

Организуйте результаты

Целочисленные ключи (от 1 до 7) представляют дни недели. Чтобы организовать дополнительные результаты, преобразуйте ключи в категориальный массив, извлеките числовые значения из камер одного элемента и переименуйте имена переменных полученной таблицы.

result.Key = categorical(result.Key, 1:7, ...
               {'Mon','Tue','Wed','Thu','Fri','Sat','Sun'});
result.Value = cell2mat(result.Value);
result.Properties.VariableNames = {'DayOfWeek', 'MeanArrDelay'}
result=7×2 table
    DayOfWeek    MeanArrDelay
    _________    ____________

       Wed          7.0038   
       Mon          7.0833   
       Fri          9.4193   
       Thu          9.3185   
       Sat          4.2095   
       Tue          5.8569   
       Sun          6.5241   

Сортировка строк таблицы по средней задержке прибытия рейса. Это показывает, что суббота является лучшим днем недели для путешествий, в то время как пятница является худшей.

result = sortrows(result,'MeanArrDelay')
result=7×2 table
    DayOfWeek    MeanArrDelay
    _________    ____________

       Sat          4.2095   
       Tue          5.8569   
       Sun          6.5241   
       Wed          7.0038   
       Mon          7.0833   
       Thu          9.3185   
       Fri          9.4193   

Локальные функции

Вот карта и сокращение функций, которые mapreduce применяется к данным.

function meanArrivalDelayByDayMapper(data, ~, intermKVStore)
  % Data is an n-by-2 table: first column is the DayOfWeek and the second
  % is the ArrDelay. Remove missing values first.
  delays = data.ArrDelay;
  day = data.DayOfWeek;
  notNaN = ~isnan(delays);
  day = day(notNaN);
  delays = delays(notNaN);

  % find the unique days in this chunk
  [intermKeys,~,idx] = unique(day, 'stable');

  % group delays by idx and apply @grpstatsfun function to each group
  intermVals = accumarray(idx,delays,size(intermKeys),@countsum);
  addmulti(intermKVStore,intermKeys,intermVals);

  function out = countsum(x)
    n = length(x); % count
    s = sum(x); % mean
    out = {[n, s]};
  end
end
%---------------------------------------------------------------------------
function meanArrivalDelayByDayReducer(intermKey, intermValIter, outKVStore)
  n = 0;
  s = 0;

  % get all sets of intermediate results
  while hasnext(intermValIter)
    intermValue = getnext(intermValIter);
    n = n + intermValue(1);
    s = s + intermValue(2);
  end

  % accumulate the sum and count
  mean = s/n;
  % add results to the output datastore
  add(outKVStore,intermKey,mean);
end
%---------------------------------------------------------------------------

См. также

|

Похожие темы