Пример развертывания длинных массивов на кластере Hadoop с поддержкой Spark

Поддерживаемая платформа: Linux® только.

В этом примере показано, как развернуть MATLAB® приложение, содержащее длинные массивы для Hadoop с Spark™ поддержкой® кластер.

Цель: Вычислите среднюю задержку прибытия и самые большие задержки прибытия авиакомпаний из данного набора данных.

Набор данных:airlinesmall.csv
Описание:

Информация о вылете и прибытии из 1987-2008.

Расположение:/usr/local/MATLAB/R2021a/toolbox/matlab/demos

Примечание

Можно следовать тем же инструкциям для развертывания приложений длинный массив Spark на Cloudera® CDH. Чтобы увидеть пример на Answers™ MATLAB, нажмите здесь.

Чтобы использовать зоны шифрования Cloudera CDH, добавьте файл JAR commons-codec-1.9.jar к статическому пути классов среды MATLAB Runtime. Расположение файла: $HADOOP_PREFIX/lib/commons-codec-1.9.jar, где $ HADOOP _ PREFIX является местом установки Hadoop.

Примечание

Если вы используете Spark версии 1.6 или выше, вам нужно будет увеличить Java® размер кучки в MATLAB по меньшей мере до 512MB. Для получения информации об увеличении размера Java-кучи в MATLAB, смотрите Java Heap Настройки.

Необходимые условия

  1. Начните этот пример, создав новую рабочую папку, которая видна по путь поиска файлов MATLAB.

  2. Установите среду MATLAB Runtime в папку, доступную каждому рабочему узлу кластера Hadoop. Этот пример использует/ usr/local/MATLAB/MATLAB_Runtime/v<reservedrangesplaceholder0 > как расположение папки MATLAB Runtime.

    Если у вас нет MATLAB Runtime, вы можете скачать его с сайта в: https://www.mathworks.com/products/compiler/mcr.

    Примечание

    Замените все ссылки на MATLAB Runtime версии v ## в этом примере с номером версии MATLAB Runtime, соответствующим вашему релизу MATLAB. Например, MATLAB R2017b имеет номер версии MATLAB Runtime v92. Для получения информации о номерах версий MATLAB Runtime, соответствующих релизам MATLAB, смотрите этот список.

  3. Скопируйте файл airlinesmall.csv в папку Hadoop Distributed File System (HDFS™) /user/<username>/datasets. Вот <username> ссылается на ваше имя пользователя в HDFS.

    $ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets

