Анализ Больших данных в MATLAB с помощью MapReduce

В этом примере показано, как использовать mapreduce функция для обработки большого объема файловых данных. Алгоритм MapReduce является основой многих современных приложений «больших данных». Этот пример работает на одном компьютере, но код может масштабироваться до использования Hadoop ®.

На протяжении всего этого примера набор данных представляет собой набор записей Американской статистической ассоциации для рейсов внутренних авиакомпаний США в период с 1987 по 2008 год. Если вы экспериментировали с «большими данными» раньше, вы, возможно, уже знакомы с этим набором данных. Небольшая подмножество этого набора данных включена в MATLAB ®, чтобы позволить вам запустить этот и другие примеры.

Введение в хранилища данных

Создание datastore позволяет вам получить доступ к набору данных на основе блоков. Datastore может обрабатывать произвольно большие объемы данных, и данные могут даже быть распределены по нескольким файлам. Можно создать datastore для многих типов файлов, включая набор табличных текстовых файлов (демонстрируется здесь), базу данных SQL (Toolbox™ базы данных) или распределенную файловую систему Hadoop ® (HDFS™).

Создайте datastore для набора табличных текстовых файлов и предворительно просмотрите содержимое.

ds = tabularTextDatastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
    FlightNum    TailNum    ActualElapsedTime    CRSElapsedTime    AirTime    ArrDelay
    _________    _______    _________________    ______________    _______    ________

      1503       {'NA'}             53                 57          {'NA'}         8   
      1550       {'NA'}             63                 56          {'NA'}         8   
      1589       {'NA'}             83                 82          {'NA'}        21   
      1655       {'NA'}             59                 58          {'NA'}        13   
      1702       {'NA'}             77                 72          {'NA'}         4   
      1729       {'NA'}             61                 65          {'NA'}        59   
      1763       {'NA'}             84                 79          {'NA'}         3   
      1800       {'NA'}            155                143          {'NA'}        11   

Datastore автоматически анализирует входные данные и делает лучшее предположение относительно типа данных в каждом столбце. В этом случае используйте 'TreatAsMissing' аргумент пары "имя-значение" для правильной замены отсутствующих значений. Для числовых переменных (таких как 'AirTime'), tabularTextDatastore заменяет каждый образец 'NA' с NaN значение, которое является арифметическим представлением IEEE для не число.

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s';
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s';
dsPreview = preview(ds);
dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
    AirTime    TaxiIn    TailNum    CancellationCode
    _______    ______    _______    ________________

      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     

Скан для строк , представляющих интерес

Объекты Datastore содержат внутренний указатель, чтобы отслеживать, какой блок данных read функция возвращается следующим. Используйте hasdata и read функций, чтобы пройти через весь набор данных и фильтровать набор данных только к строкам , представляющим интерес. В этом случае строками , представляющими интерес являются рейсы на United Airlines («UA»), вылетающие из Бостона («BOS»).

subset = [];

while hasdata(ds)
    t = read(ds);
    t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :);
    subset = vertcat(subset, t);
end

subset(1:10,[9,10,15:17])
ans=10×5 table
    UniqueCarrier    FlightNum    ArrDelay    DepDelay    Origin 
    _____________    _________    ________    ________    _______

       {'UA'}           121          -9           0       {'BOS'}
       {'UA'}          1021          -9          -1       {'BOS'}
       {'UA'}           519          15           8       {'BOS'}
       {'UA'}           354           9           8       {'BOS'}
       {'UA'}           701         -17           0       {'BOS'}
       {'UA'}           673          -9          -1       {'BOS'}
       {'UA'}            91          -3           2       {'BOS'}
       {'UA'}           335          18           4       {'BOS'}
       {'UA'}          1429           1          -2       {'BOS'}
       {'UA'}            53          52          13       {'BOS'}

Введение в mapreduce

