В этом примере показано, как использовать mapreduce
функция для обработки большого объема файловых данных. Алгоритм MapReduce является основой многих современных приложений «больших данных». Этот пример работает на одном компьютере, но код может масштабироваться до использования Hadoop ®.
На протяжении всего этого примера набор данных представляет собой набор записей Американской статистической ассоциации для рейсов внутренних авиакомпаний США в период с 1987 по 2008 год. Если вы экспериментировали с «большими данными» раньше, вы, возможно, уже знакомы с этим набором данных. Небольшая подмножество этого набора данных включена в MATLAB ®, чтобы позволить вам запустить этот и другие примеры.
Создание datastore позволяет вам получить доступ к набору данных на основе блоков. Datastore может обрабатывать произвольно большие объемы данных, и данные могут даже быть распределены по нескольким файлам. Можно создать datastore для многих типов файлов, включая набор табличных текстовых файлов (демонстрируется здесь), базу данных SQL (Toolbox™ базы данных) или распределенную файловую систему Hadoop ® (HDFS™).
Создайте datastore для набора табличных текстовых файлов и предворительно просмотрите содержимое.
ds = tabularTextDatastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay
_________ _______ _________________ ______________ _______ ________
1503 {'NA'} 53 57 {'NA'} 8
1550 {'NA'} 63 56 {'NA'} 8
1589 {'NA'} 83 82 {'NA'} 21
1655 {'NA'} 59 58 {'NA'} 13
1702 {'NA'} 77 72 {'NA'} 4
1729 {'NA'} 61 65 {'NA'} 59
1763 {'NA'} 84 79 {'NA'} 3
1800 {'NA'} 155 143 {'NA'} 11
Datastore автоматически анализирует входные данные и делает лучшее предположение относительно типа данных в каждом столбце. В этом случае используйте 'TreatAsMissing'
аргумент пары "имя-значение" для правильной замены отсутствующих значений. Для числовых переменных (таких как 'AirTime'
), tabularTextDatastore
заменяет каждый образец 'NA'
с NaN
значение, которое является арифметическим представлением IEEE для не число.
ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s'; ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s'; dsPreview = preview(ds); dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
AirTime TaxiIn TailNum CancellationCode
_______ ______ _______ ________________
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
Объекты Datastore содержат внутренний указатель, чтобы отслеживать, какой блок данных read
функция возвращается следующим. Используйте hasdata
и read
функций, чтобы пройти через весь набор данных и фильтровать набор данных только к строкам , представляющим интерес. В этом случае строками , представляющими интерес являются рейсы на United Airlines («UA»), вылетающие из Бостона («BOS»).
subset = []; while hasdata(ds) t = read(ds); t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :); subset = vertcat(subset, t); end subset(1:10,[9,10,15:17])
ans=10×5 table
UniqueCarrier FlightNum ArrDelay DepDelay Origin
_____________ _________ ________ ________ _______
{'UA'} 121 -9 0 {'BOS'}
{'UA'} 1021 -9 -1 {'BOS'}
{'UA'} 519 15 8 {'BOS'}
{'UA'} 354 9 8 {'BOS'}
{'UA'} 701 -17 0 {'BOS'}
{'UA'} 673 -9 -1 {'BOS'}
{'UA'} 91 -3 2 {'BOS'}
{'UA'} 335 18 4 {'BOS'}
{'UA'} 1429 1 -2 {'BOS'}
{'UA'} 53 52 13 {'BOS'}
mapreduce
MapReduce является алгоритмическим методом, чтобы «разделить и завоевать» проблемы с большими данными. В MATLAB, mapreduce
требует трех входных параметров:
Datastore, для чтения данных из
Функция «mapper», которой задается подмножество данных для работы. Выходы функции map являются частичным вычислением. mapreduce
вызывает функцию mapper один раз для каждого блока в datastore, при этом каждый вызов работает независимо.
Функция «reducer», которая задается совокупными выходами от функции mapper. Функция редуктора завершает расчет, начатую функцией mapper, и выводит окончательный ответ.
Это в некоторой степени избыточное упрощение, поскольку выход вызова функции mapper может быть перемещен и объединен интересными способами перед передачей в функцию reducer. Это будет рассмотрено позже в этом примере.
mapreduce
для выполнения расчетовПростое использование mapreduce
- найти самое длинное время рейса во всем наборе данных авиакомпании. Для этого:
Функция «mapper» вычисляет максимум каждого блока из datastore.
Функция «reducer» затем вычисляет максимальное значение среди всех максимумов, вычисленных вызовами функции mapper.
Сначала сбросьте datastore и фильтруйте переменные в один интересующий столбец.
reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};
Напишите функцию mapper, maxTimeMapper.m
. Для этого требуется три входных параметров:
Входные данные, которая является таблицей, полученной путем применения read
функцию в datastore.
Набор строения и контекстной информации, info
. Это может быть проигнорировано в большинстве случаев, как и здесь.
Промежуточный объект хранения данных, который записывает результаты вычислений из функции mapper. Используйте add
функция для добавления пар Ключ/Значение к этому промежуточному выходу. В этом примере имя ключа ('MaxElapsedTime'
) произвольно.
Сохраните следующую функцию mapper (maxTimeMapper.m
) в текущей папке.
function maxTimeMapper(data, ~, intermKVStore) maxElapsedTime = max(data{:,:}); add(intermKVStore, "MaxElapsedTime", maxElapsedTime) end
Далее запишите функцию редуктора. Это также принимает три входных параметров:
Набор входа «ключей». Ключи будут обсуждаться далее ниже, но их можно игнорировать в некоторых простых задачах, так как они здесь.
Объект входа промежуточных данных, который mapreduce
переходит к функции редуктора. Эти данные находятся в форме пар Ключ/Значение, и вы используете hasnext
и getnext
функции для итерации значений для каждой клавиши.
Конечный выходной объект хранения данных. Используйте add
и addmulti
функции для непосредственного добавления пар Ключ/Значение к выходу.
Сохраните следующую функцию редуктора (maxTimeReducer.m
) в текущей папке.
function maxTimeReducer(~, intermValsIter, outKVStore) maxElapsedTime = -Inf; while(hasnext(intermValsIter)) maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter)); end add(outKVStore, "MaxElapsedTime", maxElapsedTime); end
Если функции mapper и reducer записаны и сохранены в текущей папке, можно вызвать mapreduce
использование datastore, функции mapper и функции редуктора. Если у вас есть Parallel Computing Toolbox (PCT), MATLAB автоматически запустит пул и распараллелит выполнение. Используйте readall
функция для отображения результатов алгоритма MapReduce.
result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%
readall(result)
ans=1×2 table
Key Value
__________________ ________
{'MaxElapsedTime'} {[1650]}
mapreduce
Использование ключей является важной и мощной функцией mapreduce
. Каждый вызов функции mapper добавляет промежуточные результаты в один или несколько именованных «блоков», называемых ключами. Количество вызовов функции mapper по mapreduce
соответствует количеству блоков в datastore.
Если функция mapper добавляет значения нескольким клавишам, это приводит к нескольким вызовам функции редуктора, при этом каждый вызов работает только с промежуточными значениями одной клавиши. The mapreduce
функция автоматически управляет этим перемещением данных между картой и уменьшением фаз алгоритма.
Эта гибкость полезна во многих контекстах. Приведенный ниже пример использует ключи относительно очевидным способом в иллюстративных целях.
mapreduce
Поведение функции mapper в этом приложении сложнее. Для каждого летного аппарата, найденного в входных данных, используйте add
функция для добавления вектора значений. Этот вектор является количеством рейсов для этого перевозчика в каждый день за 21 + год данных. Код поставщика услуг является ключом для этого вектора значений. Это гарантирует, что все данные для каждой системы связи будут сгруппированы вместе при mapreduce
передает его в функцию редуктора.
Сохраните следующую функцию mapper (countFlightsMapper.m
) в текущей папке.
function countFlightsMapper(data, ~, intermKVStore) dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end
Функция редуктора менее комплексна. Это просто итерация над промежуточными значениями и добавление векторов вместе. По завершении он выводит значения в этом совокупном векторе. Обратите внимание, что функции редуктора не нужно сортировать или исследовать intermediateKeysIn
значения; каждый вызов функции редуктора по mapreduce
передает значения только для одного авиаперевозчика.
Сохраните следующую функцию редуктора (countFlightsReducer.m
) в текущей папке.
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end
Сбросьте datastore и выберите интересующие вас переменные. Если функции mapper и reducer записаны и сохранены в текущей папке, можно вызвать mapreduce
использование datastore, функции mapper и функции редуктора.
reset(ds); ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'}; result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 10% Map 100% Reduce 21% Map 100% Reduce 31% Map 100% Reduce 41% Map 100% Reduce 52% Map 100% Reduce 62% Map 100% Reduce 72% Map 100% Reduce 83% Map 100% Reduce 93% Map 100% Reduce 100%
result = readall(result);
В случае, если этот пример был запущен только с набором выборочных данных, загрузите результаты mapreduce
алгоритм запускается на целом наборе данных.
load airlineResults
Используя только 7 лучших перевозчиков, сглаживайте данные, чтобы удалить эффекты путешествий выходного дня. Это в противном случае загромождает визуализацию.
lines = result.Value; lines = horzcat(lines{:}); [~,sortOrder] = sort(sum(lines), 'descend'); lines = lines(:,sortOrder(1:7)); result = result(sortOrder(1:7),:); lines(lines==0) = nan; lines = smoothdata(lines,'gaussian');
Постройте график данных.
figure('Position',[1 1 800 600]); plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines,'LineWidth',2) title ('Domestic airline flights per day per carrier') xlabel('Date') ylabel('Flights per day') legend(result.Key, 'Location', 'Best')
График показывает появление Southwest Airlines (WN) в течение этого временного периода.
Этот пример только царапает поверхность того, что возможно с mapreduce
. См. документацию для mapreduce
для получения дополнительной информации, включая информацию об использовании с Hadoop и MATLAB ® Parallel Server™.
Здесь перечислены локальные функции, которые mapreduce
применяется к данным.
function maxTimeMapper(data, ~, intermKVStore) maxElapsedTime = max(data{:,:}); add(intermKVStore, "MaxElapsedTime", maxElapsedTime) end %----------------------------------------------------------------------- function maxTimeReducer(~, intermValsIter, outKVStore) maxElapsedTime = -Inf; while(hasnext(intermValsIter)) maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter)); end add(outKVStore, "MaxElapsedTime", maxElapsedTime); end %----------------------------------------------------------------------- function countFlightsMapper(data, ~, intermKVStore) dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end %----------------------------------------------------------------------- function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end %-----------------------------------------------------------------------
mapreduce
| tabularTextDatastore