Анализ больших данных в MATLAB Используя MapReduce

Этот пример показывает, как использовать datastore и функции mapreduce, чтобы обработать большую сумму основанных на файле данных. Алгоритм MapReduce является оплотом многих современных "больших данных" приложения. Этот пример работает с одиночным компьютером, но код может масштабировать, чтобы использовать Hadoop®.

Всюду по этому примеру набор данных является набором записей для США внутренние полеты между 1 987 и 2008. Если вы экспериментировали с "большими данными" прежде, можно уже быть знакомы с этим набором данных. Полный набор данных может быть загружен с http://stat-computing.org/dataexpo/2009/the-data.html. Небольшое подмножество набора данных также включено с MATLAB®, чтобы позволить вам запускать это и другие примеры, не загружая целый набор данных.

Введение в datastore

Создание datastore позволяет вам получать доступ к набору данных основанным на фрагменте способом. datastore может обработать произвольно большие объемы данных, и данные могут даже быть распространены через несколько файлов. Можно создать datastore для многих типов файлов, включая набор табличных текстовых файлов (продемонстрированный здесь), база данных SQL (База данных требуемый Toolbox™) или Hadoop® Distributed File System (HDFS™).

Создайте datastore для набора табличных текстовых файлов и предварительно просмотрите содержимое.

ds = datastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
    FlightNum    TailNum    ActualElapsedTime    CRSElapsedTime    AirTime    ArrDelay
    _________    _______    _________________    ______________    _______    ________

      1503        'NA'              53                 57           'NA'          8   
      1550        'NA'              63                 56           'NA'          8   
      1589        'NA'              83                 82           'NA'         21   
      1655        'NA'              59                 58           'NA'         13   
      1702        'NA'              77                 72           'NA'          4   
      1729        'NA'              61                 65           'NA'         59   
      1763        'NA'              84                 79           'NA'          3   
      1800        'NA'             155                143           'NA'         11   

Datastore автоматически анализирует входные данные и высказывает лучшее предположение относительно типа данных в каждом столбце. В этом случае используйте Аргумент в виде пар "имя-значение" 'TreatAsMissing', чтобы иметь замену datastore отсутствующие значения правильно. Для числовых переменных (таких как 'AirTime'), datastore заменяет каждый экземпляр 'NA' со значением NaN, которое является представлением арифметики IEEE для Не числа.

ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s';
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s';
dsPreview = preview(ds);
dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
    AirTime    TaxiIn    TailNum    CancellationCode
    _______    ______    _______    ________________

      NaN       NaN       'NA'            'NA'      
      NaN       NaN       'NA'            'NA'      
      NaN       NaN       'NA'            'NA'      
      NaN       NaN       'NA'            'NA'      
      NaN       NaN       'NA'            'NA'      
      NaN       NaN       'NA'            'NA'      
      NaN       NaN       'NA'            'NA'      
      NaN       NaN       'NA'            'NA'      

Сканирование для строк, представляющих интерес

Объекты datastore содержат внутренний указатель, чтобы отслеживать, из которого фрагмента данных функция read возвращается затем. Используйте hasdata и функции read, чтобы продвинуться через целый набор данных и отфильтровать набор данных к только строкам, представляющим интерес. В этом случае строки, представляющие интерес являются полетами на United Airlines ("UA"), вылетающий от Бостона ("BOS").

subset = [];

while hasdata(ds)
    t = read(ds);
    t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :);
    subset = vertcat(subset, t);
end

subset(1:10,[9,10,15:17])
ans=10×5 table
    UniqueCarrier    FlightNum    ArrDelay    DepDelay    Origin
    _____________    _________    ________    ________    ______

        'UA'            121          -9           0       'BOS' 
        'UA'           1021          -9          -1       'BOS' 
        'UA'            519          15           8       'BOS' 
        'UA'            354           9           8       'BOS' 
        'UA'            701         -17           0       'BOS' 
        'UA'            673          -9          -1       'BOS' 
        'UA'             91          -3           2       'BOS' 
        'UA'            335          18           4       'BOS' 
        'UA'           1429           1          -2       'BOS' 
        'UA'             53          52          13       'BOS' 

