В этом примере показано, как вычислить итоговую статистику, организованную по группам с помощью mapreduce
. Он демонстрирует использование анонимной функции для передачи дополнительного параметра группирования в параметризованную функцию map. Эта параметризация позволяет вам быстро пересчитать статистику с помощью другой сгруппированной переменной.
Создайте datastore с помощью airlinesmall.csv
набор данных. Этот 12-мегабайтный набор данных содержит 29 столбцов информации о рейсе для нескольких авиаперевозчиков, включая время прибытия и вылета. В данном примере выберите Month
, UniqueCarrier
(идентификатор авиаперевозчика), и ArrDelay
(задержка прибытия рейса) как интересующие переменные.
ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedVariableNames = {'Month', 'UniqueCarrier', 'ArrDelay'};
datastore лечит 'NA'
значения как отсутствующие и заменяет отсутствующие значения на NaN
значения по умолчанию. Кроме того, SelectedVariableNames
свойство позволяет работать только с выбранными интересующими вас переменными, которые можно проверить используя preview
.
preview(ds)
ans=8×3 table
Month UniqueCarrier ArrDelay
_____ _____________ ________
10 {'PS'} 8
10 {'PS'} 8
10 {'PS'} 21
10 {'PS'} 13
10 {'PS'} 4
10 {'PS'} 59
10 {'PS'} 3
10 {'PS'} 11
The mapreduce
функция требует функции map и функции reduce в качестве входов. mapper получает блоки данных и выводит промежуточные результаты. Редуктор считывает промежуточные результаты и дает конечный результат.
В этом примере mapper вычисляет сгруппированную статистику для каждого блока данных и сохраняет статистику как промежуточные пары "ключ-значение". Каждая промежуточная пара "ключ-значение" имеет ключ для уровня группы и массив ячеек значений с соответствующей статистикой.
Эта функция map принимает четыре входных параметров, в то время как mapreduce
функция требует, чтобы функция map принимала ровно три входных параметров. Вызов mapreduce
(ниже) показывает, как пройти в этом дополнительном параметре.
Отобразите файл функции map.
function statsByGroupMapper(data, ~, intermKVStore, groupVarName) % Data is a n-by-3 table. Remove missing values first delays = data.ArrDelay; groups = data.(groupVarName); notNaN =~isnan(delays); groups = groups(notNaN); delays = delays(notNaN); % Find the unique group levels in this chunk [intermKeys,~,idx] = unique(groups, 'stable'); % Group delays by idx and apply @grpstatsfun function to each group intermVals = accumarray(idx,delays,size(intermKeys),@grpstatsfun); addmulti(intermKVStore,intermKeys,intermVals); function out = grpstatsfun(x) n = length(x); % count m = sum(x)/n; % mean v = sum((x-m).^2)/n; % variance s = sum((x-m).^3)/n; % skewness without normalization k = sum((x-m).^4)/n; % kurtosis without normalization out = {[n, m, v, s, k]}; end end
После фазы Map, mapreduce
группирует промежуточные пары "ключ-значение" по уникальному ключу (в этом случае идентификатор авиаперевозчика), поэтому каждый вызов функции сокращения работает на значениях, сопоставленных с одной авиакомпанией. Редуктор получает список промежуточной статистики для авиакомпании, заданный входом ключом (intermKey
) и объединяет статистику в отдельные векторы: n
, m
, v
, s
, и k
. Затем редуктор использует эти векторы, чтобы вычислить количество, среднее значение, отклонение, перекос и куртоз для одной авиакомпании. Конечным ключом является код авиаперевозчика, и соответствующие значения хранятся в структуре с пятью полями.
Отобразите файл функции сокращения.
function statsByGroupReducer(intermKey, intermValIter, outKVStore) n = []; m = []; v = []; s = []; k = []; % Get all sets of intermediate statistics while hasnext(intermValIter) value = getnext(intermValIter); n = [n; value(1)]; m = [m; value(2)]; v = [v; value(3)]; s = [s; value(4)]; k = [k; value(5)]; end % Note that this approach assumes the concatenated intermediate values fit % in memory. Refer to the reducer function, covarianceReducer, of the % CovarianceMapReduceExample for an alternative pairwise reduction approach % Combine the intermediate results count = sum(n); meanVal = sum(n.*m)/count; d = m - meanVal; variance = (sum(n.*v) + sum(n.*d.^2))/count; skewnessVal = (sum(n.*s) + sum(n.*d.*(3*v + d.^2)))./(count*variance^(1.5)); kurtosisVal = (sum(n.*k) + sum(n.*d.*(4*s + 6.*v.*d +d.^3)))./(count*variance^2); outValue = struct('Count',count, 'Mean',meanVal, 'Variance',variance,... 'Skewness',skewnessVal, 'Kurtosis',kurtosisVal); % Add results to the output datastore add(outKVStore,intermKey,outValue); end
Использование mapreduce
для применения карты и сокращения функций к datastore, ds
. Поскольку параметризованная функция map принимает четыре входных параметра, используйте анонимную функцию, чтобы пройти в идентификаторах авиаперевозчика в качестве четвертого входа.
outds1 = mapreduce(ds, ... @(data,info,kvs)statsByGroupMapper(data,info,kvs,'UniqueCarrier'), ... @statsByGroupReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 10% Map 100% Reduce 21% Map 100% Reduce 31% Map 100% Reduce 41% Map 100% Reduce 52% Map 100% Reduce 62% Map 100% Reduce 72% Map 100% Reduce 83% Map 100% Reduce 93% Map 100% Reduce 100%
mapreduce
возвращает datastore, outds1
, с файлами в текущей папке.
Прочтите конечные результаты из выхода datastore.
r1 = readall(outds1)
r1=29×2 table
Key Value
__________ ____________
{'PS' } {1x1 struct}
{'TW' } {1x1 struct}
{'UA' } {1x1 struct}
{'WN' } {1x1 struct}
{'EA' } {1x1 struct}
{'HP' } {1x1 struct}
{'NW' } {1x1 struct}
{'PA (1)'} {1x1 struct}
{'PI' } {1x1 struct}
{'CO' } {1x1 struct}
{'DL' } {1x1 struct}
{'AA' } {1x1 struct}
{'US' } {1x1 struct}
{'AS' } {1x1 struct}
{'ML (1)'} {1x1 struct}
{'AQ' } {1x1 struct}
⋮
Чтобы лучше организовать результаты, преобразуйте структуру, содержащую статистику, в таблицу и используйте идентификаторы перевозчика в качестве имен строк. mapreduce
возвращает пары "ключ-значение" в том же порядке, в котором они были добавлены функцией сокращения, поэтому сортируйте таблицу по идентификатору поставщика услуг.
statsByCarrier = struct2table(cell2mat(r1.Value), 'RowNames', r1.Key); statsByCarrier = sortrows(statsByCarrier, 'RowNames')
statsByCarrier=29×5 table
Count Mean Variance Skewness Kurtosis
_____ _______ ________ ________ ________
9E 507 5.3669 1889.5 6.2676 61.706
AA 14578 6.9598 1123 6.0321 93.085
AQ 153 1.0065 230.02 3.9905 28.383
AS 2826 8.0771 717 3.6547 24.083
B6 793 11.936 2087.4 4.0072 27.45
CO 7999 7.048 1053.8 4.6601 41.038
DH 673 7.575 1491.7 2.9929 15.461
DL 16284 7.4971 697.48 4.4746 41.115
EA 875 8.2434 1221.3 5.2955 43.518
EV 1655 10.028 1325.4 2.9347 14.878
F9 332 8.4849 1138.6 4.2983 30.742
FL 1248 9.5144 1360.4 3.6277 21.866
HA 271 -1.5387 323.27 8.4245 109.63
HP 3597 7.5897 744.51 5.2534 50.004
ML (1) 69 0.15942 169.32 2.8354 16.559
MQ 3805 8.8591 1530.5 7.054 105.51
⋮
Использование анонимной функции для прохождения в сгруппированной переменной позволяет быстро пересчитать статистику с другой группировкой.
В данном примере пересчитайте статистику и сгруппируйте результаты по Month
вместо идентификаторов перевозчика путем простого прохождения Month
переменная в анонимную функцию.
outds2 = mapreduce(ds, ... @(data,info,kvs)statsByGroupMapper(data,info,kvs,'Month'), ... @statsByGroupReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 17% Map 100% Reduce 33% Map 100% Reduce 50% Map 100% Reduce 67% Map 100% Reduce 83% Map 100% Reduce 100%
Прочитайте конечные результаты и организуйте их в таблицу.
r2 = readall(outds2); r2 = sortrows(r2,'Key'); statsByMonth = struct2table(cell2mat(r2.Value)); mon = {'Jan','Feb','Mar','Apr','May','Jun', ... 'Jul','Aug','Sep','Oct','Nov','Dec'}; statsByMonth.Properties.RowNames = mon
statsByMonth=12×5 table
Count Mean Variance Skewness Kurtosis
_____ ______ ________ ________ ________
Jan 9870 8.5954 973.69 4.1142 35.152
Feb 9160 7.3275 911.14 4.7241 45.03
Mar 10219 7.5536 976.34 5.1678 63.155
Apr 9949 6.0081 1077.4 8.9506 170.52
May 10180 5.2949 737.09 4.0535 30.069
Jun 10045 10.264 1266.1 4.8777 43.5
Jul 10340 8.7797 1069.7 5.1428 64.896
Aug 10470 7.4522 908.64 4.1959 29.66
Sep 9691 3.6308 664.22 4.6573 38.964
Oct 10590 4.6059 684.94 5.6407 74.805
Nov 10071 5.2835 808.65 8.0297 186.68
Dec 10281 10.571 1087.6 3.8564 28.823
Вот карта и сокращение функций, которые mapreduce
применяется к данным.
function statsByGroupMapper(data, ~, intermKVStore, groupVarName) % Data is a n-by-3 table. Remove missing values first delays = data.ArrDelay; groups = data.(groupVarName); notNaN =~isnan(delays); groups = groups(notNaN); delays = delays(notNaN); % Find the unique group levels in this chunk [intermKeys,~,idx] = unique(groups, 'stable'); % Group delays by idx and apply @grpstatsfun function to each group intermVals = accumarray(idx,delays,size(intermKeys),@grpstatsfun); addmulti(intermKVStore,intermKeys,intermVals); function out = grpstatsfun(x) n = length(x); % count m = sum(x)/n; % mean v = sum((x-m).^2)/n; % variance s = sum((x-m).^3)/n; % skewness without normalization k = sum((x-m).^4)/n; % kurtosis without normalization out = {[n, m, v, s, k]}; end end %--------------------------------------------------------------------- function statsByGroupReducer(intermKey, intermValIter, outKVStore) n = []; m = []; v = []; s = []; k = []; % Get all sets of intermediate statistics while hasnext(intermValIter) value = getnext(intermValIter); n = [n; value(1)]; m = [m; value(2)]; v = [v; value(3)]; s = [s; value(4)]; k = [k; value(5)]; end % Note that this approach assumes the concatenated intermediate values fit % in memory. Refer to the reducer function, covarianceReducer, of the % CovarianceMapReduceExample for an alternative pairwise reduction approach % Combine the intermediate results count = sum(n); meanVal = sum(n.*m)/count; d = m - meanVal; variance = (sum(n.*v) + sum(n.*d.^2))/count; skewnessVal = (sum(n.*s) + sum(n.*d.*(3*v + d.^2)))./(count*variance^(1.5)); kurtosisVal = (sum(n.*k) + sum(n.*d.*(4*s + 6.*v.*d +d.^3)))./(count*variance^2); outValue = struct('Count',count, 'Mean',meanVal, 'Variance',variance,... 'Skewness',skewnessVal, 'Kurtosis',kurtosisVal); % Add results to the output datastore add(outKVStore,intermKey,outValue); end %---------------------------------------------------------------------
mapreduce
| tabularTextDatastore