Разбиение Datastore параллельно

Разбиение datastore параллельно с фрагментом datastore на каждом рабочем месте в параллельном пуле может обеспечить преимущества во многих случаях:

  • Выполните некоторое действие только для одной части всего datastore или для нескольких определенных частей одновременно.

  • Поиск конкретных значений в хранилище данных со всеми работниками, действующими одновременно на собственных разделах.

  • Выполните расчет сокращения для работников во всех разделах.

В этом примере показано, как использовать partition для параллелизации считывания данных из datastore. Он использует небольшой datastore данных авиакомпаний, представленных в MATLAB®и находит среднее значение Незначений NaN из его 'ArrDelay' столбец.

Простой способ вычислить среднее значение - разделить сумму всех Незначений NaN на количество Незначений NaN. Следующий код делает это для datastore сначала непараллельным способом. Чтобы начать, вы задаете функцию, чтобы усилить количество и сумму. Если вы хотите запустить этот пример, скопируйте и сохраните эту функцию в папке в пути поиска файлов команды MATLAB.

function [total,count] = sumAndCountArrivalDelay(ds)
    total = 0;
    count = 0;
    while hasdata(ds)
        data = read(ds);
        total = total + sum(data.ArrDelay,1,'OmitNaN');
        count = count + sum(~isnan(data.ArrDelay));
    end
end

Следующий код создает datastore, вызывает функцию и вычисляет среднее значение без какого-либо параллельного выполнения. tic и toc функции используются для времени выполнения, здесь и в более поздних параллельных случаях.

ds = datastore(repmat({'airlinesmall.csv'},20,1),'TreatAsMissing','NA');
ds.SelectedVariableNames = 'ArrDelay';

reset(ds);
tic
  [total,count] = sumAndCountArrivalDelay(ds)
sumtime = toc
mean = total/count
total =

    17211680


count =

     2417320


sumtime =

    7.7905


mean =

    7.1201

partition функция позволяет вам разбить datastore на меньшие части, каждый из которых представлен как сам datastore. Эти меньшие хранилища данных работают полностью независимо друг от друга, так что вы можете работать с ними внутри параллельных языковых функций, таких как parfor циклы и spmd блоки.

Количество разделов в следующем коде устанавливается numpartitions функция, основанная на самом datastore (ds) и параллельный пул (gcp) размер. Это не обязательно равняется количеству работников в пуле. В этом случае количество итераций цикла затем устанавливается на количество разбиений (N).

Следующий код запускает параллельный пул в локальном кластере, а затем разделяет datastore среди рабочих процессов для итерации по циклу. Снова вызывается отдельная функция, которая включает в себя parfor цикл для усиления итогов подсчета и суммы. Скопируйте и сохраните эту функцию, если вы хотите запустить пример.

function [total, count] = parforSumAndCountArrivalDelay(ds)
    N = numpartitions(ds,gcp);
    total = 0;
    count = 0;    
    parfor ii = 1:N
        % Get partition ii of the datastore.
        subds = partition(ds,N,ii);
    
        [localTotal,localCount] = sumAndCountArrivalDelay(subds);
        total = total + localTotal;
        count = count + localCount;
    end
end

Теперь код MATLAB вызывает эту новую функцию, так что подсчет и суммирование Незначений NaN могут происходить параллельно итерациям цикла.

p = parpool('local',4);

reset(ds);
tic
[total,count] = parforSumAndCountArrivalDelay(ds)
parfortime = toc
mean = total/count
Starting parallel pool (parpool) using the 'local' profile ...
Connected to the parallel pool (number of workers: 4).

total =

    17211680


count =

     2417320


parfortime =

    6.4133


mean =

    7.1201

Вместо того, чтобы позволить программному обеспечению вычислить количество разделов, можно явно задать это значение, чтобы данные могли быть соответствующим образом разбиты в соответствии с вашим алгоритмом. Например, чтобы параллелизировать данные из spmd блок, можно задать количество работников (numlabs) как количество используемых разделов. Следующая функция использует spmd блок для выполнения параллельного чтения и явным образом устанавливает количество разделов равным количеству рабочих мест. Чтобы запустить этот пример, скопируйте и сохраните функцию.

function [total,count] = spmdSumAndCountArrivalDelay(ds)
    spmd
        subds = partition(ds,numlabs,labindex);
        [total,count] = sumAndCountArrivalDelay(subds);    
    end
    total = sum([total{:}]);
    count = sum([count{:}]);
end

Теперь код MATLAB вызывает функцию, которая использует spmd блок.

reset(ds);
tic
[total,count] = spmdSumAndCountArrivalDelay(ds)
spmdtime = toc
mean = total/count
total =

    17211680


count =

     2417320


spmdtime =

    4.6729


mean =

    7.1201
delete(p);
Parallel pool using the 'local' profile is shutting down.

Вы можете получить некоторое представление о скромном повышении эффективности путем сравнения времени, записанного в переменных sumtime, parfortime, и spmdtime. Результаты могут варьироваться, так как на эффективность могут влиять размер datastore, размер параллельного пула, аппаратные строения и другие факторы.