Введение в mapreduce

MapReduce является алгоритмическим методом, чтобы "разделить и завоевать" большие проблемы данных. В MATLAB mapreduce требует трех входных параметров:

  1. datastore, чтобы считать данные из

  2. Функция "картопостроителя", которой дают подмножество данных, чтобы работать с. Вывод функции карты является частичным вычислением. mapreduce вызывает функцию картопостроителя одно время для каждого фрагмента в datastore с каждым вызовом, действующим независимо.

  3. Функция "редуктора", которой дают совокупные выходные параметры от функции картопостроителя. Функция редуктора заканчивает вычисление, начатое функцией картопостроителя, и выводит окончательный ответ.

Это - упрощение в некоторой степени, поскольку вывод вызова функции картопостроителя может быть переставлен и объединен интересными способами прежде чем быть переданным функции редуктора. Это будет исследовано позже в этом примере.

Использование mapreduce, чтобы выполнить вычисление

Простое использование mapreduce должно найти самое долгое время полета в целом наборе данных авиакомпании. Для этого:

  1. Функция "картопостроителя" вычисляет максимум каждого фрагмента от datastore.

  2. Функция "редуктора" затем вычисляет максимальное значение среди всех максимумов, вычисленных вызовами функции картопостроителя.

Во-первых, сбросьте datastore и отфильтруйте переменные к одному столбцу интереса.

reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};

Запишите функцию картопостроителя, maxTimeMapper.m m. Требуется три входных параметра:

  1. Входные данные, который является таблицей, полученной путем применения функции read к datastore.

  2. Набор настройки и контекстной информации, info. Это может быть проигнорировано в большинстве случаев, как это здесь.

  3. Промежуточный объект хранения данных, который записывает результаты вычислений от функции картопостроителя. Используйте функцию add, чтобы добавить Пары ключ/значение к этому промежуточному выводу. В этом примере имя ключа ('MaxElapsedTime') произвольно.

Сохраните следующую функцию картопостроителя (maxTimeMapper.m) в вашей текущей папке.

type maxTimeMapper
function maxTimeMapper(data, ~, intermKVStore)
% Copyright 2014 The MathWorks, Inc.

maxElaspedTime = max(data{:,:});
add(intermKVStore, 'MaxElaspedTime',maxElaspedTime);
end

Затем, запишите функцию редуктора. Также требуется три входных параметра:

  1. Набор входного параметра "ключи". Ключи будут обсуждены далее ниже, но они могут быть проигнорированы в некоторых простых проблемах, как они здесь.

  2. Промежуточный ввод данных возражает, что mapreduce передает функции редуктора. Это, которое данные в форме Пар ключ/значение, и вы используете hasnext и функции getnext, чтобы выполнить итерации через значения для каждого ключа.

  3. Объект хранения данных окончательного результата. Используйте add и функции addmulti, чтобы непосредственно добавить Пары ключ/значение к выводу.

Сохраните следующую функцию редуктора (maxTimeReducer.m) в вашей текущей папке.

type maxTimeReducer
function maxTimeReducer(~, intermValsIter, outKVStore)
% Copyright 2014 The MathWorks, Inc.

maxElaspedTime = -inf;
while hasnext(intermValsIter)
    maxElaspedTime = max(maxElaspedTime, getnext(intermValsIter));
end
add(outKVStore, 'MaxElaspedTime', maxElaspedTime);
end

Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce с помощью datastore, функции картопостроителя и функции редуктора. Если у вас будет Parallel Computing Toolbox (PCT), MATLAB автоматически запустит пул и параллелизирует выполнение. Используйте функцию readall, чтобы отобразить результаты алгоритма MapReduce.

result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%
readall(result)
ans=1×2 table
          Key           Value 
    ________________    ______

    'MaxElaspedTime'    [1650]

Использование ключей в mapreduce

