Пакетная обработка данных спектров Используя последовательные и параллельные вычисления

Этот пример показывает, как можно использовать одиночный компьютер, многоядерный компьютер или кластер компьютеров, чтобы предварительно обработать большой набор сигналов масс-спектрометрии. Примечание: Parallel Computing Toolbox™ и MATLAB® Parallel Server™ требуются для последней части этого примера.

Введение

Этот пример показывает необходимые шаги, чтобы настроить пакетную обработку по группе массовых спектров, содержавшихся в одной или нескольких директориях. Можно достигнуть этого последовательно, или в параллели с помощью или многоядерного компьютера или кластера компьютеров. Пакетная обработка данных адаптируется к модели параллельных вычислений нескольких-данных одно программы (SPMD), и это подходит лучше всего для Parallel Computing Toolbox™ и MATLAB® Parallel Server™.

Сигналы предварительно обработать прибывают из белка улучшенный поверхностью лазер desorption/ionization-time рейса (SELDI-TOF) спектры массы. Данные в этом примере являются от FDA-NCI Клиническим Банком данных Программы Протеомики (http://home.ccr.cancer.gov/ncifdaproteomics/). В частности, пример использует набор данных рака яичника с высоким разрешением, который был сгенерирован с помощью массива белка WCX2.

Установка репозитория для данных

Этот пример принимает, что вы загрузили и распаковали наборы данных в свой репозиторий. Идеально, необходимо поместить наборы данных в сетевой диск. Если рабочие, у всех есть доступ к тем же дискам в сети, они могут получить доступ к необходимым данным, которые находятся на этих совместно используемых ресурсах. Это - предпочтительный метод для обмена данными, когда это минимизирует сетевой трафик.

Во-первых, завоюйте репутацию и полный путь к репозиторию данных. Заданы две строки: путь от локального компьютера до репозитория и путь, требуемый кластерными компьютерами получить доступ к той же директории. Измените обоих соответственно на вашу конфигурацию сети.

local_repository  = '//home/user/MassSpecRepository/OvarianCD_PostQAQC/';
worker_repository = '//public/user/MassSpecRepository/OvarianCD_PostQAQC/';

Для этого конкретного примера файлы хранятся в двух подкаталогах: 'Normal' и 'Рак'. Можно создать списки, содержащие файлы к процессу с помощью команды DIR,

cancerFiles = dir([local_repository 'Cancer/*.txt'])
normalFiles = dir([local_repository 'Normal/*.txt'])
cancerFiles = 

121x1 struct array with fields:

    name
    date
    bytes
    isdir
    datenum


normalFiles = 

95x1 struct array with fields:

    name
    date
    bytes
    isdir
    datenum

и помещенный их в одну переменную:

files = [ strcat('Cancer/',{cancerFiles.name}) ...
          strcat('Normal/',{normalFiles.name})];
N = numel(files)   % total number of files
N =

   216

Последовательная пакетная обработка данных

Прежде, чем попытаться обработать все файлы параллельно, необходимо протестировать алгоритмы локально с циклом for.

Запишите функцию с последовательным набором инструкций, которые должны быть применены к каждому набору данных. Входные параметры являются путем к данным (в зависимости от того, как машина, которая на самом деле сделает работу, видит их), и список файлов к процессу. Выходными аргументами являются предварительно обработанные сигналы и вектор M/Z. Поскольку вектор M/Z является тем же самым для каждой спектрограммы после предварительной обработки, необходимо сохранить его только однажды. Например:

type msbatchprocessing
function [MZ,Y] = msbatchprocessing(repository,files)
% MSBATCHPROCESSING Example function for BIODISTCOMPDEMO
%
% [MZ,Y] = MSBATCHPROCESSING(REPOSITORY,FILES) Preprocesses the
% spectrogram in files FILES and returns the mass/charge (MZ) and ion
% intensities (Y) vectors.
%
% Hard-coded parameters in the preprocessing steps have been adjusted to
% deal with the high-resolution spectrograms of the example.

% Copyright 2004-2013 The MathWorks, Inc.

K = numel(files);
Y = zeros(15000,K); % need to preset the size of Y for memory performance
MZ = zeros(15000,1);

parfor k = 1:K

    file = [repository files{k}]; 
    
    % read the two-column text file with mass-charge and intensity values
    fid = fopen(file,'r');
    ftext = textscan(fid, '%f%f');
    fclose(fid);
    signal = ftext{1};
    intensity = ftext{2};

    % resample the signal to 15000 points between 2000 and 11900
    mzout = (sqrt(2000)+(0:(15000-1))'*diff(sqrt([2000,11900]))/15000).^2;
    [mz,YR] = msresample(signal,intensity,mzout);
    
    % align the spectrograms to two good reference peaks
    P = [3883.766 7766.166];
    YA = msalign(mz,YR,P,'WIDTH',2);

    % estimate and adjust the background
    YB = msbackadj(mz,YA,'STEP',50,'WINDOW',50);

    % reduce the noise using a nonparametric filter
    Y(:,k) = mslowess(mz,YB,'SPAN',5);
    
    % the mass/charge vector is the same for all spectra after the resample
    if k==1
        MZ(:,k) = mz;
    end

end

Отметьте в функциональном MSBATCHPROCESSING намеренное использование PARFOR вместо FOR. Пакетная обработка данных обычно реализуется задачами, которые независимы между итерациями. В таком случае оператор FOR может безразлично быть изменен на PARFOR, создав последовательность операторов MATLAB® (или программа), который может запуститься беспрепятственно на последовательном компьютере, многоядерном компьютере или кластере компьютеров, не имея необходимость изменять его. В этом случае цикл выполняется последовательно, потому что вы не создали Параллельный Пул (принимающий, что в Настройках Parallel Computing Toolbox™ флажок для того, чтобы автоматически создать Параллельный Пул не проверяется, в противном случае MATLAB выполнится параллельно так или иначе). Для демонстрационных целей только 20 спектрограмм предварительно обрабатываются и хранятся в матрице Y. Можно измерить количество времени, которое MATLAB® занимает, чтобы завершить цикл с помощью команд TOC и TIC.

tic
repository = local_repository;
K = 20; % change to N to do all

[MZ,Y] = msbatchprocessing(repository,files(1:K));

disp(sprintf('Sequential time for %d spectrograms: %f seconds',K,toc))
Sequential time for 20 spectrograms: 29.937129 seconds

Параллельная пакетная обработка данных с многоядерными компьютерами

Если у вас есть Parallel Computing Toolbox™, можно использовать локальных рабочих, чтобы параллелизировать итерации цикла. Например, если ваша локальная машина имеет четыре ядра, можно запустить Параллельный Пул с четырех рабочих, использующих 'локальный' кластерный профиль по умолчанию:

POOL = parpool('local',4);

tic
repository = local_repository;
K = 20; % change to N to do all

[MZ,Y] = msbatchprocessing(repository,files(1:K));

disp(sprintf('Parallel time with four local workers for %d spectrograms: %f seconds',K,toc))
Starting parallel pool (parpool) using the 'local' profile ... connected to 4 workers.
Parallel time with four local workers for 20 spectrograms: 10.876799 seconds

Остановите локальный пул рабочего:

delete(POOL)
Parallel pool using the 'local' profile has been stopped. 

Параллельная пакетная обработка данных с распределенными вычислениями

Если у вас есть Parallel Computing Toolbox™ и MATLAB® Parallel Server™, можно также распределить итерации цикла большему числу компьютеров. В этом примере кластерный профиль 'compbio_config_01' соединяется с двумя машинами с 8 ядрами (16 рабочих). Для получения информации о подготовке и выборе параллельных настроек, см. "Кластерные Профили и Вычисление, Масштабирующееся" в документации Parallel Computing Toolbox™.

Обратите внимание на то, что, если вы записали свою собственную функцию пакетной обработки данных, необходимо включать ее в соответствующий кластерный профиль при помощи Кластерного менеджера по Профилю. Это гарантирует, что MATLAB® правильно передает вашу новую функцию рабочим. Вы получаете доступ к Кластерному менеджеру по Профилю, использующему выпадающее меню Parallel на рабочем столе MATLAB®.

POOL = parpool('compbio_config_01',16);

tic
repository = worker_repository;
K = 20; % change to N to do all

[MZ,Y] = msbatchprocessing(repository,files(1:K));

disp(sprintf('Parallel time with 16 remote workers for %d spectrograms: %f seconds',K,toc))
Starting parallel pool (parpool) using the 'compbio_config_01' profile ... connected to 16 workers.
Parallel time with 16 remote workers for 20 spectrograms: 11.121062 seconds

Остановите кластерный пул:

delete(POOL)
Parallel pool using the 'compbio_config_01' profile has been stopped. 

Асинхронная параллельная пакетная обработка данных

Схемы выполнения, описанные, прежде всего, действуют синхронно, то есть, они блокируют командную строку MATLAB®, пока их выполнение не завершается. Если вы хотите запустить задание процесса пакетной обработки и получить доступ к командной строке, в то время как вычисления запускаются асинхронно (асинхронный), можно вручную распределить параллельные задачи и собрать результаты позже. Этот пример использует тот же кластерный профиль как прежде.

Создайте одно задание с одной задачей (MSBATCHPROCESSING). Задача работает на одном из рабочих, и его внутренний цикл PARFOR распределяется среди всех доступных рабочих в параллельной настройке. Обратите внимание на то, что, если N (количество спектрограмм) намного больше, чем количество доступных рабочих в вашей параллельной настройке, Parallel Computing Toolbox™ автоматически балансирует рабочую нагрузку, даже если у вас есть неоднородный кластер.

tic % start the clock
repository = worker_repository;
K = N; % do all spectrograms
CLUSTER = parcluster('compbio_config_01');
JOB = createCommunicatingJob(CLUSTER,'NumWorkersRange',[16 16]);
TASK = createTask(JOB,@msbatchprocessing,2,{repository,files(1:K)});

submit(JOB)

Когда задание представлено, ваша локальная подсказка MATLAB® сразу возвращается. Ваше параллельное задание запускается, если параллельные ресурсы становятся доступными. Между тем можно контролировать параллельное задание путем осмотра объектов JOB или TASK. Используйте метод WAIT, чтобы программно ожидать, пока ваша задача не закончится:

wait(TASK)
TASK.OutputArguments
ans = 

    [15000x1 double]    [15000x216 double]

MZ = TASK.OutputArguments{1};
Y = TASK.OutputArguments{2};
destroy(JOB) % done retrieving the results
disp(sprintf('Parallel time (asynchronous) with 16 remote workers for %d spectrograms: %f seconds',K,toc))
Parallel time (asynchronous) with 16 remote workers for 216 spectrograms: 131.031854 seconds

Постобработка

После сбора всех данных можно использовать его локально. Например, можно применить нормализацию группы:

Y = msnorm(MZ,Y,'QUANTILE',0.5,'LIMITS',[3500 11000],'MAX',50);

Создайте группирующийся вектор с типом для каждой спектрограммы, а также векторы индексации. Эта "маркировка" поможет, чтобы выполнить последующий анализ на наборе данных.

grp = [repmat({'Cancer'},size(cancerFiles));...
       repmat({'Normal'},size(normalFiles))];
cancerIdx = find(strcmp(grp,'Cancer'));
numel(cancerIdx) % number of files in the "Cancer" subdirectory
ans =

   121

normalIdx = find(strcmp(grp,'Normal'));
numel(normalIdx) % number of files in the "Normal" subdirectory
ans =

    95

Если данные маркированы, можно отобразить некоторые спектрограммы каждого класса с помощью различного цвета (первые пять из каждой группы в этом примере).

h = plot(MZ,Y(:,cancerIdx(1:5)),'b',MZ,Y(:,normalIdx(1:5)),'r');
axis([7650 8200 -2 50])
xlabel('Mass/Charge (M/Z)');ylabel('Relative Intensity')
legend(h([1 end]),{'Ovarian Cancer','Control'})
title('Region of the pre-processed spectrograms')

Сохраните предварительно обработанный набор данных, потому что он будет использоваться в примерах, Идентифицирующих Значительные Функции и Классифицирующих Профили Белка и Поиск Генетического алгоритма Функций в Данных о Масс-спектрометрии.

save OvarianCancerQAQCdataset.mat Y MZ grp

Правовая оговорка

Тик- синхронизация TOC представлена здесь как пример. Последовательное и параллельное время выполнения будет отличаться в зависимости от оборудования, которое вы используете. В этом примере мы использовали Intel® Xeon®, с 4 ядрами (2.66 ГГц, 24 ГБ, win64) как локальная машина и 16 рабочих, использующих 4 Intel® Xeon®, с 8 ядрами (2.4 ГГц, 32 ГБ, Linux®) машины. Менеджер по заданию также работает на компьютере Linux.

Для просмотра документации необходимо авторизоваться на сайте