exponenta event banner

Анализ больших данных в MATLAB с помощью MapReduce

В этом примере показано, как использовать mapreduce функция обработки большого объема файловых данных. Алгоритм MapReduce является основой многих современных приложений для обработки больших данных. Этот пример работает на одном компьютере, но код может быть увеличен для использования Hadoop ®.

Во всем этом примере набор данных представляет собой набор записей Американской статистической ассоциации для внутренних авиаперевозок США в период с 1987 по 2008 год. Если вы уже экспериментировали с «большими данными», возможно, вы уже знакомы с этим набором данных. Небольшое подмножество этого набора данных включено в MATLAB ® для выполнения этого и других примеров.

Введение в хранилища данных

Создание хранилища данных позволяет получить доступ к коллекции данных блочным способом. Хранилище данных может обрабатывать произвольно большие объемы данных, и данные могут быть даже распределены по нескольким файлам. Можно создать хранилище данных для многих типов файлов, включая коллекцию табличных текстовых файлов (показано здесь), базу данных SQL (требуется Toolbox™ базы данных) или распределенную файловую систему Hadoop ® (HDFS™).

Создание хранилища данных для коллекции табличных текстовых файлов и предварительный просмотр содержимого.

ds = tabularTextDatastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
    FlightNum    TailNum    ActualElapsedTime    CRSElapsedTime    AirTime    ArrDelay
    _________    _______    _________________    ______________    _______    ________

      1503       {'NA'}             53                 57          {'NA'}         8   
      1550       {'NA'}             63                 56          {'NA'}         8   
      1589       {'NA'}             83                 82          {'NA'}        21   
      1655       {'NA'}             59                 58          {'NA'}        13   
      1702       {'NA'}             77                 72          {'NA'}         4   
      1729       {'NA'}             61                 65          {'NA'}        59   
      1763       {'NA'}             84                 79          {'NA'}         3   
      1800       {'NA'}            155                143          {'NA'}        11   

Хранилище данных автоматически анализирует входные данные и делает наилучшее предположение относительно типа данных в каждом столбце. В этом случае используйте 'TreatAsMissing' аргумент пары «имя-значение» для правильной замены отсутствующих значений. Для числовых переменных (например, 'AirTime'), tabularTextDatastore заменяет каждый экземпляр 'NA' с NaN значение, которое является арифметическим представлением IEEE для Not-a-Number.

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s';
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s';
dsPreview = preview(ds);
dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
    AirTime    TaxiIn    TailNum    CancellationCode
    _______    ______    _______    ________________

      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     

Поиск интересующих строк

Объекты хранилища данных содержат внутренний указатель для отслеживания того, какой блок данных read функция возвращает следующий. Используйте hasdata и read для перехода ко всему набору данных и фильтрации набора данных только по интересующим строкам. При этом рядами интересов являются рейсы на United Airlines («UA»), вылетающие из Бостона («BOS»).

subset = [];

while hasdata(ds)
    t = read(ds);
    t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :);
    subset = vertcat(subset, t);
end

subset(1:10,[9,10,15:17])
ans=10×5 table
    UniqueCarrier    FlightNum    ArrDelay    DepDelay    Origin 
    _____________    _________    ________    ________    _______

       {'UA'}           121          -9           0       {'BOS'}
       {'UA'}          1021          -9          -1       {'BOS'}
       {'UA'}           519          15           8       {'BOS'}
       {'UA'}           354           9           8       {'BOS'}
       {'UA'}           701         -17           0       {'BOS'}
       {'UA'}           673          -9          -1       {'BOS'}
       {'UA'}            91          -3           2       {'BOS'}
       {'UA'}           335          18           4       {'BOS'}
       {'UA'}          1429           1          -2       {'BOS'}
       {'UA'}            53          52          13       {'BOS'}

Введение в mapreduce

