Разделите Datastore параллельно

Разделение datastore параллельно, с фрагментом datastore на каждом рабочем в параллельном пуле, может предоставить преимущества во многих случаях:

  • Выполните некоторое действие только с одной частью целого datastore, или на нескольких заданных частях одновременно.

  • Ищите определенные значения в хранилище данных со всеми рабочими, действующими одновременно на их собственные разделы.

  • Выполните вычисление сокращения на рабочих через все разделы.

В этом примере показано, как использовать partition параллелизировать чтение данных из datastore. Это использует маленький datastore данных авиакомпании, обеспеченных в MATLAB®, и находит среднее значение незначений NaN от его 'ArrDelay' столбец.

Простой способ вычислить среднее значение состоит в том, чтобы разделить сумму всех незначений NaN количеством незначений NaN. Следующий код делает это для datastore сначала непараллельным способом. Чтобы начаться, вы задаете функцию, чтобы накопить количество и сумму. Если вы хотите запустить этот пример, скопируйте и сохраните эту функцию в папке на пути поиска файлов команды MATLAB.

function [total,count] = sumAndCountArrivalDelay(ds)
total = 0;
count = 0;
while hasdata(ds)
    data = read(ds);
    total = total + sum(data.ArrDelay,1,'OmitNaN');
    count = count + sum(~isnan(data.ArrDelay));
end
end

Следующий код создает datastore, вызывает функцию и вычисляет среднее значение без любого параллельного выполнения. tic и toc функции используются ко времени выполнение, здесь и в более поздних аналогичных случаях.

ds = datastore(repmat({'airlinesmall.csv'},20,1),'TreatAsMissing','NA');
ds.SelectedVariableNames = 'ArrDelay';

reset(ds);
tic
  [total,count] = sumAndCountArrivalDelay(ds)
sumtime = toc
mean = total/count
total =

    17211680


count =

     2417320


sumtime =

    7.7905


mean =

    7.1201

partition функция позволяет вам делить datastore в меньшие части, каждый представленный как сам datastore. Эти меньшие хранилища данных работают полностью друг независимо от друга, так, чтобы можно было работать с ними в функциях языка параллельного программирования, таких как parfor циклы и spmd блоки.

Номер разделов в следующем коде определяется numpartitions функция, на основе самого datastore (ds) и параллельный пул (gcpразмер. Это не обязательно равняется количеству рабочих в пуле. В этом случае номер итераций цикла затем определяется к количеству разделов (N).

Следующий код запускает параллельный пул на локальном кластере, затем делит datastore среди рабочих для итерации по циклу. Снова, отдельная функция вызвана, который включает parfor цикл, чтобы накопить количество и суммарные итоги. Скопируйте и сохраните эту функцию, если вы хотите запустить пример.

function [total, count] = parforSumAndCountArrivalDelay(ds)

N = numpartitions(ds,gcp);
total = 0;
count = 0;
parfor ii = 1:N
    % Get partition ii of the datastore.
    subds = partition(ds,N,ii);
    
    [localTotal,localCount] = sumAndCountArrivalDelay(subds);
    total = total + localTotal;
    count = count + localCount;
end
end

Теперь код MATLAB вызывает эту новую функцию, так, чтобы подсчет и подведение итогов незначений NaN могли произойти в параллельных итерациях цикла.

p = parpool('local',4);

reset(ds);
tic
  [total,count] = parforSumAndCountArrivalDelay(ds)
parfortime = toc
mean = total/count
Starting parallel pool (parpool) using the 'local' profile ...
Connected to the parallel pool (number of workers: 4).

total =

    17211680


count =

     2417320


parfortime =

    6.4133


mean =

    7.1201

Вместо того, чтобы позволять программному обеспечению вычислить количество разделов, можно явным образом установить это значение, так, чтобы данные могли быть соответственно разделены, чтобы соответствовать алгоритму. Например, чтобы параллелизировать данные из spmd блокируйтесь, можно задать количество рабочих (numlabs) как количество разделов, чтобы использовать. Следующая функция использует spmd блокируйтесь, чтобы выполнить параллельное чтение, и явным образом определяет номер разделов, равных количеству рабочих. Чтобы запустить этот пример, скопируйте и сохраните функцию.

function [total,count] = spmdSumAndCountArrivalDelay(ds)
spmd
    subds = partition(ds,numlabs,labindex);
    [total,count] = sumAndCountArrivalDelay(subds);
end
total = sum([total{:}]);
count = sum([count{:}]);
end

Теперь код MATLAB вызывает функцию, которая использует spmd блок.

reset(ds);
tic
  [total,count] = spmdSumAndCountArrivalDelay(ds)
spmdtime = toc
mean = total/count
total =

    17211680


count =

     2417320


spmdtime =

    4.6729


mean =

    7.1201
delete(p);
Parallel pool using the 'local' profile is shutting down.

Вы можете получить некоторое представление о скромных повышениях производительности путем сравнения времен, зарегистрированных в переменных sumtime, parfortime, и spmdtime. Ваши результаты могут варьироваться, когда производительность может быть затронута размером datastore, параллельным размером пула, аппаратной конфигурацией и другими факторами.