Использование ключей является важной и мощной функцией mapreduce. Каждый вызов функции картопостроителя добавляет промежуточные результаты к одному или нескольким названному "блоками", названными ключами. Количество вызовов функции картопостроителя mapreduce соответствует количеству фрагментов в datastore.

Если функция картопостроителя добавляет значения к нескольким ключам, это приводит ко множественным вызовам функции редуктора с каждым вызовом, работающим на промежуточных значениях только одного ключа. Функция mapreduce автоматически справляется с этим перемещением данных между картой, и уменьшите фазы алгоритма.

Эта гибкость полезна во многих контекстах. Пример ниже использования вводит относительно очевидный путь к иллюстративным целям.

Вычисление мудрых группой метрик с mapreduce

Поведение функции картопостроителя в этом приложении является более комплексным. Для каждого поставщика услуг полета, найденного во входных данных, используйте функцию add, чтобы добавить вектор значений. Этот вектор является количеством количества полетов для того поставщика услуг в каждый день в 21 + годы данных. Код поставщика услуг является ключом для этого вектора значений. Это гарантирует, что все данные для каждого поставщика услуг будут группироваться, когда mapreduce передаст его функции редуктора.

Сохраните следующую функцию картопостроителя (countFlightsMapper.m) в вашей текущей папке.

type countFlightsMapper
function countFlightsMapper(data, ~, intermKVStore)

% Copyright 2014 The MathWorks, Inc.

dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;

[airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
end
end

Функция редуктора является менее комплексной. Это просто выполняет итерации по промежуточным значениям и добавляет векторы вместе. При завершении это выводит значения в этом совокупном векторе. Обратите внимание на то, что функция редуктора не должна отсортировать или исследовать значения intermediateKeysIn; каждый вызов функции редуктора mapreduce только передает значения для одного поставщика услуг авиакомпании.

Сохраните следующую функцию редуктора (countFlightsReducer.m) в вашей текущей папке.

type countFlightsReducer
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
%countFlightsReducer Reducer function for mapreduce to count flights

% Copyright 2014 The MathWorks, Inc.

daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
dayArray = zeros(daysSinceEpoch, 1);

while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
end
add(outKVStore, intermKeysIn, dayArray);
end

Сбросьте datastore и выберите переменные интереса. Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce с помощью datastore, функции картопостроителя и функции редуктора.

reset(ds);
ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'};

result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  10%
Map 100% Reduce  21%
Map 100% Reduce  31%
Map 100% Reduce  41%
Map 100% Reduce  52%
Map 100% Reduce  62%
Map 100% Reduce  72%
Map 100% Reduce  83%
Map 100% Reduce  93%
Map 100% Reduce 100%
result = readall(result);

В случае, если этот пример был запущен с только демонстрационным набором данных, загрузка, результаты алгоритма mapreduce работают на целом наборе данных.

load airlineResults

Визуализация результатов

Используя только лучшие 7 поставщиков услуг, примените фильтр к данным, чтобы сгладить эффекты перемещения выходных дней. Это в противном случае создало бы помехи визуализации.

lines = result.Value;
lines = horzcat(lines{:});
[~,sortOrder] = sort(sum(lines), 'descend');
lines = lines(:,sortOrder(1:7));
result = result(sortOrder(1:7),:);

lines(lines==0) = nan;
for carrier=1:size(lines,2)
    lines(:,carrier) = filter(repmat(1/7, [7 1]), 1, lines(:,carrier));
end

Отобразите данные на графике.

figure('Position',[1 1 800 600]);
plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines)
title ('Domestic airline flights per day per carrier')
xlabel('Date')
ylabel('Flights per day (7-day moving average)')
legend(result.Key, 'Location', 'South')

График показывает появление Southwest Airlines (WN) в это время период.

Узнавание больше

Этот пример только царапает поверхность того, что возможно с mapreduce. См. документацию для mapreduce для получения дополнительной информации, включая информацию об использовании его с Hadoop и MATLAB Distributed Computing Server.

Смотрите также

|

Похожие темы

Была ли эта тема полезной?