MapReduce - это алгоритмический метод «разделения и преодоления» проблем больших данных. В MATLAB, mapreduce требуется три входных аргумента:

  1. Хранилище данных для чтения данных из

  2. Функция «mapper», которая получает подмножество данных для работы. Вывод функции отображения является частичным вычислением. mapreduce вызывает функцию отображения один раз для каждого блока в хранилище данных, при этом каждый вызов работает независимо.

  3. Функция «редуктора», которая задается совокупными выходами функции отображения. Функция редуктора завершает вычисление, начатое функцией отображения, и выводит окончательный ответ.

Это является чрезмерным упрощением в некоторой степени, поскольку выходной сигнал вызова функции отображения может быть перетасован и объединен интересными способами перед передачей функции редуктора. Это будет рассмотрено позже в этом примере.

Использовать mapreduce для выполнения вычисления

Простое использование mapreduce это найти наибольшее время полета во всем наборе данных авиакомпании. Для этого:

  1. Функция «mapper» вычисляет максимум каждого блока из хранилища данных.

  2. Затем функция «редуктора» вычисляет максимальное значение среди всех максимумов, вычисленных вызовами функции отображения.

Сначала следует сбросить хранилище данных и отфильтровать переменные по одному столбцу, представляющему интерес.

reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};

Запишите функцию отображения, maxTimeMapper.m. Требуется три входных аргумента:

  1. Входные данные, которые представляют собой таблицу, полученную путем применения read в хранилище данных.

  2. Совокупность конфигурационной и контекстной информации, info. Это можно игнорировать в большинстве случаев, как здесь.

  3. Промежуточный объект хранения данных, который записывает результаты вычислений из функции отображения. Используйте add добавление пар Ключ/Значение к этому промежуточному выходу. В этом примере имя ключа ('MaxElapsedTime') является произвольным.

Сохраните следующую функцию отображения (maxTimeMapper.m) в текущей папке.

function maxTimeMapper(data, ~, intermKVStore)
  maxElapsedTime = max(data{:,:});
  add(intermKVStore, "MaxElapsedTime", maxElapsedTime)
end

Затем запишите функцию редуктора. Он также принимает три входных аргумента:

  1. Набор входных «клавиш». Ключи будут обсуждаться ниже, но они могут быть проигнорированы в некоторых простых проблемах, как они здесь.

  2. Промежуточный объект ввода данных, mapreduce переходит к функции редуктора. Эти данные имеют форму пар Ключ/Значение и используются hasnext и getnext для итерации значений для каждого ключа.

  3. Конечный объект хранения выходных данных. Используйте add и addmulti функции для непосредственного добавления пар Ключ/Значение к выходу.

Сохраните следующую функцию редуктора (maxTimeReducer.m) в текущей папке.

function maxTimeReducer(~, intermValsIter, outKVStore)
  maxElapsedTime = -Inf;
  while(hasnext(intermValsIter))
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
  end
  add(outKVStore, "MaxElapsedTime", maxElapsedTime);
end

После записи и сохранения функций мэппера и редуктора в текущей папке можно вызвать mapreduce с использованием хранилища данных, функции отображения и функции редуктора. При наличии панели инструментов Parallel Computing Toolbox (PCT) MATLAB автоматически запускает пул и выполняет параллелизацию. Используйте readall для отображения результатов алгоритма MapReduce.

result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%
readall(result)
ans=1×2 table
           Key             Value  
    __________________    ________

    {'MaxElapsedTime'}    {[1650]}

Использование ключей в mapreduce

Использование ключей является важной и мощной особенностью mapreduce. Каждый вызов функции отображения добавляет промежуточные результаты к одному или нескольким именованным «сегментам», называемым клавишами. Количество вызовов функции отображения по mapreduce соответствует количеству блоков в хранилище данных.

Если функция отображения добавляет значения к нескольким клавишам, это приводит к нескольким вызовам функции редуктора, причем каждый вызов работает только с промежуточными значениями одной клавиши. mapreduce функция автоматически управляет этим перемещением данных между отображением и сокращает фазы алгоритма.

Такая гибкость полезна во многих контекстах. В приведенном ниже примере ключи используются относительно очевидным образом в иллюстративных целях.

Вычисление групповых метрик с помощью mapreduce

