exponenta event banner

Использование массивов Tall в кластере Hadoop с поддержкой искр

Создание и использование таблиц Tall

В этом примере показано, как изменить пример MATLAB ® при создании таблицы высокого уровня для запуска в кластере Hadoop ® с поддержкой Spark ®. Эту таблицу можно использовать для создания массивов и расчета статистических свойств. Вы можете разработать код локально, а затем расширить его, чтобы воспользоваться возможностями Parallel Computing Toolbox™ и MATLAB Parallel Server™ без необходимости переписывать алгоритм. См. также Рабочий процесс больших данных с использованием массивов Tall и хранилищ данных и настройка кластера Hadoop (параллельный сервер MATLAB)

Во-первых, необходимо установить переменные среды и свойства кластера в соответствии с конкретной конфигурацией кластера Hadoop с поддержкой Spark. Значения этих и других свойств, необходимых для отправки заданий в кластер, см. у системного администратора.

setenv('HADOOP_HOME', '/path/to/hadoop/install')
setenv('SPARK_HOME', '/path/to/spark/install');
cluster = parallel.cluster.Hadoop;

% Optionally, if you want to control the exact number of workers:
cluster.SparkProperties('spark.executor.instances') = '16';

mapreducer(cluster);

Примечание

На шаге настройки используется mapreducer для установки среды выполнения кластера. На следующем шаге создается массив высокого уровня. Если среда выполнения кластера изменяется или удаляется после создания массива высокого уровня, то массив высокого уровня является недопустимым и его необходимо создать заново.

Примечание

При необходимости разработки в серийном формате без использования локальных работников введите следующую команду.

mapreducer(0);

После установки переменных среды и свойств кластера можно запустить пример таблицы уровня MATLAB в кластере Hadoop с поддержкой Spark, а не на локальном компьютере. Создайте хранилище данных и преобразуйте его в высокую таблицу. MATLAB автоматически запускает задание Spark для выполнения последующих вычислений в таблице высокого уровня.

ds = datastore('airlinesmall.csv');
varnames = {'ArrDelay', 'DepDelay'};
ds.SelectedVariableNames = varnames;
ds.TreatAsMissing = 'NA';

Создание таблицы высокого уровня tt из хранилища данных.

tt = tall(ds)
Starting a Spark Job on the Hadoop cluster. This could take a few minutes ...done.

tt =

  M×2 tall table 

    ArrDelay    DepDelay
    ________    ________

     8          12      
     8           1      
    21          20      
    13          12      
     4          -1      
    59          63      
     3          -2      
    11          -1      
    :           :
    :           :

На дисплее отображается количество строк, M, пока не известно. M является местозаполнителем до завершения расчета.

Извлечь задержку поступления ArrDelay с высокого стола. Это действие создает новую переменную массива высокого уровня для использования в последующих вычислениях.

a = tt.ArrDelay;

Можно указать ряд операций в массиве высокого уровня, которые не выполняются до вызова gather. Это позволяет группировать команды, которые могут занять длительное время. В качестве примера вычислите среднее и стандартное отклонение задержки поступления. Эти значения используются для построения верхнего и нижнего пороговых значений для задержек, которые находятся в пределах 1 стандартного отклонения от среднего значения.

m = mean(a,'omitnan');
s = std(a,'omitnan');
one_sigma_bounds = [m-s m m+s];

Использовать gather вычислять one_sigma_boundsи внести ответ в память.

sig1 = gather(one_sigma_bounds)
Evaluating tall expression using the Spark Cluster:
- Pass 1 of 1: Completed in 0.95 sec
Evaluation completed in 1.3 sec

sig1 =

  -23.4572    7.1201   37.6975

Можно указать несколько входов и выходов для gather если вы хотите оценить сразу несколько вещей. Это происходит быстрее, чем вызов gather отдельно на каждом высоком массиве. Например, вычислите минимальную и максимальную задержку поступления.

[max_delay, min_delay] = gather(max(a),min(a))
max_delay =

        1014

min_delay =

   -64

Примечание

Если MATLAB запускается на рабочих кластерах, для выполнения этих примеров требуется больше времени.

При использовании массивов tall в кластере Hadoop с поддержкой Spark вычислительные ресурсы из кластера Hadoop будут зарезервированы на время существования среды выполнения mapreducer. Чтобы очистить эти ресурсы, необходимо удалить mapreducer:

delete(gcmr);
Кроме того, можно перейти в другую среду выполнения, например:
mapreducer(0);

См. также

| | | | |

Связанные примеры

Подробнее