MapReduce является алгоритмическим методом, чтобы «разделить и завоевать» проблемы с большими данными. В MATLAB, mapreduce требует трех входных параметров:

  1. Datastore, для чтения данных из

  2. Функция «mapper», которой задается подмножество данных для работы. Выходы функции map являются частичным вычислением. mapreduce вызывает функцию mapper один раз для каждого блока в datastore, при этом каждый вызов работает независимо.

  3. Функция «reducer», которая задается совокупными выходами от функции mapper. Функция редуктора завершает расчет, начатую функцией mapper, и выводит окончательный ответ.

Это в некоторой степени избыточное упрощение, поскольку выход вызова функции mapper может быть перемещен и объединен интересными способами перед передачей в функцию reducer. Это будет рассмотрено позже в этом примере.

Использование mapreduce для выполнения расчетов

Простое использование mapreduce - найти самое длинное время рейса во всем наборе данных авиакомпании. Для этого:

  1. Функция «mapper» вычисляет максимум каждого блока из datastore.

  2. Функция «reducer» затем вычисляет максимальное значение среди всех максимумов, вычисленных вызовами функции mapper.

Сначала сбросьте datastore и фильтруйте переменные в один интересующий столбец.

reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};

Напишите функцию mapper, maxTimeMapper.m. Для этого требуется три входных параметров:

  1. Входные данные, которая является таблицей, полученной путем применения read функцию в datastore.

  2. Набор строения и контекстной информации, info. Это может быть проигнорировано в большинстве случаев, как и здесь.

  3. Промежуточный объект хранения данных, который записывает результаты вычислений из функции mapper. Используйте add функция для добавления пар Ключ/Значение к этому промежуточному выходу. В этом примере имя ключа ('MaxElapsedTime') произвольно.

Сохраните следующую функцию mapper (maxTimeMapper.m) в текущей папке.

function maxTimeMapper(data, ~, intermKVStore)
  maxElapsedTime = max(data{:,:});
  add(intermKVStore, "MaxElapsedTime", maxElapsedTime)
end

Далее запишите функцию редуктора. Это также принимает три входных параметров:

  1. Набор входа «ключей». Ключи будут обсуждаться далее ниже, но их можно игнорировать в некоторых простых задачах, так как они здесь.

  2. Объект входа промежуточных данных, который mapreduce переходит к функции редуктора. Эти данные находятся в форме пар Ключ/Значение, и вы используете hasnext и getnext функции для итерации значений для каждой клавиши.

  3. Конечный выходной объект хранения данных. Используйте add и addmulti функции для непосредственного добавления пар Ключ/Значение к выходу.

Сохраните следующую функцию редуктора (maxTimeReducer.m) в текущей папке.

function maxTimeReducer(~, intermValsIter, outKVStore)
  maxElapsedTime = -Inf;
  while(hasnext(intermValsIter))
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
  end
  add(outKVStore, "MaxElapsedTime", maxElapsedTime);
end

Если функции mapper и reducer записаны и сохранены в текущей папке, можно вызвать mapreduce использование datastore, функции mapper и функции редуктора. Если у вас есть Parallel Computing Toolbox (PCT), MATLAB автоматически запустит пул и распараллелит выполнение. Используйте readall функция для отображения результатов алгоритма MapReduce.

result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%
readall(result)
ans=1×2 table
           Key             Value  
    __________________    ________

    {'MaxElapsedTime'}    {[1650]}

Использование ключей в mapreduce

Использование ключей является важной и мощной функцией mapreduce. Каждый вызов функции mapper добавляет промежуточные результаты в один или несколько именованных «блоков», называемых ключами. Количество вызовов функции mapper по mapreduce соответствует количеству блоков в datastore.

Если функция mapper добавляет значения нескольким клавишам, это приводит к нескольким вызовам функции редуктора, при этом каждый вызов работает только с промежуточными значениями одной клавиши. The mapreduce функция автоматически управляет этим перемещением данных между картой и уменьшением фаз алгоритма.

Эта гибкость полезна во многих контекстах. Приведенный ниже пример использует ключи относительно очевидным способом в иллюстративных целях.

Вычисление групповых метрик с mapreduce

