parallel.pool.DataQueue

Отправьте и прислушайтесь к данным между клиентом и рабочими

    Описание

    DataQueue включает асинхронные данные об отправке или сообщения от рабочих назад клиенту в параллельном пуле, в то время как расчет выполняется. Например, можно получить промежуточные значения и индикацию относительно прогресса расчета.

    Чтобы отправить данные от параллельного рабочего пула назад клиенту, сначала создайте DataQueue в клиенте. Передайте этот DataQueue в parfor- цикл или другое построение языка параллельного программирования, такое как spmd. От рабочих вызвать send передать данные обратно клиенту. В клиенте зарегистрируйте функцию, которая будет названа каждый раз, когда данные получены при помощи afterEach.

    • Можно вызвать send от рабочего или клиента, который создает DataQueue, при необходимости.

    • Можно создать очередь на рабочих и передать ее обратно клиенту, чтобы включить коммуникацию в обратном направлении. Однако вы не можете отправить очередь от одного рабочего другому. Чтобы передать данные между рабочими, используйте spmd, labSend, или labReceive вместо этого.

    • В отличие от всех других объектов указателя, DataQueue и PollableDataQueue экземпляры действительно остаются связанными, когда они отправляются рабочим.

    Создание

    Синтаксис

    Описание

    пример

    q = parallel.pool.DataQueue создает объект, который может использоваться, чтобы отправить или прислушаться к сообщениям (или данные) от различных рабочих. Создайте DataQueue на рабочем или клиенте, где вы хотите получить данные.

    Свойства

    развернуть все

    Это свойство доступно только для чтения.

    Количество элементов данных, ожидающих, чтобы быть удаленным из очереди в виде нулевого или положительного целого числа. Значением является 0 или положительное целое число на рабочем или клиенте, который создает PollableDataQueue экземпляр. Если клиент создает PollableDataQueue экземпляр, значением является 0 на всех рабочих. Если рабочий создает PollableDataQueue, значением является 0 на клиенте и всех других рабочих.

    Функции объекта

    afterEachЗадайте функцию, чтобы вызвать, когда новые данные будут получены на DataQueue
    sendОтправьте данные от рабочего клиенту, использующему очередь данных

    Примеры

    свернуть все

    Создайте a DataQueue, и вызовите afterEach.

    q = parallel.pool.DataQueue;
    afterEach(q, @disp);
    
    Запустите a parfor- цикл, и отправляет сообщение. Незаконченное сообщение передается afterEach функция, в этом примере @disp.

    parfor i = 1:3
        send(q, i); 
    end;
         1
    
         2
    
         3

    Для получения дополнительной информации о прислушивании к данным с помощью a DataQueue, смотрите afterEach.

    Когда вы отправляете сообщение в DataQueue объект, сообщение ожидает в очереди, пока это не обрабатывается прослушивателем. Каждое сообщение добавляет 1 к длине очереди. В этом примере вы используете QueueLength свойство найти длину DataQueue объект.

    Когда клиент или рабочий создают DataQueue объект, любые сообщения, которые отправляются очереди, сохранен в памяти о том клиенте или рабочем. Если клиент создает DataQueue объект, QueueLength свойством на всех рабочих является 0. В этом примере вы создаете DataQueue объект на клиенте, и отправляет данные от рабочего.

    Во-первых, создайте параллельный пул с одним рабочим.

    parpool(1);
    Starting parallel pool (parpool) using the 'local' profile ...
    Connected to the parallel pool (number of workers: 1).
    

    Затем создайте DataQueue.

    q = parallel.pool.DataQueue
    q = 
      DataQueue with properties:
    
        QueueLength: 0
    
    

    Недавно созданный DataQueue имеет пустую очередь. Можно использовать parfor найти q.QueueLength на рабочем. Найдите длину очереди на клиенте и длину очереди на рабочем.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    Когда очередь пуста, QueueLength 0 и для клиента и для рабочего. Затем отправьте сообщение очереди от рабочего. Затем используйте QueueLength свойство найти длину очереди.

    % Send a message first
    parfor i = 1
        send(q, 'A message');
    end
    
    % Find the length
    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 1
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    QueueLength свойством является 1 на клиенте и 0 на рабочем. Создайте прослушиватель, чтобы обработать очередь путем мгновенного отображения данных.

    el = afterEach(q, @disp);

    Ожидайте, пока очередь не пуста, затем удалите прослушиватель.

    while q.QueueLength > 0
        pause(0.1);
    end
    delete(el);

    Используйте QueueLength свойство найти длину очереди.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    

    QueueLength 0 потому что обработка очереди завершена.

    В этом примере вы используете DataQueue обновить панель ожидания с прогрессом parfor- цикл.

    Когда вы создаете parfor- цикл, вы разгружаете каждую итерацию рабочим в параллельном пуле. Информация только возвращена от рабочих когда parfor- цикл завершается. Можно использовать DataQueue обновить панель ожидания в конце каждой итерации.

    Когда вы обновляете панель ожидания с прогрессом вашего parfor- цикл, клиент должен записать информацию о том, сколько итераций остается.

    Совет

    Если вы создаете новый параллельный код и хотите контролировать прогресс своего кода, рассмотреть использование a parfeval рабочий процесс. Для получения дополнительной информации смотрите Пользовательский интерфейс Обновления Асинхронно Используя afterEach и afterAll.

    Функция помощника parforWaitbar, заданный в конце этого примера, обновляет панель ожидания. Функция использует persistent хранить информацию о количестве остающихся итераций.

    Использование waitbar создать панель ожидания, w.

    w = waitbar(0,'Please wait ...');

    Создайте DataQueueD. Затем используйте afterEach запускать parforWaitbar после того, как сообщения отправляются в DataQueue.

    % Create DataQueue and listener
    D = parallel.pool.DataQueue;
    afterEach(D,@parforWaitbar);

    Определите номер итераций для вашего parfor- цикл, N. Используйте панель ожидания w и количество итераций N инициализировать функциональный parforWaitbar.

    В конце каждой итерации parfor- цикл, клиент запускает parforWaitbar и инкрементно обновляет панель ожидания.

    N = 100;
    parforWaitbar(w,N)
    

    Функциональный parforWaitbar использует персистентные переменные, чтобы сохранить количество завершенных итераций на клиенте. Никакая информация не запрашивается от рабочих.

    Запустите parfor- цикл с N итерации. В данном примере используйте pause и rand чтобы симулировать некоторых работают. После каждой итерации используйте send отправить сообщение в DataQueue. Когда сообщение отправляется в DataQueue, обновления панели ожидания. Поскольку никакая информация не запрашивается от рабочих, отправьте пустое сообщение, чтобы избежать ненужной передачи данных.

    После parfor- цикл завершается, использовать delete закрыть панель ожидания.

    parfor i = 1:N
        pause(rand)
        send(D,[]);
    end
    
    delete(w);
    

    Задайте функцию помощника parforWaitbar. Когда вы запускаетесь parforWaitbar с двумя входными параметрами функция инициализирует три персистентных переменные (countH, и N). Когда вы запускаетесь parforWaitbar с одним входным параметром, обновлениями панели ожидания.

    function parforWaitbar(waitbarHandle,iterations)
        persistent count h N
        
        if nargin == 2
            % Initialize
            
            count = 0;
            h = waitbarHandle;
            N = iterations;
        else
            % Update the waitbar
            
            % Check whether the handle is a reference to a deleted object
            if isvalid(h)
                count = count + 1;
                waitbar(count / N,h);
            end
        end
    end

    В этом примере показано, как выполнить параллельную развертку параметра с parfeval и передайте результаты обратно во время расчетов с DataQueue объект. parfeval не делает блока MATLAB, таким образом, можно продолжить работать, в то время как расчеты происходят.

    Пример выполняет развертку параметра в системе Лоренца обыкновенных дифференциальных уравнений на параметрах σ и ρ, и показывает хаотическую природу этой системы.

    ddtx=σ(y-z)ddty=x(ρ-z)-yddtz=xy-βx

    Создайте сетку параметра

    Задайте область значений параметров, которые вы хотите исследовать в развертке параметра.

    gridSize = 40;
    sigma = linspace(5, 45, gridSize);
    rho = linspace(50, 100, gridSize);
    beta = 8/3;

    Создайте 2D сетку параметров при помощи meshgrid функция.

    [rho,sigma] = meshgrid(rho,sigma);

    Создайте объект фигуры и установите 'Visible' к true так, чтобы это открылось в новом окне, за пределами live скрипта. Чтобы визуализировать результаты развертки параметра, создайте объемную поверхностную диаграмму. Обратите внимание на то, что инициализация Z компонент поверхности с NaN создает пустой график.

    figure('Visible',true);
    surface = surf(rho,sigma,NaN(size(sigma)));
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    Настройте параллельную среду

    Создайте пул параллельных рабочих при помощи parpool функция.

    parpool;
    Starting parallel pool (parpool) using the 'local' profile ...
    Connected to the parallel pool (number of workers: 6).
    

    Чтобы отправить данные от рабочих, создайте DataQueue объект. Настройте функцию, которая обновляет объемную поверхностную диаграмму каждый раз, когда рабочий отправляет данные при помощи afterEach функция. updatePlot функция является функцией, определяемой поддержки в конце примера.

    Q = parallel.pool.DataQueue;
    afterEach(Q,@(data) updatePlot(surface,data));

    Выполните параллельную развертку параметра

    После того, как вы зададите параметры, можно выполнить параллельную развертку параметра.

    parfeval работает более эффективно, когда вы распределяете рабочую нагрузку. Чтобы распределить рабочую нагрузку, сгруппируйте параметры, чтобы исследовать в разделы. В данном примере разделите в универсальные разделы размера step при помощи оператора двоеточия (:). Полученный массив partitions содержит контуры разделов. Обратите внимание на то, что необходимо добавить конечную точку последнего раздела.

    step = 100;
    partitions = [1:step:numel(sigma), numel(sigma)+1]
    partitions = 1×17
    
               1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601
    
    

    Для лучшей эффективности попытайтесь разделить в разделы, которые являются:

    • Достаточно большой, что время вычисления является большим по сравнению с издержками планирования раздела.

    • Достаточно маленький, что существует достаточно разделов, чтобы заставить всех рабочих напряженно трудиться.

    Чтобы представлять функциональное выполнение на параллельных рабочих и содержать их результаты, используйте будущие объекты.

    f(1:numel(partitions)-1) = parallel.FevalFuture;

    Разгрузите расчеты, чтобы быть параллельными рабочим при помощи parfeval функция. parameterSweep функция, определяемая помощника в конце этого скрипта, который решает систему Лоренца на разделе параметров, чтобы исследовать. Это имеет один выходной аргумент, таким образом, необходимо задать 1 как количество выходных параметров в parfeval.

    for ii = 1:numel(partitions)-1
        f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
    end

    parfeval не делает блока MATLAB, таким образом, можно продолжить работать, в то время как расчеты происходят. Рабочие вычисляют параллельно и отправляют промежуточные результаты через DataQueue как только они становятся доступными.

    Если вы хотите с блоком MATLAB до parfeval завершается, используйте wait функция на будущих объектах. Используя wait функция полезна, когда последующий код зависит от завершения parfeval.

    wait(f);

    После parfeval закончил расчеты, wait концы и вы можете выполнить больше кода. Например, постройте контур получившейся поверхности. Используйте fetchOutputs функция, чтобы получить результаты, сохраненные в будущих объектах.

    results = reshape(fetchOutputs(f),gridSize,[]);
    contourf(rho,sigma,results)
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    Если вашей развертке параметра нужно больше вычислительных ресурсов, и у вас есть доступ к кластеру, можно увеличить parfeval расчеты. Для получения дополнительной информации смотрите, Масштабируют от Рабочего стола до Кластера.

    Определение функций помощника

    Задайте функцию помощника, которая решает систему Лоренца на разделе параметров, чтобы исследовать. Отправьте промежуточные результаты клиенту MATLAB при помощи send функция на DataQueue объект.

    function results = parameterSweep(first,last,sigma,rho,beta,Q)
        results = zeros(last-first,1);
        for ii = first:last-1
            lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
            [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
            result = a(end,3);
            send(Q,[ii,result]);
            results(ii-first+1) = result;
        end
    end

    Задайте другую функцию помощника, которая обновляет объемную поверхностную диаграмму, когда новые данные прибывают.

    function updatePlot(surface,data)
        surface.ZData(data(1)) = data(2);
        drawnow('limitrate');
    end
    Введенный в R2017a