Анализ больших данных в MATLAB Используя MapReduce

В этом примере показано, как использовать datastore и mapreduce функции, чтобы обработать большую сумму основанных на файле данных. Алгоритм MapReduce является оплотом многих современных "больших данных" приложения. Этот пример работает с одиночным компьютером, но код может масштабировать, чтобы использовать Hadoop®.

В этом примере набор данных является набором записей от американской Статистической Ассоциации для США внутренние полеты между 1 987 и 2008. Если вы экспериментировали с "большими данными" прежде, можно уже быть знакомы с этим набором данных. Небольшое подмножество этого набора данных включено с MATLAB®, чтобы позволить вам запускать это и другие примеры.

Введение в datastore

Создание datastore позволяет вам получать доступ к набору данных основанным на блоке способом. datastore может обработать произвольно большие объемы данных, и данные могут даже быть распространены через несколько файлов. Можно создать datastore для многих типов файлов, включая набор табличных текстовых файлов (продемонстрированный здесь), база данных SQL (требуемый Database Toolbox™) или Hadoop® Distributed File System (HDFS™).

Создайте datastore для набора табличных текстовых файлов и предварительно просмотрите содержимое.

ds = datastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
    FlightNum    TailNum    ActualElapsedTime    CRSElapsedTime    AirTime    ArrDelay
    _________    _______    _________________    ______________    _______    ________

      1503       {'NA'}             53                 57          {'NA'}         8   
      1550       {'NA'}             63                 56          {'NA'}         8   
      1589       {'NA'}             83                 82          {'NA'}        21   
      1655       {'NA'}             59                 58          {'NA'}        13   
      1702       {'NA'}             77                 72          {'NA'}         4   
      1729       {'NA'}             61                 65          {'NA'}        59   
      1763       {'NA'}             84                 79          {'NA'}         3   
      1800       {'NA'}            155                143          {'NA'}        11   

Datastore автоматически анализирует входные данные и высказывает лучшее предположение относительно типа данных в каждом столбце. В этом случае используйте 'TreatAsMissing' Аргумент пары "имя-значение", чтобы иметь datastore замените отсутствующие значения правильно. Для числовых переменных (таких как 'AirTime'datastore заменяет каждый экземпляр 'NA' с NaN значение, которое является представлением арифметики IEEE для Не числа.

ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s';
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s';
dsPreview = preview(ds);
dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
    AirTime    TaxiIn    TailNum    CancellationCode
    _______    ______    _______    ________________

      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     

Сканирование для строк, представляющих интерес

datastore объекты содержат внутренний указатель, чтобы отслеживать который блок данных read функция возвращается затем. Используйте hasdata и read функции, чтобы продвинуться через целый набор данных и отфильтровать набор данных только к строкам, представляющим интерес. В этом случае строки, представляющие интерес являются рейсами на United Airlines ("UA"), вылетающий от Бостона ("BOS").

subset = [];

while hasdata(ds)
    t = read(ds);
    t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :);
    subset = vertcat(subset, t);
end

subset(1:10,[9,10,15:17])
ans=10×5 table
    UniqueCarrier    FlightNum    ArrDelay    DepDelay    Origin 
    _____________    _________    ________    ________    _______

       {'UA'}           121          -9           0       {'BOS'}
       {'UA'}          1021          -9          -1       {'BOS'}
       {'UA'}           519          15           8       {'BOS'}
       {'UA'}           354           9           8       {'BOS'}
       {'UA'}           701         -17           0       {'BOS'}
       {'UA'}           673          -9          -1       {'BOS'}
       {'UA'}            91          -3           2       {'BOS'}
       {'UA'}           335          18           4       {'BOS'}
       {'UA'}          1429           1          -2       {'BOS'}
       {'UA'}            53          52          13       {'BOS'}

Введение в mapreduce

