parallel.pool.DataQueue

Отправляйте и слушайте данные между клиентом и работниками

    Описание

    A DataQueue позволяет асинхронно отправлять данные или сообщения от работников обратно клиенту в параллельном пуле во время расчета. Для примера можно получить промежуточные значения и указание на прогресс расчета.

    Чтобы отправить данные от работника параллельного пула обратно клиенту, сначала создайте DataQueue в клиенте. Передайте это DataQueue в parfor-цикл или другая параллельная конструкция языка, такая как spmd. От работников звоните send отправлять данные обратно клиенту. В клиенте регистрируйте функцию, которая будет вызываться каждый раз, когда данные принимаются при помощи afterEach.

    • Вы можете позвонить send от работника или клиента, который создает DataQueue, при необходимости.

    • Можно создать очередь работников и отправить ее обратно клиенту, чтобы включить коммуникацию в обратном направлении. Однако вы не можете отправить очередь от одного работника другому. Для передачи данных между работниками используйте spmd, labSend, или labReceive вместо этого.

    • В отличие от всех других указателей объектов, DataQueue и PollableDataQueue образцы остаются подключенными, когда их отправляют работникам.

    Создание

    Синтаксис

    Описание

    пример

    q = parallel.pool.DataQueue создает объект, который может использоваться для отправки или прослушивания сообщений (или данных) от различных работников. Создайте DataQueue на работнике или клиенте, где необходимо получить данные.

    Свойства

    расширить все

    Это свойство доступно только для чтения.

    Количество элементов данных, ожидающих удаления из очереди, заданное в виде нуля или положительного целого числа. Значение 0 или положительное целое число для работника или клиента, который создает PollableDataQueue образец. Если клиент создает PollableDataQueue образец, значение 0 на всех рабочих. Если рабочий создает PollableDataQueue, значение 0 на клиента и всех других работников.

    Функции объекта

    afterEachЗадайте функцию для вызова при получении новых данных в DataQueue
    sendОтправка данных от работника клиенту с помощью очереди данных

    Примеры

    свернуть все

    Создайте a DataQueue, и звоните afterEach.

    q = parallel.pool.DataQueue;
    afterEach(q, @disp);
    
    Запуск a parfor-цикл, и отправить сообщение. Ожидающее сообщение передается в afterEach функция, в этом примере @disp.

    parfor i = 1:3
        send(q, i); 
    end;
         1
    
         2
    
         3

    Для получения дополнительной информации о прослушивании данных с помощью a DataQueue, см. afterEach.

    Когда вы отправляете сообщение на DataQueue объект, сообщение ожидает в очереди, пока не будет обработано прослушивателем. Каждое сообщение добавляет 1 в длину очереди. В этом примере вы используете QueueLength свойство для поиска длины DataQueue объект.

    Когда клиент или рабочий процесс создает DataQueue Объекты все сообщения, отправляемые в очередь, хранятся в памяти этого клиента или работника. Если клиент создает DataQueue объект, QueueLength свойство на всех рабочих 0. В этом примере вы создаете DataQueue объект на клиенте и отправка данных от работника.

    Сначала создайте параллельный пул с одним рабочим.

    parpool(1);
    Starting parallel pool (parpool) using the 'local' profile ...
    Connected to the parallel pool (number of workers: 1).
    

    Затем создайте DataQueue.

    q = parallel.pool.DataQueue
    q = 
      DataQueue with properties:
    
        QueueLength: 0
    
    

    Недавно созданный DataQueue имеет пустую очередь. Можно использовать parfor для поиска q.QueueLength на работника. Найдите длину очереди на клиенте и длину очереди на работнике.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    Когда очередь пуста, QueueLength является 0 как для клиента, так и для работника. Затем отправьте сообщение в очередь от работника. Затем используйте QueueLength свойство для поиска длины очереди.

    % Send a message first
    parfor i = 1
        send(q, 'A message');
    end
    
    % Find the length
    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 1
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    The QueueLength свойство 1 на клиенте, и 0 на работника. Создайте прослушиватель, чтобы обработать очередь путем немедленного отображения данных.

    el = afterEach(q, @disp);

    Подождите, пока очередь будет пустой, а затем удалите прослушиватель.

    while q.QueueLength > 0
        pause(0.1);
    end
    delete(el);

    Используйте QueueLength свойство для поиска длины очереди.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    

    QueueLength является 0 поскольку обработка очереди завершена.

    В этом примере вы используете DataQueue для обновления полосы ожидания с прогресс parfor-цикл.

    Когда вы создаете parfor-цикл, вы разгружаете каждую итерацию рабочим в параллельном пуле. Информация возвращается от работников только тогда, когда parfor-цикл завершает. Можно использовать DataQueue для обновления полосы ожидания в конце каждой итерации.

    Когда вы обновляете панель ожидания с прогрессом вашего parfor-цикл, клиент должен записать информацию о том, сколько итераций остается.

    Совет

    Если вы создаете новый параллельный код и хотите контролировать прогресс вашего кода, рассмотрите использование parfeval рабочий процесс. Для получения дополнительной информации смотрите Обновление пользовательского интерфейса асинхронно Использование afterEach и afterAll.

    Функция помощника parforWaitbar, заданный в конце этого примера, обновляет панель ожидания. Функция использует persistent для хранения информации о количестве оставшихся итераций.

    Использовать waitbar чтобы создать панель ожидания, w.

    w = waitbar(0,'Please wait ...');

    Создайте DataQueue, D. Затем используйте afterEach чтобы запустить parforWaitbar после отправки сообщений в DataQueue.

    % Create DataQueue and listener
    D = parallel.pool.DataQueue;
    afterEach(D,@parforWaitbar);

    Установите количество итераций для вашего parfor-цикл, N. Используйте панель ожидания w и количество итераций N для инициализации функции parforWaitbar.

    В конце каждой итерации parfor-цикл, клиент запускается parforWaitbar и пошагово обновляет панель ожидания.

    N = 100;
    parforWaitbar(w,N)
    

    Функция parforWaitbar использует постоянные переменные для хранения количества завершенных итераций на клиенте. Никакой информации от работников не требуется.

    Запуск parfor-цикл с N итераций. В данном примере используйте pause и rand чтобы симулировать некоторые работы. После каждой итерации используйте send чтобы отправить сообщение в DataQueue. Когда сообщение отправляется в DataQueue, панель ожидания обновляется. Поскольку от работников не требуется никакой информации, отправьте пустое сообщение, чтобы избежать ненужной передачи данных.

    После parfor-цикл завершает, использование delete чтобы закрыть панель ожидания.

    parfor i = 1:N
        pause(rand)
        send(D,[]);
    end
    
    delete(w);
    

    Определите вспомогательную функцию parforWaitbar. Когда вы бегаете parforWaitbar с двумя входными параметрами функция инициализирует три постоянные переменные (count, h, и N). Когда вы бегаете parforWaitbar с одним входным аргументом обновляется панель ожидания.

    function parforWaitbar(waitbarHandle,iterations)
        persistent count h N
        
        if nargin == 2
            % Initialize
            
            count = 0;
            h = waitbarHandle;
            N = iterations;
        else
            % Update the waitbar
            
            % Check whether the handle is a reference to a deleted object
            if isvalid(h)
                count = count + 1;
                waitbar(count / N,h);
            end
        end
    end

    В этом примере показано, как выполнить сдвиг параллельного параметра с parfeval и отправьте результаты назад во время расчетов с DataQueue объект. parfeval не блокирует MATLAB, поэтому можно продолжать работу во время выполнения расчетов.

    Пример выполняет свип параметра на системе Лоренца обыкновенных дифференциальных уравнений, на параметрах σ и ρ, и показывает хаотичность этой системы.

    ddtx=σ(y-z)ddty=x(ρ-z)-yddtz=xy-βx

    Создайте сетку параметров

    Задайте область значений параметров, которые вы хотите исследовать в сдвиге параметра.

    gridSize = 40;
    sigma = linspace(5, 45, gridSize);
    rho = linspace(50, 100, gridSize);
    beta = 8/3;

    Создайте 2-D сетку параметров при помощи meshgrid функция.

    [rho,sigma] = meshgrid(rho,sigma);

    Создайте объект рисунка и установите 'Visible' на true чтобы он открылся в новом окне, вне live скрипта. Чтобы визуализировать результаты сдвига параметра, создайте объемную поверхностную диаграмму. Обратите внимание, что инициализация Z компонент поверхности с NaN создает пустой график.

    figure('Visible',true);
    surface = surf(rho,sigma,NaN(size(sigma)));
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    Настройка параллельного окружения

    Создайте пул параллельных рабочих мест с помощью parpool функция.

    parpool;
    Starting parallel pool (parpool) using the 'local' profile ...
    Connected to the parallel pool (number of workers: 6).
    

    Чтобы отправить данные от работников, создайте DataQueue объект. Настройте функцию, которая обновляет объемную поверхностную диаграмму каждый раз, когда рабочий отправляет данные при помощи afterEach функция. The updatePlot функция является вспомогательной функцией, заданной в конце примера.

    Q = parallel.pool.DataQueue;
    afterEach(Q,@(data) updatePlot(surface,data));

    Выполните параллельное протягивание параметра

    После определения параметров можно выполнить сдвиг параллельного параметра.

    parfeval работает более эффективно, когда вы распределяете рабочую нагрузку. Чтобы распределить рабочую нагрузку, сгруппируйте параметры для исследования в разделы. В данном примере разделите на равномерные разделы размера step при помощи оператора двоеточия (:). Получившийся массив partitions содержит контуры разделов. Обратите внимание, что необходимо добавить конечную точку последнего раздела.

    step = 100;
    partitions = [1:step:numel(sigma), numel(sigma)+1]
    partitions = 1×17
    
               1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601
    
    

    Для оптимальной эффективности попробуйте разделить на разделы:

    • Достаточно большой, чтобы время расчета было большим по сравнению с накладными расходами на планирование раздела.

    • Достаточно маленькие, чтобы было достаточно разделов, чтобы все рабочие были заняты.

    Чтобы представлять выполнение функций на параллельных работниках и сохранить их результаты, используйте будущие объекты.

    f(1:numel(partitions)-1) = parallel.FevalFuture;

    Выгрузка расчетов параллельным рабочим с помощью parfeval функция. parameterSweep является вспомогательной функцией, заданной в конце этого скрипта, которая решает систему Лоренца на разделе параметров, которые нужно исследовать. У него есть один выходной аргумент, поэтому вы должны задать 1 как количество выходов в parfeval.

    for ii = 1:numel(partitions)-1
        f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
    end

    parfeval не блокирует MATLAB, поэтому можно продолжать работу во время выполнения расчетов. Рабочие вычисляют параллельно и отправляют промежуточные результаты через DataQueue как только они станут доступными.

    Если вы хотите заблокировать MATLAB до parfeval завершает, используйте wait функция на будущих объектах. Использование wait функция полезна, когда последующий код зависит от завершения parfeval.

    wait(f);

    После parfeval завершает расчеты, wait завершает, и вы можете выполнить больше кода. Для примера постройте график контура полученной поверхности. Используйте fetchOutputs функция для извлечения результатов, сохраненных в будущих объектах.

    results = reshape(fetchOutputs(f),gridSize,[]);
    contourf(rho,sigma,results)
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    Если вашему параметру sweep нужно больше вычислительных ресурсов и у вас есть доступ к кластеру, можно масштабировать parfeval расчеты. Дополнительные сведения см. в разделе Шкале с рабочего стола на кластер.

    Определите вспомогательные функции

    Задайте вспомогательную функцию, которая решает систему Лоренца на разбиении параметров, которые нужно исследовать. Отправка промежуточных результатов клиенту MATLAB при помощи send функция на DataQueue объект.

    function results = parameterSweep(first,last,sigma,rho,beta,Q)
        results = zeros(last-first,1);
        for ii = first:last-1
            lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
            [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
            result = a(end,3);
            send(Q,[ii,result]);
            results(ii-first+1) = result;
        end
    end

    Задайте другую вспомогательную функцию, которая обновляет объемную поверхностную диаграмму при поступлении новых данных.

    function updatePlot(surface,data)
        surface.ZData(data(1)) = data(2);
        drawnow('limitrate');
    end
    Введенный в R2017a