Процедура

  1. Настройте переменную окружения, HADOOP_PREFIX указывать на папку установки Hadoop. Эти свойства необходимы для отправки заданий в кластер Hadoop.

    setenv('HADOOP_PREFIX','/usr/lib/hadoop')

    The HADOOP_PREFIX переменная окружения должна быть установлена при использовании MATLAB datastore функция для указания данных на HDFS. Установка этого переменного окружения не имеет никакого отношения к Spark. Смотрите Отношения между Spark и Hadoop для получения дополнительной информации.

    Если вы планируете использовать набор данных, который находится на вашем локальном компьютере, в отличие от набора данных в HDFS, можно пропустить этот шаг.

    Примечание

    Этот пример использует /usr/lib/hadoop как директория, в котором установлен Hadoop. Возможно, ваша директория установки Hadoop отличается.

  2. Задайте свойства Spark.

    Использование containers.Map объект для задания свойств Spark.

    sparkProperties = containers.Map( ...
     {'spark.executor.cores', ...
     'spark.executor.memory', ...
     'spark.yarn.executor.memoryOverhead', ...
     'spark.dynamicAllocation.enabled', ...
     'spark.shuffle.service.enabled', ...
     'spark.eventLog.enabled', ...
     'spark.eventLog.dir'}, ...
     {'1', ...
      '2g', ...
      '1024', ...
      'true', ...
      'true', ...
      'true', ...
      'hdfs://host:54310/user/<username>/log'});

    Для получения дополнительной информации о свойствах Spark разверните prop значение 'SparkProperties' Пара "имя-значение" в Входные параметры разделе SparkConf класс. The SparkConf класс является частью MATLAB API для Spark, который предоставляет альтернативный способ развертывания приложений MATLAB в Spark. Для получения дополнительной информации смотрите Развертывание приложений Используя MATLAB API для Spark.

  3. Сконфигурируйте свое приложение MATLAB, содержащее длинные массивы с параметрами Spark.

    Используйте matlab.mapreduce.DeploySparkMapReducer классов чтобы сконфигурировать приложение MATLAB, содержащее длинные массивы с параметрами Spark в виде пар "ключ-значение".

    conf = matlab.mapreduce.DeploySparkMapReducer( ...
          'AppName','myTallApp', ...
          'Master','yarn-client', ...
          'SparkProperties',sparkProperties);

    Для получения дополнительной информации смотрите matlab.mapreduce.DeploySparkMapReducer.

  4. Определите окружение выполнения Spark.

    Используйте mapreducer функция для определения окружения выполнения Spark.

    mapreducer(conf)

    Для получения дополнительной информации см. mapreducer.

  5. Включите код приложения MATLAB, содержащий длинные массивы.

    Используйте функцию MATLAB datastore для создания datastore объект, указывающий на файл airlinesmall.csv в HDFS. Передайте datastore объект как входной параметр к tall функция. Это создаст длинный массив. Можно выполнить операции на длинный массив, чтобы вычислить среднюю задержку прибытия и самые большие задержки прибытия.

    % Create a |datastore| for a collection of tabular text files representing airline data. 
    % Select the variables of interest, specify a categorical data type for the 
    % |Origin| and |Dest| variables.
    % ds = datastore('airlinesmall.csv') % if using a dataset on your local machine
    ds = datastore('hdfs:///<username>/datasets/airlinesmall.csv');
    ds.TreatAsMissing = 'NA';
    ds.SelectedVariableNames = {'Year','Month','ArrDelay','DepDelay','Origin','Dest'};
    ds.SelectedFormats(5:6) = {'%C','%C'};
    
    % Create Tall Array
    % Tall arrays are like normal MATLAB arrays, except that they can have any
    % number of rows. When a |tall| array is backed by a |datastore|, the underlying class of
    % the tall array is based on the type of datastore.
    tt = tall(ds);
    
    % Remove Rows with Missing Data or NaN Values
    idx = any(ismissing(tt),2); 
    tt(idx,:) = [];
    
    % Compute Mean Delay
    meanArrivalDelay = mean(tt.DepDelay,'omitnan');
    biggestDelays = topkrows(tt,10,'ArrDelay');
    
    % Gather Results
    % The |gather| function forces evaluation of all queued operations and
    % brings the resulting output back into memory. 
    [meanArrivalDelay,biggestDelays] = gather(meanArrivalDelay,biggestDelays)
    
    % Delete mapreducer object
    delete(conf);
  6. Создайте приложение Spark.

    Используйте mcc команда со -vCW опции для создания приложения Spark.

    >> mcc -vCW 'Spark:myTallApp' deployTallArrayToSpark.m

    Создаются следующие файлы.

    ФайлыОписание
    run_myTallApp.shShell для запуска приложения. Скрипт вызывает spark-submit чтобы запустить приложение на кластере.
    myTallApp.jarПриложение JAR. JAR приложения содержит упакованный код MATLAB и другие зависимости.
    readme.txtФайл Readme, содержащий подробную информацию о том, как запустить приложение.
    requiredMCRProducts.txt 
    mccExcludedFiles.log 

    Для получения дополнительной информации см. mcc.

  7. Запустите приложение из интерпретатора с помощью следующей команды:

    $ ./run_myTallApp.sh /usr/local/MATLAB/MATLAB_Runtime/v##

    / usr/local/MATLAB/MATLAB_Runtime/v<reservedrangesplaceholder0 > является аргументом, указывающим местоположение MATLAB Runtime.

  8. Вы увидите следующий выход:

    meanArrivalDelay =
        7.1201
    
    biggestDelays =
    
      10x5 table
    
        Year    Month    ArrDelay    Origin    Dest
        ____    _____    ________    ______    ____
    
        1995    11       1014        HNL       LAX 
        2007     4        914        JFK       DTW 
        2001     4        887        MCO       DTW 
        2008     7        845        CMH       ORD 
        1988     3        772        ORD       LEX 
        2008     4        710        EWR       RDU 
        1998    10        679        MCI       DFW 
        2006     6        603        ABQ       PHX 
        2008     6        586        PIT       LGA 
        2007     4        568        RNO       SLC
    

Если вы хотите анализировать или просматривать результаты, сгенерированные вашим приложением в MATLAB, необходимо записать результаты в файл на HDFS с помощью write функция для длинные массивы. Затем вы можете считать файл, используя datastore функция.

Чтобы записать результаты в файл на HDFS, добавьте следующую строку кода к приложению MATLAB непосредственно перед delete(conf) оператор и затем упаковать ваше приложение:

write('hdfs:///user/<username>/results', tall(biggestDelays));

Замените <username> с вашим именем пользователя.

Вы можете сохранить только одну переменную в файл, используя write функция для длинные массивы. Поэтому вам нужно будет записать в несколько файлов, если вы хотите сохранить несколько переменных.

Чтобы просмотреть результаты в MATLAB после выполнения приложения против кластера с поддержкой Spark, используйте datastore функционировать следующим образом:

>> ds = datastore('hdfs:///user/<username>/results')
>> readall(ds)

Возможно, вам потребуется задать переменную окружения HADOOP_PREFIX использование функции setenv в случае, если вы не можете просмотреть результаты, используя datastore функция.

Примечание

Если развертываемое приложение длинный массив является функцией MATLAB в отличие от скрипта MATLAB, используйте следующий синтаксис выполнения:

$ ./run_<applicationName>.sh \
  <MATLAB_Runtime_Location> \
  [Spark arguments] \
  [Application arguments]
Для примера:
$ ./run_myTallApp.sh \
   /usr/local/MATLAB/MATLAB_Runtime/v92 \
   yarn-client \
   hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \
   hdfs://host:54310/user/<username>/result

Код:

 deployTallArrayToSpark.m