MapReduce является алгоритмическим методом, чтобы "разделить и завоевать" большие проблемы данных. В MATLAB, mapreduce требует трех входных параметров:

  1. datastore считывать данные из

  2. Функция "картопостроителя", которой дают подмножество данных, чтобы работать с. Выход функции карты является частичным вычислением. mapreduce вызывает функцию картопостроителя одно время для каждого блока в datastore, с каждым вызовом, действующим независимо.

  3. Функция "редуктора", которой дают совокупные выходные параметры от функции картопостроителя. Функция редуктора закончила расчет, начатый функцией картопостроителя, и выводит окончательный ответ.

Это - упрощение в некоторой степени, поскольку выход вызова функции картопостроителя может быть переставлен и объединен интересными способами прежде чем быть переданным функции редуктора. Это будет исследовано позже в этом примере.

Используйте mapreduce выполнять расчет

Простое использование mapreduce должен найти самое долгое время полета в целом наборе данных авиакомпании. Для этого:

  1. Функция "картопостроителя" вычисляет максимум каждого блока из datastore.

  2. Функция "редуктора" затем вычисляет максимальное значение среди всех максимумов, вычисленных вызовами функции картопостроителя.

Во-первых, сбросьте datastore и отфильтруйте переменные к одному столбцу интереса.

reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};

Запишите функцию картопостроителя, maxTimeMapper.m. Требуется три входных параметра:

  1. Входные данные, который является таблицей, полученной путем применения read функционируйте к datastore.

  2. Набор настройки и контекстной информации, info. Это может быть проигнорировано в большинстве случаев, как это здесь.

  3. Промежуточный объект хранения данных, который записывает результаты вычислений от функции картопостроителя. Используйте add функция, чтобы добавить Пары ключ/значение в этот промежуточный выход. В этом примере, имени ключа ('MaxElapsedTime') произвольно.

Сохраните следующую функцию картопостроителя (maxTimeMapper.m) в вашей текущей папке.

function maxTimeMapper(data, ~, intermKVStore)
  maxElapsedTime = max(data{:,:});
  add(intermKVStore, "MaxElapsedTime", maxElapsedTime)
end

Затем запишите функцию редуктора. Также требуется три входных параметра:

  1. Набор входа "ключи". Ключи будут обсуждены далее ниже, но они могут быть проигнорированы в некоторых простых проблемах, как они здесь.

  2. Промежуточный ввод данных возражает тому mapreduce передачи в функцию редуктора. Эти данные в форме Пар ключ/значение, и вы используете hasnext и getnext функции, чтобы выполнить итерации через значения для каждого ключа.

  3. Объект хранения данных окончательного результата. Используйте add и addmulti функции, чтобы непосредственно добавить Пары ключ/значение в выход.

Сохраните следующую функцию редуктора (maxTimeReducer.m) в вашей текущей папке.

function maxTimeReducer(~, intermValsIter, outKVStore)
  maxElapsedTime = -Inf;
  while(hasnext(intermValsIter))
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
  end
  add(outKVStore, "MaxElapsedTime", maxElapsedTime);
end

Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce использование datastore, функция картопостроителя и функция редуктора. Если у вас будет Parallel Computing Toolbox (PCT), MATLAB автоматически запустит пул и параллелизирует выполнение. Используйте readall функция, чтобы отобразить результаты алгоритма MapReduce.

result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%
readall(result)
ans=1×2 table
           Key             Value  
    __________________    ________

    {'MaxElapsedTime'}    {[1650]}

Использование ключей в mapreduce

Использование ключей является важной и мощной функцией mapreduce. Каждый вызов функции картопостроителя добавляет промежуточные результаты в один или несколько названного "блоками", названными ключами. Количество вызовов картопостроителя функционирует mapreduce соответствует количеству блоков в datastore.

Если функция картопостроителя добавляет значения в несколько ключей, это приводит ко множественным вызовам функции редуктора с каждым вызовом, работающим над промежуточными значениями только одного ключа. mapreduce функция автоматически справляется с этим перемещением данных между картой, и уменьшайте фазы алгоритма.

Эта гибкость полезна во многих контекстах. Пример ниже использования вводит относительно очевидный путь к иллюстративным целям.

