Этот пример показывает, как использовать datastore
и функции mapreduce
, чтобы обработать большую сумму основанных на файле данных. Алгоритм MapReduce является оплотом многих современных "больших данных" приложения. Этот пример работает с одиночным компьютером, но код может масштабировать, чтобы использовать Hadoop®.
Всюду по этому примеру набор данных является набором записей для США внутренние полеты между 1 987 и 2008. Если вы экспериментировали с "большими данными" прежде, можно уже быть знакомы с этим набором данных. Полный набор данных может быть загружен с http://stat-computing.org/dataexpo/2009/the-data.html. Небольшое подмножество набора данных также включено с MATLAB®, чтобы позволить вам запускать это и другие примеры, не загружая целый набор данных.
Введение в datastore
Создание datastore
позволяет вам получать доступ к набору данных основанным на фрагменте способом. datastore
может обработать произвольно большие объемы данных, и данные могут даже быть распространены через несколько файлов. Можно создать datastore
для многих типов файлов, включая набор табличных текстовых файлов (продемонстрированный здесь), база данных SQL (База данных требуемый Toolbox™) или Hadoop® Distributed File System (HDFS™).
Создайте datastore для набора табличных текстовых файлов и предварительно просмотрите содержимое.
ds = datastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay
_________ _______ _________________ ______________ _______ ________
1503 'NA' 53 57 'NA' 8
1550 'NA' 63 56 'NA' 8
1589 'NA' 83 82 'NA' 21
1655 'NA' 59 58 'NA' 13
1702 'NA' 77 72 'NA' 4
1729 'NA' 61 65 'NA' 59
1763 'NA' 84 79 'NA' 3
1800 'NA' 155 143 'NA' 11
Datastore автоматически анализирует входные данные и высказывает лучшее предположение относительно типа данных в каждом столбце. В этом случае используйте Аргумент в виде пар "имя-значение" 'TreatAsMissing'
, чтобы иметь замену datastore
отсутствующие значения правильно. Для числовых переменных (таких как 'AirTime'
), datastore
заменяет каждый экземпляр 'NA'
со значением NaN
, которое является представлением арифметики IEEE для Не числа.
ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s'; ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s'; dsPreview = preview(ds); dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
AirTime TaxiIn TailNum CancellationCode
_______ ______ _______ ________________
NaN NaN 'NA' 'NA'
NaN NaN 'NA' 'NA'
NaN NaN 'NA' 'NA'
NaN NaN 'NA' 'NA'
NaN NaN 'NA' 'NA'
NaN NaN 'NA' 'NA'
NaN NaN 'NA' 'NA'
NaN NaN 'NA' 'NA'
Объекты datastore
содержат внутренний указатель, чтобы отслеживать, из которого фрагмента данных функция read
возвращается затем. Используйте hasdata
и функции read
, чтобы продвинуться через целый набор данных и отфильтровать набор данных к только строкам, представляющим интерес. В этом случае строки, представляющие интерес являются полетами на United Airlines ("UA"), вылетающий от Бостона ("BOS").
subset = []; while hasdata(ds) t = read(ds); t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :); subset = vertcat(subset, t); end subset(1:10,[9,10,15:17])
ans=10×5 table
UniqueCarrier FlightNum ArrDelay DepDelay Origin
_____________ _________ ________ ________ ______
'UA' 121 -9 0 'BOS'
'UA' 1021 -9 -1 'BOS'
'UA' 519 15 8 'BOS'
'UA' 354 9 8 'BOS'
'UA' 701 -17 0 'BOS'
'UA' 673 -9 -1 'BOS'
'UA' 91 -3 2 'BOS'
'UA' 335 18 4 'BOS'
'UA' 1429 1 -2 'BOS'
'UA' 53 52 13 'BOS'
Введение в mapreduce
MapReduce является алгоритмическим методом, чтобы "разделить и завоевать" большие проблемы данных. В MATLAB mapreduce
требует трех входных параметров:
datastore
, чтобы считать данные из
Функция "картопостроителя", которой дают подмножество данных, чтобы работать с. Вывод функции карты является частичным вычислением. mapreduce
вызывает функцию картопостроителя одно время для каждого фрагмента в datastore
с каждым вызовом, действующим независимо.
Функция "редуктора", которой дают совокупные выходные параметры от функции картопостроителя. Функция редуктора заканчивает вычисление, начатое функцией картопостроителя, и выводит окончательный ответ.
Это - упрощение в некоторой степени, поскольку вывод вызова функции картопостроителя может быть переставлен и объединен интересными способами прежде чем быть переданным функции редуктора. Это будет исследовано позже в этом примере.
Использование mapreduce, чтобы выполнить вычисление
Простое использование mapreduce
должно найти самое долгое время полета в целом наборе данных авиакомпании. Для этого:
Функция "картопостроителя" вычисляет максимум каждого фрагмента от datastore
.
Функция "редуктора" затем вычисляет максимальное значение среди всех максимумов, вычисленных вызовами функции картопостроителя.
Во-первых, сбросьте datastore
и отфильтруйте переменные к одному столбцу интереса.
reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};
Запишите функцию картопостроителя, maxTimeMapper.m
m. Требуется три входных параметра:
Входные данные, который является таблицей, полученной путем применения функции read
к datastore
.
Набор настройки и контекстной информации, info
. Это может быть проигнорировано в большинстве случаев, как это здесь.
Промежуточный объект хранения данных, который записывает результаты вычислений от функции картопостроителя. Используйте функцию add
, чтобы добавить Пары ключ/значение к этому промежуточному выводу. В этом примере имя ключа ('MaxElapsedTime'
) произвольно.
Сохраните следующую функцию картопостроителя (maxTimeMapper.m
) в вашей текущей папке.
type maxTimeMapper
function maxTimeMapper(data, ~, intermKVStore) % Copyright 2014 The MathWorks, Inc. maxElaspedTime = max(data{:,:}); add(intermKVStore, 'MaxElaspedTime',maxElaspedTime); end
Затем, запишите функцию редуктора. Также требуется три входных параметра:
Набор входного параметра "ключи". Ключи будут обсуждены далее ниже, но они могут быть проигнорированы в некоторых простых проблемах, как они здесь.
Промежуточный ввод данных возражает, что mapreduce
передает функции редуктора. Это, которое данные в форме Пар ключ/значение, и вы используете hasnext
и функции getnext
, чтобы выполнить итерации через значения для каждого ключа.
Объект хранения данных окончательного результата. Используйте add
и функции addmulti
, чтобы непосредственно добавить Пары ключ/значение к выводу.
Сохраните следующую функцию редуктора (maxTimeReducer.m
) в вашей текущей папке.
type maxTimeReducer
function maxTimeReducer(~, intermValsIter, outKVStore) % Copyright 2014 The MathWorks, Inc. maxElaspedTime = -inf; while hasnext(intermValsIter) maxElaspedTime = max(maxElaspedTime, getnext(intermValsIter)); end add(outKVStore, 'MaxElaspedTime', maxElaspedTime); end
Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce
с помощью datastore
, функции картопостроителя и функции редуктора. Если у вас будет Parallel Computing Toolbox (PCT), MATLAB автоматически запустит пул и параллелизирует выполнение. Используйте функцию readall
, чтобы отобразить результаты алгоритма MapReduce.
result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%
readall(result)
ans=1×2 table
Key Value
________________ ______
'MaxElaspedTime' [1650]
Использование ключей в mapreduce
Использование ключей является важной и мощной функцией mapreduce
. Каждый вызов функции картопостроителя добавляет промежуточные результаты к одному или нескольким названному "блоками", названными ключами. Количество вызовов функции картопостроителя mapreduce
соответствует количеству фрагментов в datastore
.
Если функция картопостроителя добавляет значения к нескольким ключам, это приводит ко множественным вызовам функции редуктора с каждым вызовом, работающим на промежуточных значениях только одного ключа. Функция mapreduce
автоматически справляется с этим перемещением данных между картой, и уменьшите фазы алгоритма.
Эта гибкость полезна во многих контекстах. Пример ниже использования вводит относительно очевидный путь к иллюстративным целям.
Вычисление мудрых группой метрик с mapreduce
Поведение функции картопостроителя в этом приложении является более комплексным. Для каждого поставщика услуг полета, найденного во входных данных, используйте функцию add
, чтобы добавить вектор значений. Этот вектор является количеством количества полетов для того поставщика услуг в каждый день в 21 + годы данных. Код поставщика услуг является ключом для этого вектора значений. Это гарантирует, что все данные для каждого поставщика услуг будут группироваться, когда mapreduce
передаст его функции редуктора.
Сохраните следующую функцию картопостроителя (countFlightsMapper.m
) в вашей текущей папке.
type countFlightsMapper
function countFlightsMapper(data, ~, intermKVStore) % Copyright 2014 The MathWorks, Inc. dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end
Функция редуктора является менее комплексной. Это просто выполняет итерации по промежуточным значениям и добавляет векторы вместе. При завершении это выводит значения в этом совокупном векторе. Обратите внимание на то, что функция редуктора не должна отсортировать или исследовать значения intermediateKeysIn
; каждый вызов функции редуктора mapreduce
только передает значения для одного поставщика услуг авиакомпании.
Сохраните следующую функцию редуктора (countFlightsReducer.m
) в вашей текущей папке.
type countFlightsReducer
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) %countFlightsReducer Reducer function for mapreduce to count flights % Copyright 2014 The MathWorks, Inc. daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end
Сбросьте datastore
и выберите переменные интереса. Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce
с помощью datastore
, функции картопостроителя и функции редуктора.
reset(ds); ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'}; result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 10% Map 100% Reduce 21% Map 100% Reduce 31% Map 100% Reduce 41% Map 100% Reduce 52% Map 100% Reduce 62% Map 100% Reduce 72% Map 100% Reduce 83% Map 100% Reduce 93% Map 100% Reduce 100%
result = readall(result);
В случае, если этот пример был запущен с только демонстрационным набором данных, загрузка, результаты алгоритма mapreduce
работают на целом наборе данных.
load airlineResults
Используя только лучшие 7 поставщиков услуг, примените фильтр к данным, чтобы сгладить эффекты перемещения выходных дней. Это в противном случае создало бы помехи визуализации.
lines = result.Value; lines = horzcat(lines{:}); [~,sortOrder] = sort(sum(lines), 'descend'); lines = lines(:,sortOrder(1:7)); result = result(sortOrder(1:7),:); lines(lines==0) = nan; for carrier=1:size(lines,2) lines(:,carrier) = filter(repmat(1/7, [7 1]), 1, lines(:,carrier)); end
Отобразите данные на графике.
figure('Position',[1 1 800 600]); plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines) title ('Domestic airline flights per day per carrier') xlabel('Date') ylabel('Flights per day (7-day moving average)') legend(result.Key, 'Location', 'South')
График показывает появление Southwest Airlines (WN) в это время период.
Этот пример только царапает поверхность того, что возможно с mapreduce
. См. документацию для mapreduce
для получения дополнительной информации, включая информацию об использовании его с Hadoop и MATLAB Distributed Computing Server.