В этом примере показано, как использовать mapreduce
функционируйте, чтобы обработать большую сумму основанных на файле данных. Алгоритм MapReduce является оплотом многих современных "больших данных" приложения. Этот пример работает с одиночным компьютером, но код может масштабировать, чтобы использовать Hadoop®.
В этом примере набор данных является набором записей от американской Статистической Ассоциации для США внутренние полеты между 1 987 и 2008. Если вы экспериментировали с "большими данными" прежде, можно уже быть знакомы с этим набором данных. Небольшое подмножество этого набора данных включено с MATLAB®, чтобы позволить вам запускать это и другие примеры.
Создание datastore позволяет вам получать доступ к набору данных основанным на блоке способом. Datastore может обработать произвольно большие объемы данных, и данные могут даже быть распространены через несколько файлов. Можно создать datastore для многих типов файлов, включая набор табличных текстовых файлов (продемонстрированный здесь), база данных SQL (требуемый Database Toolbox™) или Hadoop® Distributed File System (HDFS™).
Создайте datastore для набора табличных текстовых файлов и предварительно просмотрите содержимое.
ds = tabularTextDatastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay
_________ _______ _________________ ______________ _______ ________
1503 {'NA'} 53 57 {'NA'} 8
1550 {'NA'} 63 56 {'NA'} 8
1589 {'NA'} 83 82 {'NA'} 21
1655 {'NA'} 59 58 {'NA'} 13
1702 {'NA'} 77 72 {'NA'} 4
1729 {'NA'} 61 65 {'NA'} 59
1763 {'NA'} 84 79 {'NA'} 3
1800 {'NA'} 155 143 {'NA'} 11
Datastore автоматически анализирует входные данные и высказывает лучшее предположение относительно типа данных в каждом столбце. В этом случае используйте 'TreatAsMissing'
аргумент пары "имя-значение", чтобы заменить отсутствующие значения правильно. Для числовых переменных (таких как 'AirTime'
TabularTextDatastore
заменяет каждый экземпляр 'NA'
с NaN
значение, которое является представлением арифметики IEEE для Не числа.
ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s'; ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s'; dsPreview = preview(ds); dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
AirTime TaxiIn TailNum CancellationCode
_______ ______ _______ ________________
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
Объекты datastore содержат внутренний указатель, чтобы отслеживать который блок данных read
функция возвращается затем. Используйте hasdata
и read
функции, чтобы продвинуться через целый набор данных и отфильтровать набор данных только к строкам, представляющим интерес. В этом случае строки, представляющие интерес являются рейсами на United Airlines ("UA"), вылетающий от Бостона ("BOS").
subset = []; while hasdata(ds) t = read(ds); t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :); subset = vertcat(subset, t); end subset(1:10,[9,10,15:17])
ans=10×5 table
UniqueCarrier FlightNum ArrDelay DepDelay Origin
_____________ _________ ________ ________ _______
{'UA'} 121 -9 0 {'BOS'}
{'UA'} 1021 -9 -1 {'BOS'}
{'UA'} 519 15 8 {'BOS'}
{'UA'} 354 9 8 {'BOS'}
{'UA'} 701 -17 0 {'BOS'}
{'UA'} 673 -9 -1 {'BOS'}
{'UA'} 91 -3 2 {'BOS'}
{'UA'} 335 18 4 {'BOS'}
{'UA'} 1429 1 -2 {'BOS'}
{'UA'} 53 52 13 {'BOS'}
Введение в mapreduce
MapReduce является алгоритмическим методом, чтобы "разделить и завоевать" большие проблемы данных. В MATLAB, mapreduce
требует трех входных параметров:
Datastore, чтобы считать данные из
Функция "картопостроителя", которой дают подмножество данных, чтобы работать с. Выход функции карты является частичным вычислением. mapreduce
вызывает функцию картопостроителя одно время для каждого блока в datastore, с каждым вызовом, действующим независимо.
Функция "редуктора", которой дают совокупные выходные параметры от функции картопостроителя. Функция редуктора закончила расчет, начатый функцией картопостроителя, и выводит окончательный ответ.
Это - упрощение в некоторой степени, поскольку выход вызова функции картопостроителя может быть переставлен и объединен интересными способами прежде чем быть переданным функции редуктора. Это будет исследовано позже в этом примере.
mapreduce
выполнять расчетПростое использование mapreduce
должен найти самое долгое время полета в целом наборе данных авиакомпании. Для этого:
Функция "картопостроителя" вычисляет максимум каждого блока из datastore.
Функция "редуктора" затем вычисляет максимальное значение среди всех максимумов, вычисленных вызовами функции картопостроителя.
Во-первых, сбросьте datastore и отфильтруйте переменные к одному столбцу интереса.
reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};
Запишите функцию картопостроителя, maxTimeMapper.m
. Требуется три входных параметра:
Входные данные, который является таблицей, полученной путем применения read
функционируйте к datastore.
Набор настройки и контекстной информации, info
. Это может быть проигнорировано в большинстве случаев, как это здесь.
Промежуточный объект хранения данных, который записывает результаты вычислений от функции картопостроителя. Используйте add
функция, чтобы добавить Пары ключ/значение в этот промежуточный выход. В этом примере, имени ключа ('MaxElapsedTime'
) произвольно.
Сохраните следующую функцию картопостроителя (maxTimeMapper.m
) в вашей текущей папке.
function maxTimeMapper(data, ~, intermKVStore) maxElapsedTime = max(data{:,:}); add(intermKVStore, "MaxElapsedTime", maxElapsedTime) end
Затем запишите функцию редуктора. Также требуется три входных параметра:
Набор входа "ключи". Ключи будут обсуждены далее ниже, но они могут быть проигнорированы в некоторых простых проблемах, как они здесь.
Промежуточный ввод данных возражает тому mapreduce
передачи в функцию редуктора. Эти данные в форме Пар ключ/значение, и вы используете hasnext
и getnext
функции, чтобы выполнить итерации через значения для каждого ключа.
Объект хранения данных окончательного результата. Используйте add
и addmulti
функции, чтобы непосредственно добавить Пары ключ/значение в выход.
Сохраните следующую функцию редуктора (maxTimeReducer.m
) в вашей текущей папке.
function maxTimeReducer(~, intermValsIter, outKVStore) maxElapsedTime = -Inf; while(hasnext(intermValsIter)) maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter)); end add(outKVStore, "MaxElapsedTime", maxElapsedTime); end
Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce
с помощью datastore, функции картопостроителя и функции редуктора. Если у вас будет Parallel Computing Toolbox (PCT), MATLAB автоматически запустит пул и параллелизирует выполнение. Используйте readall
функция, чтобы отобразить результаты алгоритма MapReduce.
result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%
readall(result)
ans=1×2 table
Key Value
__________________ ________
{'MaxElapsedTime'} {[1650]}
Использование ключей в mapreduce
Использование ключей является важной и мощной функцией mapreduce
. Каждый вызов функции картопостроителя добавляет промежуточные результаты в один или несколько названного "блоками", названными ключами. Количество вызовов картопостроителя функционирует mapreduce
соответствует количеству блоков в datastore.
Если функция картопостроителя добавляет значения в несколько ключей, это приводит ко множественным вызовам функции редуктора с каждым вызовом, работающим над промежуточными значениями только одного ключа. mapreduce
функция автоматически справляется с этим перемещением данных между картой, и уменьшайте фазы алгоритма.
Эта гибкость полезна во многих контекстах. Пример ниже использования вводит относительно очевидный путь к иллюстративным целям.
Вычисление мудрых группой метрик с mapreduce
Поведение функции картопостроителя в этом приложении является более комплексным. Для каждой несущей рейса, найденной во входных данных, используйте add
функция, чтобы добавить вектор из значений. Этот вектор является количеством количества рейсов для той несущей в каждый день в 21 + годы данных. Код несущей является ключом для этого вектора из значений. Это гарантирует, что все данные для каждой несущей будут группироваться когда mapreduce
передачи это к функции редуктора.
Сохраните следующую функцию картопостроителя (countFlightsMapper.m
) в вашей текущей папке.
function countFlightsMapper(data, ~, intermKVStore) dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end
Функция редуктора является менее комплексной. Это просто выполняет итерации по промежуточным значениям и добавляет векторы вместе. При завершении это выводит значения в этом совокупном векторе. Обратите внимание на то, что функция редуктора не должна отсортировать или исследовать intermediateKeysIn
значения; каждый вызов редуктора функционирует mapreduce
только передает значения для одной несущей авиакомпании.
Сохраните следующую функцию редуктора (countFlightsReducer.m
) в вашей текущей папке.
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end
Сбросьте datastore и выберите переменные интереса. Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce
с помощью datastore, функции картопостроителя и функции редуктора.
reset(ds); ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'}; result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 10% Map 100% Reduce 21% Map 100% Reduce 31% Map 100% Reduce 41% Map 100% Reduce 52% Map 100% Reduce 62% Map 100% Reduce 72% Map 100% Reduce 83% Map 100% Reduce 93% Map 100% Reduce 100%
result = readall(result);
В случае, если этот пример был запущен только с набором выборочных данных, загрузите результаты mapreduce
алгоритм работает на целом наборе данных.
load airlineResults
Только Используя лучшие 7 несущих, сглаживайте данные, чтобы удалить эффекты перемещения выходных дней. Это в противном случае создало бы помехи визуализации.
lines = result.Value; lines = horzcat(lines{:}); [~,sortOrder] = sort(sum(lines), 'descend'); lines = lines(:,sortOrder(1:7)); result = result(sortOrder(1:7),:); lines(lines==0) = nan; lines = smoothdata(lines,'gaussian');
Отобразите данные на графике.
figure('Position',[1 1 800 600]); plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines,'LineWidth',2) title ('Domestic airline flights per day per carrier') xlabel('Date') ylabel('Flights per day') legend(result.Key, 'Location', 'Best')
График показывает появление Southwest Airlines (WN) в это время период.
Этот пример только царапает поверхность того, что возможно с mapreduce
. См. документацию для mapreduce
для получения дополнительной информации, включая информацию об использовании его с Hadoop и MATLAB® Parallel Server™.
Перечисленный здесь локальные функции что mapreduce
применяется к данным.
function maxTimeMapper(data, ~, intermKVStore) maxElapsedTime = max(data{:,:}); add(intermKVStore, "MaxElapsedTime", maxElapsedTime) end %----------------------------------------------------------------------- function maxTimeReducer(~, intermValsIter, outKVStore) maxElapsedTime = -Inf; while(hasnext(intermValsIter)) maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter)); end add(outKVStore, "MaxElapsedTime", maxElapsedTime); end %----------------------------------------------------------------------- function countFlightsMapper(data, ~, intermKVStore) dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end %----------------------------------------------------------------------- function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end %-----------------------------------------------------------------------
mapreduce
| tabularTextDatastore