Использование MapReduce для вычисления ковариационных и смежных величин

В этом примере показано, как вычислить среднее значение и ковариацию для нескольких переменных в большом наборе данных с помощью mapreduce. Затем он использует ковариацию, чтобы выполнить несколько последующих вычислений, которые не требуют другой итерации по всему набору данных.

Подготовка данных

Создайте datastore с помощью airlinesmall.csv набор данных. Этот 12-мегабайтный набор данных содержит 29 столбцов информации о рейсе для нескольких авиаперевозчиков, включая время прибытия и вылета. В этом примере выберите ActualElapsedTime (общее время рейса), Distance (общая дальность рейса), DepDelay(задержка вылета рейса), и ArrDelay(задержка прибытия рейса) как интересующие переменные.

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = {'ActualElapsedTime', 'Distance', ...
                                     'DepDelay', 'ArrDelay'};

datastore лечит 'NA' значения как отсутствующие и заменяет отсутствующие значения на NaN значения по умолчанию. Кроме того, SelectedVariableNames свойство позволяет работать только с выбранными интересующими вас переменными, которые можно проверить используя preview.

preview(ds)
ans=8×4 table
    ActualElapsedTime    Distance    DepDelay    ArrDelay
    _________________    ________    ________    ________

            53             308          12           8   
            63             296           1           8   
            83             480          20          21   
            59             296          12          13   
            77             373          -1           4   
            61             308          63          59   
            84             447          -2           3   
           155             954          -1          11   

Запуск MapReduce

The mapreduce функция требует функции map и функции reduce в качестве входов. mapper получает блоки данных и выводит промежуточные результаты. Редуктор считывает промежуточные результаты и дает конечный результат.

В этом примере mapper вычисляет количество, среднее значение и ковариацию для переменных в каждом блоке данных в datastore, ds. Затем mapper сохраняет вычисленные значения для каждого блока как промежуточную пару "ключ-значение", состоящую из одного ключа с массивом ячеек, содержащим три вычисленных значения.

Отобразите файл функции map.

function covarianceMapper(t,~,intermKVStore)
  % Get data from input table and remove any rows with missing values
  x = t{:,:};
  x = x(~any(isnan(x),2),:);

  % Compute and save the count, mean, and covariance
  n = size(x,1);
  m = mean(x,1);
  c = cov(x,1);

  % Store values as a single item in the intermediate key/value store
  add(intermKVStore,'key',{n m c})
end

Редуктор объединяет промежуточные результаты для каждого блока, чтобы получить количество, среднее значение и ковариацию для каждой интересующей переменной во всем наборе данных. Редуктор сохраняет последние пары "ключ-значение" для ключей 'count', 'mean', и 'cov' с соответствующими значениями для каждой переменной.

Отобразите файл функции сокращения.

function covarianceReducer(~,intermValIter,outKVStore)
  % We will combine results computed in the mapper for different chunks of
  % the data, updating the count, mean, and covariance each time we add a new
  % chunk.

  % First, initialize everything to zero (scalar 0 is okay)
  n1 = 0; % no rows so far
  m1 = 0; % mean so far
  c1 = 0; % covariance so far

  while hasnext(intermValIter)
    % Get the next chunk, and extract the count, mean, and covariance
    t = getnext(intermValIter);
    n2 = t{1};
    m2 = t{2};
    c2 = t{3};
    
    % Use weighting formulas to update the values so far
    n = n1+n2;                     % new count
    m = (n1*m1 + n2*m2) / n;       % new mean
    
    % New covariance is a weighted combination of the two covariance, plus
    % additional terms that relate to the difference in means
    c1 = (n1*c1 + n2*c2 + n1*(m1-m)'*(m1-m) + n2*(m2-m)'*(m2-m))/ n;
    
    % Store the new mean and count for the next iteration
    m1 = m;
    n1 = n;
  end

  % Save results in the output key/value store
  add(outKVStore,'count',n1);
  add(outKVStore,'mean',m1);
  add(outKVStore,'cov',c1);
end

Использование mapreduce для применения карты и сокращения функций к datastore, ds.