Вычисление мудрых группой метрик с mapreduce

Поведение функции картопостроителя в этом приложении является более комплексным. Для каждого поставщика услуг рейса, найденного во входных данных, используйте add функция, чтобы добавить вектор значений. Этот вектор является количеством количества рейсов для того поставщика услуг в каждый день в 21 + годы данных. Код поставщика услуг является ключом для этого вектора значений. Это гарантирует, что все данные для каждого поставщика услуг будут группироваться когда mapreduce передачи это к функции редуктора.

Сохраните следующую функцию картопостроителя (countFlightsMapper.m) в вашей текущей папке.

function countFlightsMapper(data, ~, intermKVStore)
  dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

  for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
  end
end

Функция редуктора является менее комплексной. Это просто выполняет итерации по промежуточным значениям и добавляет векторы вместе. При завершении это выводит значения в этом совокупном векторе. Обратите внимание на то, что функция редуктора не должна отсортировать или исследовать intermediateKeysIn значения; каждый вызов редуктора функционирует mapreduce только передает значения для одного поставщика услуг авиакомпании.

Сохраните следующую функцию редуктора (countFlightsReducer.m) в вашей текущей папке.

function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  dayArray = zeros(daysSinceEpoch, 1);

  while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
  end
  add(outKVStore, intermKeysIn, dayArray);
end

Сбросьте datastore и выберите переменные интереса. Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce использование datastore, функция картопостроителя и функция редуктора.

reset(ds);
ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'};
result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  10%
Map 100% Reduce  21%
Map 100% Reduce  31%
Map 100% Reduce  41%
Map 100% Reduce  52%
Map 100% Reduce  62%
Map 100% Reduce  72%
Map 100% Reduce  83%
Map 100% Reduce  93%
Map 100% Reduce 100%
result = readall(result);

В случае, если этот пример был запущен только с набором выборочных данных, загрузите результаты mapreduce алгоритм работает на целом наборе данных.

load airlineResults

Визуализация результатов

Только Используя лучшие 7 поставщиков услуг, примените фильтр к данным, чтобы сгладить эффекты перемещения выходных дней. Это в противном случае создало бы помехи визуализации.

lines = result.Value;
lines = horzcat(lines{:});
[~,sortOrder] = sort(sum(lines), 'descend');
lines = lines(:,sortOrder(1:7));
result = result(sortOrder(1:7),:);

lines(lines==0) = nan;
for carrier=1:size(lines,2)
    lines(:,carrier) = filter(repmat(1/7, [7 1]), 1, lines(:,carrier));
end

Отобразите данные на графике.

figure('Position',[1 1 800 600]);
plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines)
title ('Domestic airline flights per day per carrier')
xlabel('Date')
ylabel('Flights per day (7-day moving average)')
legend(result.Key, 'Location', 'South')

График показывает появление Southwest Airlines (WN) в это время период.

Узнавание больше

Этот пример только царапает поверхность того, что возможно с mapreduce. См. документацию для mapreduce для получения дополнительной информации, включая информацию об использовании его с Hadoop и MATLAB® Parallel Server™.

Локальные функции

Перечисленный здесь локальные функции что mapreduce применяется к данным.

function maxTimeMapper(data, ~, intermKVStore)
  maxElapsedTime = max(data{:,:});
  add(intermKVStore, "MaxElapsedTime", maxElapsedTime)
end
%-----------------------------------------------------------------------
function maxTimeReducer(~, intermValsIter, outKVStore)
  maxElapsedTime = -Inf;
  while(hasnext(intermValsIter))
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
  end
  add(outKVStore, "MaxElapsedTime", maxElapsedTime);
end
%-----------------------------------------------------------------------
function countFlightsMapper(data, ~, intermKVStore)
  dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

  for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
  end
end
%-----------------------------------------------------------------------
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  dayArray = zeros(daysSinceEpoch, 1);

  while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
  end
  add(outKVStore, intermKeysIn, dayArray);
end
%-----------------------------------------------------------------------

Смотрите также

|

Похожие темы