exponenta event banner

parallel.pool. DataQueue

Отправка и прослушивание данных между клиентом и работниками

    Описание

    A DataQueue позволяет асинхронно отправлять данные или сообщения от работников обратно клиенту в параллельном пуле во время выполнения вычисления. Например, можно получить промежуточные значения и индикацию хода выполнения вычислений.

    Чтобы отправить данные от работника параллельного пула обратно клиенту, сначала создайте DataQueue в клиенте. Передать это DataQueue в parfor-loop или другая параллельная языковая конструкция, такая как spmd. От работников звоните send для отправки данных обратно клиенту. В клиенте регистрируют вызываемую функцию каждый раз, когда данные принимаются с помощью afterEach.

    • Вы можете позвонить send от работника или клиента, который создает DataQueue, при необходимости.

    • Можно создать очередь на работников и отправить ее обратно клиенту, чтобы включить связь в обратном направлении. Однако невозможно отправить очередь от одного работника другому. Для передачи данных между работниками используйте spmd, labSend, или labReceive вместо этого.

    • В отличие от всех других объектов-дескрипторов, DataQueue и PollableDataQueue экземпляры остаются связанными, когда они отправляются работникам.

    Создание

    Синтаксис

    Описание

    пример

    q = parallel.pool.DataQueue создает объект, который может использоваться для отправки или прослушивания сообщений (или данных) от различных работников. Создать DataQueue на работнике или клиенте, где требуется получить данные.

    Свойства

    развернуть все

    Это свойство доступно только для чтения.

    Число элементов данных, ожидающих удаления из очереди, указанное как нулевое или положительное целое число. Значение: 0 или положительное целое число на работнике или клиенте, создающем PollableDataQueue экземпляр. Если клиент создает PollableDataQueue экземпляр, значение: 0 на всех рабочих. Если работник создает PollableDataQueue, значение равно 0 на клиента и всех остальных работников.

    Функции объекта

    afterEachОпределение функции для вызова при получении новых данных в DataQueue
    sendОтправка данных от работника клиенту с помощью очереди данных

    Примеры

    свернуть все

    Построить DataQueueи звоните afterEach.

    q = parallel.pool.DataQueue;
    afterEach(q, @disp);
    
    Начать parfor-loop и отправьте сообщение. Ожидающее сообщение передается в afterEach функция, в этом примере @disp.

    parfor i = 1:3
        send(q, i); 
    end;
         1
    
         2
    
         3

    Дополнительные сведения о прослушивании данных с помощью DataQueue, см. afterEach.

    При отправке сообщения DataQueue объект, сообщение ожидает в очереди, пока не будет обработано прослушивателем. Каждое сообщение добавляет 1 к длине очереди. В этом примере используется QueueLength свойство для поиска длины DataQueue объект.

    Когда клиент или работник создает DataQueue все сообщения, отправляемые в очередь, хранятся в памяти этого клиента или работника. Если клиент создает DataQueue объект, QueueLength собственность на всех работников 0. В этом примере создается DataQueue объект на клиенте и отправка данных от работника.

    Сначала создайте параллельный пул с одним работником.

    parpool(1);
    Starting parallel pool (parpool) using the 'local' profile ...
    Connected to the parallel pool (number of workers: 1).
    

    Затем создайте DataQueue.

    q = parallel.pool.DataQueue
    q = 
      DataQueue with properties:
    
        QueueLength: 0
    
    

    Вновь созданный DataQueue имеет пустую очередь. Вы можете использовать parfor найти q.QueueLength на рабочего. Найдите длину очереди на клиенте и длину очереди на работнике.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    Поскольку очередь пуста, QueueLength является 0 как для клиента, так и для работника. Затем отправьте сообщение в очередь от работника. Затем используйте QueueLength для поиска длины очереди.

    % Send a message first
    parfor i = 1
        send(q, 'A message');
    end
    
    % Find the length
    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 1
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    QueueLength свойство - 1 на клиенте, и 0 на рабочего. Создайте прослушиватель для обработки очереди путем немедленного отображения данных.

    el = afterEach(q, @disp);

    Подождите, пока очередь не будет пуста, затем удалите прослушиватель.

    while q.QueueLength > 0
        pause(0.1);
    end
    delete(el);

    Используйте QueueLength для поиска длины очереди.

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    

    QueueLength является 0 поскольку обработка очереди завершена.

    В этом примере используется DataQueue для обновления строки ожидания с ходом выполнения parfor-луп.

    При создании parfor-loop, выгрузка каждой итерации в рабочие объекты в параллельном пуле. Информация возвращается от работников только в том случае, если parfor-loop завершается. Вы можете использовать DataQueue для обновления строки ожидания в конце каждой итерации.

    При обновлении полосы ожидания с ходом выполнения parfor-loop, клиент должен записать информацию о количестве оставшихся итераций.

    Совет

    Если вы создаете новый параллельный код и хотите отслеживать ход выполнения кода, рассмотрите возможность использования parfeval workflow-процесс. Дополнительные сведения см. в разделе Асинхронное обновление интерфейса пользователя с помощью afterEach и afterAll.

    Вспомогательная функция parforWaitbar, определенный в конце этого примера, обновляет строку ожидания. Функция использует persistent для хранения информации о числе оставшихся итераций.

    Использовать waitbar для создания панели ожидания, w.

    w = waitbar(0,'Please wait ...');

    Создать DataQueue, D. Затем использовать afterEach бежать parforWaitbar после отправки сообщений в DataQueue.

    % Create DataQueue and listener
    D = parallel.pool.DataQueue;
    afterEach(D,@parforWaitbar);

    Задайте количество итераций для parfor-луп, N. Использовать панель ожидания w и число итераций N для инициализации функции parforWaitbar.

    В конце каждой итерации parfor-loop, клиент запущен parforWaitbar и постепенно обновляет строку ожидания.

    N = 100;
    parforWaitbar(w,N)
    

    Функция parforWaitbar использует постоянные переменные для хранения количества завершенных итераций на клиенте. Никакой информации от работников не требуется.

    Запустить parfor-луп с N итерации. Для этого примера используйте pause и rand для моделирования некоторых работ. После каждой итерации используйте send для отправки сообщения в DataQueue. При отправке сообщения в DataQueue, панель ожидания обновляется. Поскольку информация от работников не требуется, отправьте пустое сообщение, чтобы избежать ненужной передачи данных.

    После parfor-loop завершается, используется delete чтобы закрыть панель ожидания.

    parfor i = 1:N
        pause(rand)
        send(D,[]);
    end
    
    delete(w);
    

    Определение функции помощника parforWaitbar. При выполнении parforWaitbar с двумя входными аргументами функция инициализирует три постоянные переменные (count, h, и N). При выполнении parforWaitbar с одним входным аргументом полоса ожидания обновляется.

    function parforWaitbar(waitbarHandle,iterations)
        persistent count h N
        
        if nargin == 2
            % Initialize
            
            count = 0;
            h = waitbarHandle;
            N = iterations;
        else
            % Update the waitbar
            
            % Check whether the handle is a reference to a deleted object
            if isvalid(h)
                count = count + 1;
                waitbar(count / N,h);
            end
        end
    end

    В этом примере показано, как выполнить параллельный сдвиг параметров с помощью parfeval и отправить результаты обратно во время вычислений с помощью DataQueue объект. parfeval не блокирует MATLAB, поэтому можно продолжить работу во время выполнения вычислений.

    Пример выполняет зачистку параметров в системе Лоренца обыкновенных дифференциальных уравнений, в отношении параметров, и показывает хаотичность этой системы.

    ddtx = λ (y-z) ddty = x (start-z) -yddtz = xy-βx

    Создать сетку параметров

    Определите диапазон параметров, которые требуется исследовать при протягивании параметров.

    gridSize = 40;
    sigma = linspace(5, 45, gridSize);
    rho = linspace(50, 100, gridSize);
    beta = 8/3;

    Создайте 2-D сетку параметров с помощью meshgrid функция.

    [rho,sigma] = meshgrid(rho,sigma);

    Создание объекта фигуры и установка 'Visible' кому true чтобы она открылась в новом окне, за пределами живого сценария. Для визуализации результатов сдвига параметров создайте график поверхности. Обратите внимание, что инициализация Z компонент поверхности с NaN создает пустой график.

    figure('Visible',true);
    surface = surf(rho,sigma,NaN(size(sigma)));
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    Настройка параллельной среды

    Создание пула параллельных работников с помощью parpool функция.

    parpool;
    Starting parallel pool (parpool) using the 'local' profile ...
    Connected to the parallel pool (number of workers: 6).
    

    Чтобы отправить данные от работников, создайте DataQueue объект. Настройте функцию, которая обновляет график поверхности каждый раз, когда работник отправляет данные с помощью afterEach функция. updatePlot функция является вспомогательной функцией, определенной в конце примера.

    Q = parallel.pool.DataQueue;
    afterEach(Q,@(data) updatePlot(surface,data));

    Выполнение сдвига параллельных параметров

    После определения параметров можно выполнить параллельный сдвиг параметров.

    parfeval работает более эффективно при распределении рабочей нагрузки. Чтобы распределить рабочую нагрузку, сгруппируйте параметры для изучения в разделы. Для этого примера разбейте на однородные секции размера step с помощью оператора двоеточия (:). Результирующий массив partitions содержит границы секций. Обратите внимание, что необходимо добавить конечную точку последнего раздела.

    step = 100;
    partitions = [1:step:numel(sigma), numel(sigma)+1]
    partitions = 1×17
    
               1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601
    
    

    Для достижения оптимальной производительности попробуйте разбить на следующие разделы:

    • Достаточно большое, чтобы время вычисления было большим по сравнению с накладными расходами на планирование раздела.

    • Достаточно маленький, чтобы было достаточно перегородок, чтобы все работники были заняты.

    Чтобы представить выполнение функций на параллельных рабочих и сохранить их результаты, используйте будущие объекты.

    f(1:numel(partitions)-1) = parallel.FevalFuture;

    Разгрузка вычислений для параллельных работников с помощью parfeval функция. parameterSweep - вспомогательная функция, определенная в конце этого сценария, которая решает систему Лоренца на разделе исследуемых параметров. Он имеет один выходной аргумент, поэтому необходимо указать 1 как количество выходов в parfeval.

    for ii = 1:numel(partitions)-1
        f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
    end

    parfeval не блокирует MATLAB, поэтому можно продолжить работу во время выполнения вычислений. Рабочие вычисляют параллельно и отправляют промежуточные результаты через DataQueue как только они станут доступны.

    Если требуется заблокировать MATLAB до parfeval завершает, используйте wait функция на будущих объектах. Использование wait полезна, когда последующий код зависит от завершения parfeval.

    wait(f);

    После parfeval завершает вычисления, wait завершает и можно выполнить больше кода. Например, постройте график контура результирующей поверхности. Используйте fetchOutputs для извлечения результатов, сохраненных в будущих объектах.

    results = reshape(fetchOutputs(f),gridSize,[]);
    contourf(rho,sigma,results)
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    Если для сдвига параметров требуется больше вычислительных ресурсов и у вас есть доступ к кластеру, вы можете увеличить parfeval вычисления. Дополнительные сведения см. в разделе Масштабирование с рабочего стола на кластер.

    Определение вспомогательных функций

    Определите вспомогательную функцию, которая решает систему Лоренца на разделе исследуемых параметров. Отправка промежуточных результатов клиенту MATLAB с помощью send функции на DataQueue объект.

    function results = parameterSweep(first,last,sigma,rho,beta,Q)
        results = zeros(last-first,1);
        for ii = first:last-1
            lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
            [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
            result = a(end,3);
            send(Q,[ii,result]);
            results(ii-first+1) = result;
        end
    end

    Определите другую вспомогательную функцию, которая обновляет график поверхности при поступлении новых данных.

    function updatePlot(surface,data)
        surface.ZData(data(1)) = data(2);
        drawnow('limitrate');
    end
    Представлен в R2017a