outds = mapreduce(ds, @covarianceMapper, @covarianceReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

mapreduce возвращает datastore, outds, с файлами в текущей папке.

Просмотр результатов mapreduce вызов при помощи readall функция на выход datastore.

results = readall(outds)
results=3×2 table
       Key                      Value               
    _________    ___________________________________

    {'count'}    {[                         120664]}
    {'mean' }    {[120.2452 703.3926 8.1334 7.1235]}
    {'cov'  }    {4x4 double                       }

Count = results.Value{1};
MeanVal = results.Value{2};
Covariance = results.Value{3};

Вычислительная матрица корреляции

Ковариационные, средние и счетные значения полезны для выполнения дальнейших вычислений. Вычислите матрицу корреляции путем нахождения стандартных отклонений и нормализации их к форме корреляции.

s = sqrt(diag(Covariance));
Correlation = Covariance ./ (s*s')
Correlation = 4×4

    1.0000    0.9666    0.0278    0.0902
    0.9666    1.0000    0.0216    0.0013
    0.0278    0.0216    1.0000    0.8748
    0.0902    0.0013    0.8748    1.0000

Истекшее время (первый столбец) и расстояние (второй столбец) сильно коррелируют, так как Correlation(2,1) = 0.9666. Задержка отправления (третий столбец) и задержка прибытия (четвертый столбец) также сильно коррелируют, так как Correlation(4,3) = 0.8748.

Вычисление коэффициентов регрессии

Вычислите некоторые коэффициенты регрессии, чтобы предсказать задержку прибытия, ArrDelay, используя другие три переменные в качестве предикторов.

slopes = Covariance(1:3,1:3)\Covariance(1:3,4);
intercept = MeanVal(4) - MeanVal(1:3)*slopes;
b = table([intercept; slopes], 'VariableNames', {'Estimate'}, ...
    'RowNames', {'Intercept','ActualElapsedTime','Distance','DepDelay'})
b=4×1 table
                         Estimate 
                         _________

    Intercept              -19.912
    ActualElapsedTime      0.56278
    Distance             -0.068721
    DepDelay               0.94689

Выполните PCA

Использование svd для выполнения PCA (анализа основных компонентов). PCA является методом для нахождения более низких размерных сводных данных набора данных. Следующий расчет является упрощенной версией PCA, но из pca доступно больше опций и pcacov функции в Statistics and Machine Learning Toolbox™.

Можно выполнить PCA, используя или ковариацию, или корреляцию. В этом случае используйте корреляцию, поскольку различие в масштабе переменных большая. Первые два компонента захватывают большую часть отклонения.

[~,latent,pcacoef] = svd(Correlation);
latent = diag(latent)
latent = 4×1

    2.0052
    1.8376
    0.1407
    0.0164

Отобразите матрицу коэффициентов. Каждый столбец матрицы коэффициентов описывает, как один компонент определяется как линейная комбинация стандартизированных исходных переменных. Первый компонент в основном является средним значением первых двух переменных с некоторым дополнительным вкладом от других переменных. Точно так же второй компонент в основном является средним значением двух последних переменных.

pcacoef
pcacoef = 4×4

   -0.6291    0.3222   -0.2444   -0.6638
   -0.6125    0.3548    0.2591    0.6572
   -0.3313   -0.6244    0.6673   -0.2348
   -0.3455   -0.6168   -0.6541    0.2689

Локальные функции

Вот карта и сокращение функций, которые mapreduce применяется к данным.

function covarianceMapper(t,~,intermKVStore)
  % Get data from input table and remove any rows with missing values
  x = t{:,:};
  x = x(~any(isnan(x),2),:);

  % Compute and save the count, mean, and covariance
  n = size(x,1);
  m = mean(x,1);
  c = cov(x,1);

  % Store values as a single item in the intermediate key/value store
  add(intermKVStore,'key',{n m c})
end
%------------------------------------------------------------------
function covarianceReducer(~,intermValIter,outKVStore)
  % We will combine results computed in the mapper for different chunks of
  % the data, updating the count, mean, and covariance each time we add a new
  % chunk.

  % First, initialize everything to zero (scalar 0 is okay)
  n1 = 0; % no rows so far
  m1 = 0; % mean so far
  c1 = 0; % covariance so far

  while hasnext(intermValIter)
    % Get the next chunk, and extract the count, mean, and covariance
    t = getnext(intermValIter);
    n2 = t{1};
    m2 = t{2};
    c2 = t{3};
    
    % Use weighting formulas to update the values so far
    n = n1+n2;                     % new count
    m = (n1*m1 + n2*m2) / n;       % new mean
    
    % New covariance is a weighted combination of the two covariance, plus
    % additional terms that relate to the difference in means
    c1 = (n1*c1 + n2*c2 + n1*(m1-m)'*(m1-m) + n2*(m2-m)'*(m2-m))/ n;
    
    % Store the new mean and count for the next iteration
    m1 = m;
    n1 = n;
  end

  % Save results in the output key/value store
  add(outKVStore,'count',n1);
  add(outKVStore,'mean',m1);
  add(outKVStore,'cov',c1);
end
%------------------------------------------------------------------

См. также

|

Похожие темы