Поведение функции отображения в этом приложении более сложное. Для каждого авианосца, найденного во входных данных, используйте add для добавления вектора значений. Этот вектор является подсчетом количества рейсов для данного перевозчика в каждый день за 21 + лет данных. Код несущей является ключом для этого вектора значений. Это гарантирует, что все данные для каждого оператора связи будут сгруппированы вместе, когдаmapreduce передает его функции редуктора.

Сохраните следующую функцию отображения (countFlightsMapper.m) в текущей папке.

function countFlightsMapper(data, ~, intermKVStore)
  dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

  for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
  end
end

Функция редуктора менее сложна. Он просто итерирует промежуточные значения и складывает векторы вместе. По завершении она выводит значения в этом агрегированном векторе. Обратите внимание, что функция редуктора не нуждается в сортировке или проверке intermediateKeysIn значения; каждый вызов функции редуктора по mapreduce передает значения только для одного авиаперевозчика.

Сохраните следующую функцию редуктора (countFlightsReducer.m) в текущей папке.

function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  dayArray = zeros(daysSinceEpoch, 1);

  while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
  end
  add(outKVStore, intermKeysIn, dayArray);
end

Сбросьте хранилище данных и выберите интересующие переменные. После записи и сохранения функций мэппера и редуктора в текущей папке можно вызвать mapreduce с использованием хранилища данных, функции отображения и функции редуктора.

reset(ds);
ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'};
result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  10%
Map 100% Reduce  21%
Map 100% Reduce  31%
Map 100% Reduce  41%
Map 100% Reduce  52%
Map 100% Reduce  62%
Map 100% Reduce  72%
Map 100% Reduce  83%
Map 100% Reduce  93%
Map 100% Reduce 100%
result = readall(result);

Если этот пример был выполнен только с набором данных образца, загрузите результаты mapreduce алгоритм выполняется по всему набору данных.

load airlineResults

Визуализация результатов

Используя только 7 ведущих операторов связи, сглаживайте данные, чтобы удалить эффекты поездок в выходные дни. Иначе это загромождало бы визуализацию.

lines = result.Value;
lines = horzcat(lines{:});
[~,sortOrder] = sort(sum(lines), 'descend');
lines = lines(:,sortOrder(1:7));
result = result(sortOrder(1:7),:);

lines(lines==0) = nan;
lines = smoothdata(lines,'gaussian');

Постройте график данных.

figure('Position',[1 1 800 600]);
plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines,'LineWidth',2)
title ('Domestic airline flights per day per carrier')
xlabel('Date')
ylabel('Flights per day')
legend(result.Key, 'Location', 'Best')

Figure contains an axes. The axes with title Domestic airline flights per day per carrier contains 7 objects of type line. These objects represent DL, WN, AA, US, UA, NW, CO.

Сюжет показывает появление Southwest Airlines (WN) в этот временной промежуток.

Больше знаний

В этом примере царапается только поверхность того, что возможно с помощью mapreduce. См. документацию для mapreduce для получения дополнительной информации, включая информацию об использовании с Hadoop и MATLAB ® Parallel Server™.

Локальные функции

Здесь перечислены локальные функции, которые mapreduce относится к данным.

function maxTimeMapper(data, ~, intermKVStore)
  maxElapsedTime = max(data{:,:});
  add(intermKVStore, "MaxElapsedTime", maxElapsedTime)
end
%-----------------------------------------------------------------------
function maxTimeReducer(~, intermValsIter, outKVStore)
  maxElapsedTime = -Inf;
  while(hasnext(intermValsIter))
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
  end
  add(outKVStore, "MaxElapsedTime", maxElapsedTime);
end
%-----------------------------------------------------------------------
function countFlightsMapper(data, ~, intermKVStore)
  dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

  for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
  end
end
%-----------------------------------------------------------------------
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  dayArray = zeros(daysSinceEpoch, 1);

  while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
  end
  add(outKVStore, intermKeysIn, dayArray);
end
%-----------------------------------------------------------------------

См. также

|

Связанные темы