В этом примере показано, как использовать datastore
и mapreduce
функции, чтобы обработать большую сумму основанных на файле данных. Алгоритм MapReduce является оплотом многих современных "больших данных" приложения. Этот пример работает с одиночным компьютером, но код может масштабировать, чтобы использовать Hadoop®.
В этом примере набор данных является набором записей от американской Статистической Ассоциации для США внутренние полеты между 1 987 и 2008. Если вы экспериментировали с "большими данными" прежде, можно уже быть знакомы с этим набором данных. Небольшое подмножество этого набора данных включено с MATLAB®, чтобы позволить вам запускать это и другие примеры.
Введение в datastore
Создание datastore
позволяет вам получать доступ к набору данных основанным на блоке способом. datastore
может обработать произвольно большие объемы данных, и данные могут даже быть распространены через несколько файлов. Можно создать datastore
для многих типов файлов, включая набор табличных текстовых файлов (продемонстрированный здесь), база данных SQL (требуемый Database Toolbox™) или Hadoop® Distributed File System (HDFS™).
Создайте datastore для набора табличных текстовых файлов и предварительно просмотрите содержимое.
ds = datastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay
_________ _______ _________________ ______________ _______ ________
1503 {'NA'} 53 57 {'NA'} 8
1550 {'NA'} 63 56 {'NA'} 8
1589 {'NA'} 83 82 {'NA'} 21
1655 {'NA'} 59 58 {'NA'} 13
1702 {'NA'} 77 72 {'NA'} 4
1729 {'NA'} 61 65 {'NA'} 59
1763 {'NA'} 84 79 {'NA'} 3
1800 {'NA'} 155 143 {'NA'} 11
Datastore автоматически анализирует входные данные и высказывает лучшее предположение относительно типа данных в каждом столбце. В этом случае используйте 'TreatAsMissing'
Аргумент пары "имя-значение", чтобы иметь datastore
замените отсутствующие значения правильно. Для числовых переменных (таких как 'AirTime'
datastore
заменяет каждый экземпляр 'NA'
с NaN
значение, которое является представлением арифметики IEEE для Не числа.
ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s'; ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s'; dsPreview = preview(ds); dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
AirTime TaxiIn TailNum CancellationCode
_______ ______ _______ ________________
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
datastore
объекты содержат внутренний указатель, чтобы отслеживать который блок данных read
функция возвращается затем. Используйте hasdata
и read
функции, чтобы продвинуться через целый набор данных и отфильтровать набор данных только к строкам, представляющим интерес. В этом случае строки, представляющие интерес являются рейсами на United Airlines ("UA"), вылетающий от Бостона ("BOS").
subset = []; while hasdata(ds) t = read(ds); t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :); subset = vertcat(subset, t); end subset(1:10,[9,10,15:17])
ans=10×5 table
UniqueCarrier FlightNum ArrDelay DepDelay Origin
_____________ _________ ________ ________ _______
{'UA'} 121 -9 0 {'BOS'}
{'UA'} 1021 -9 -1 {'BOS'}
{'UA'} 519 15 8 {'BOS'}
{'UA'} 354 9 8 {'BOS'}
{'UA'} 701 -17 0 {'BOS'}
{'UA'} 673 -9 -1 {'BOS'}
{'UA'} 91 -3 2 {'BOS'}
{'UA'} 335 18 4 {'BOS'}
{'UA'} 1429 1 -2 {'BOS'}
{'UA'} 53 52 13 {'BOS'}
Введение в mapreduce
MapReduce является алгоритмическим методом, чтобы "разделить и завоевать" большие проблемы данных. В MATLAB, mapreduce
требует трех входных параметров:
datastore
считывать данные из
Функция "картопостроителя", которой дают подмножество данных, чтобы работать с. Выход функции карты является частичным вычислением. mapreduce
вызывает функцию картопостроителя одно время для каждого блока в datastore
, с каждым вызовом, действующим независимо.
Функция "редуктора", которой дают совокупные выходные параметры от функции картопостроителя. Функция редуктора закончила расчет, начатый функцией картопостроителя, и выводит окончательный ответ.
Это - упрощение в некоторой степени, поскольку выход вызова функции картопостроителя может быть переставлен и объединен интересными способами прежде чем быть переданным функции редуктора. Это будет исследовано позже в этом примере.
mapreduce
выполнять расчетПростое использование mapreduce
должен найти самое долгое время полета в целом наборе данных авиакомпании. Для этого:
Функция "картопостроителя" вычисляет максимум каждого блока из datastore
.
Функция "редуктора" затем вычисляет максимальное значение среди всех максимумов, вычисленных вызовами функции картопостроителя.
Во-первых, сбросьте datastore
и отфильтруйте переменные к одному столбцу интереса.
reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};
Запишите функцию картопостроителя, maxTimeMapper.m
. Требуется три входных параметра:
Входные данные, который является таблицей, полученной путем применения read
функционируйте к datastore
.
Набор настройки и контекстной информации, info
. Это может быть проигнорировано в большинстве случаев, как это здесь.
Промежуточный объект хранения данных, который записывает результаты вычислений от функции картопостроителя. Используйте add
функция, чтобы добавить Пары ключ/значение в этот промежуточный выход. В этом примере, имени ключа ('MaxElapsedTime'
) произвольно.
Сохраните следующую функцию картопостроителя (maxTimeMapper.m
) в вашей текущей папке.
function maxTimeMapper(data, ~, intermKVStore) maxElapsedTime = max(data{:,:}); add(intermKVStore, "MaxElapsedTime", maxElapsedTime) end
Затем запишите функцию редуктора. Также требуется три входных параметра:
Набор входа "ключи". Ключи будут обсуждены далее ниже, но они могут быть проигнорированы в некоторых простых проблемах, как они здесь.
Промежуточный ввод данных возражает тому mapreduce
передачи в функцию редуктора. Эти данные в форме Пар ключ/значение, и вы используете hasnext
и getnext
функции, чтобы выполнить итерации через значения для каждого ключа.
Объект хранения данных окончательного результата. Используйте add
и addmulti
функции, чтобы непосредственно добавить Пары ключ/значение в выход.
Сохраните следующую функцию редуктора (maxTimeReducer.m
) в вашей текущей папке.
function maxTimeReducer(~, intermValsIter, outKVStore) maxElapsedTime = -Inf; while(hasnext(intermValsIter)) maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter)); end add(outKVStore, "MaxElapsedTime", maxElapsedTime); end
Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce
использование datastore
, функция картопостроителя и функция редуктора. Если у вас будет Parallel Computing Toolbox (PCT), MATLAB автоматически запустит пул и параллелизирует выполнение. Используйте readall
функция, чтобы отобразить результаты алгоритма MapReduce.
result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%
readall(result)
ans=1×2 table
Key Value
__________________ ________
{'MaxElapsedTime'} {[1650]}
Использование ключей в mapreduce
Использование ключей является важной и мощной функцией mapreduce
. Каждый вызов функции картопостроителя добавляет промежуточные результаты в один или несколько названного "блоками", названными ключами. Количество вызовов картопостроителя функционирует mapreduce
соответствует количеству блоков в datastore
.
Если функция картопостроителя добавляет значения в несколько ключей, это приводит ко множественным вызовам функции редуктора с каждым вызовом, работающим над промежуточными значениями только одного ключа. mapreduce
функция автоматически справляется с этим перемещением данных между картой, и уменьшайте фазы алгоритма.
Эта гибкость полезна во многих контекстах. Пример ниже использования вводит относительно очевидный путь к иллюстративным целям.
Вычисление мудрых группой метрик с mapreduce
Поведение функции картопостроителя в этом приложении является более комплексным. Для каждого поставщика услуг рейса, найденного во входных данных, используйте add
функция, чтобы добавить вектор значений. Этот вектор является количеством количества рейсов для того поставщика услуг в каждый день в 21 + годы данных. Код поставщика услуг является ключом для этого вектора значений. Это гарантирует, что все данные для каждого поставщика услуг будут группироваться когда mapreduce
передачи это к функции редуктора.
Сохраните следующую функцию картопостроителя (countFlightsMapper.m
) в вашей текущей папке.
function countFlightsMapper(data, ~, intermKVStore) dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end
Функция редуктора является менее комплексной. Это просто выполняет итерации по промежуточным значениям и добавляет векторы вместе. При завершении это выводит значения в этом совокупном векторе. Обратите внимание на то, что функция редуктора не должна отсортировать или исследовать intermediateKeysIn
значения; каждый вызов редуктора функционирует mapreduce
только передает значения для одного поставщика услуг авиакомпании.
Сохраните следующую функцию редуктора (countFlightsReducer.m
) в вашей текущей папке.
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end
Сбросьте datastore
и выберите переменные интереса. Если картопостроитель и функции редуктора записаны и сохранены в вашей текущей папке, можно вызвать mapreduce
использование datastore
, функция картопостроителя и функция редуктора.
reset(ds); ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'}; result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 10% Map 100% Reduce 21% Map 100% Reduce 31% Map 100% Reduce 41% Map 100% Reduce 52% Map 100% Reduce 62% Map 100% Reduce 72% Map 100% Reduce 83% Map 100% Reduce 93% Map 100% Reduce 100%
result = readall(result);
В случае, если этот пример был запущен только с набором выборочных данных, загрузите результаты mapreduce
алгоритм работает на целом наборе данных.
load airlineResults
Только Используя лучшие 7 поставщиков услуг, примените фильтр к данным, чтобы сгладить эффекты перемещения выходных дней. Это в противном случае создало бы помехи визуализации.
lines = result.Value; lines = horzcat(lines{:}); [~,sortOrder] = sort(sum(lines), 'descend'); lines = lines(:,sortOrder(1:7)); result = result(sortOrder(1:7),:); lines(lines==0) = nan; for carrier=1:size(lines,2) lines(:,carrier) = filter(repmat(1/7, [7 1]), 1, lines(:,carrier)); end
Отобразите данные на графике.
figure('Position',[1 1 800 600]); plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines) title ('Domestic airline flights per day per carrier') xlabel('Date') ylabel('Flights per day (7-day moving average)') legend(result.Key, 'Location', 'South')
График показывает появление Southwest Airlines (WN) в это время период.
Этот пример только царапает поверхность того, что возможно с mapreduce
. См. документацию для mapreduce
для получения дополнительной информации, включая информацию об использовании его с Hadoop и MATLAB® Parallel Server™.
Перечисленный здесь локальные функции что mapreduce
применяется к данным.
function maxTimeMapper(data, ~, intermKVStore) maxElapsedTime = max(data{:,:}); add(intermKVStore, "MaxElapsedTime", maxElapsedTime) end %----------------------------------------------------------------------- function maxTimeReducer(~, intermValsIter, outKVStore) maxElapsedTime = -Inf; while(hasnext(intermValsIter)) maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter)); end add(outKVStore, "MaxElapsedTime", maxElapsedTime); end %----------------------------------------------------------------------- function countFlightsMapper(data, ~, intermKVStore) dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end %----------------------------------------------------------------------- function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end %-----------------------------------------------------------------------