Поведение функции mapper в этом приложении сложнее. Для каждого летного аппарата, найденного в входных данных, используйте add функция для добавления вектора значений. Этот вектор является количеством рейсов для этого перевозчика в каждый день за 21 + год данных. Код поставщика услуг является ключом для этого вектора значений. Это гарантирует, что все данные для каждой системы связи будут сгруппированы вместе при mapreduce передает его в функцию редуктора.

Сохраните следующую функцию mapper (countFlightsMapper.m) в текущей папке.

function countFlightsMapper(data, ~, intermKVStore)
  dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

  for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
  end
end

Функция редуктора менее комплексна. Это просто итерация над промежуточными значениями и добавление векторов вместе. По завершении он выводит значения в этом совокупном векторе. Обратите внимание, что функции редуктора не нужно сортировать или исследовать intermediateKeysIn значения; каждый вызов функции редуктора по mapreduce передает значения только для одного авиаперевозчика.

Сохраните следующую функцию редуктора (countFlightsReducer.m) в текущей папке.

function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  dayArray = zeros(daysSinceEpoch, 1);

  while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
  end
  add(outKVStore, intermKeysIn, dayArray);
end

Сбросьте datastore и выберите интересующие вас переменные. Если функции mapper и reducer записаны и сохранены в текущей папке, можно вызвать mapreduce использование datastore, функции mapper и функции редуктора.

reset(ds);
ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'};
result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  10%
Map 100% Reduce  21%
Map 100% Reduce  31%
Map 100% Reduce  41%
Map 100% Reduce  52%
Map 100% Reduce  62%
Map 100% Reduce  72%
Map 100% Reduce  83%
Map 100% Reduce  93%
Map 100% Reduce 100%
result = readall(result);

В случае, если этот пример был запущен только с набором выборочных данных, загрузите результаты mapreduce алгоритм запускается на целом наборе данных.

load airlineResults

Визуализация результатов

Используя только 7 лучших перевозчиков, сглаживайте данные, чтобы удалить эффекты путешествий выходного дня. Это в противном случае загромождает визуализацию.

lines = result.Value;
lines = horzcat(lines{:});
[~,sortOrder] = sort(sum(lines), 'descend');
lines = lines(:,sortOrder(1:7));
result = result(sortOrder(1:7),:);

lines(lines==0) = nan;
lines = smoothdata(lines,'gaussian');

Постройте график данных.

figure('Position',[1 1 800 600]);
plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines,'LineWidth',2)
title ('Domestic airline flights per day per carrier')
xlabel('Date')
ylabel('Flights per day')
legend(result.Key, 'Location', 'Best')

Figure contains an axes. The axes with title Domestic airline flights per day per carrier contains 7 objects of type line. These objects represent DL, WN, AA, US, UA, NW, CO.

График показывает появление Southwest Airlines (WN) в течение этого временного периода.

Узнайте больше

Этот пример только царапает поверхность того, что возможно с mapreduce. См. документацию для mapreduce для получения дополнительной информации, включая информацию об использовании с Hadoop и MATLAB ® Parallel Server™.

Локальные функции

Здесь перечислены локальные функции, которые mapreduce применяется к данным.

function maxTimeMapper(data, ~, intermKVStore)
  maxElapsedTime = max(data{:,:});
  add(intermKVStore, "MaxElapsedTime", maxElapsedTime)
end
%-----------------------------------------------------------------------
function maxTimeReducer(~, intermValsIter, outKVStore)
  maxElapsedTime = -Inf;
  while(hasnext(intermValsIter))
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
  end
  add(outKVStore, "MaxElapsedTime", maxElapsedTime);
end
%-----------------------------------------------------------------------
function countFlightsMapper(data, ~, intermKVStore)
  dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

  for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
  end
end
%-----------------------------------------------------------------------
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  dayArray = zeros(daysSinceEpoch, 1);

  while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
  end
  add(outKVStore, intermKeysIn, dayArray);
end
%-----------------------------------------------------------------------

См. также

|

Похожие темы