Пример развертывания приложений в Spark с помощью MATLAB API для Spark

Поддерживаемая платформа: Linux® только.

В этом примере показано, как развернуть автономное приложение для Spark™ с помощью MATLAB® API для Spark. Ваше приложение может быть развернуто против Spark с помощью одного из двух поддерживаемых кластерных менеджеров: локального и Hadoop® YARN. В этом примере показано, как развернуть приложение с помощью обоих менеджеров кластеров. Для обсуждения кластерных менеджеров смотрите Кластерные менеджеры, поддерживаемые Spark.

Цель: Подсчитать количество уникальных авиакомпаний в данном наборе данных.

Набор данных:airlinesmall.csv
Описание:

Информация о вылете и прибытии из 1987-2008.

Расположение:/usr/local/MATLAB/R2021a/toolbox/matlab/demos

Функция помощника

Создайте файл MATLAB с именем carrierToCount.m со следующим кодом:

function results = carrierToCount(input)
    tbl = input{1};
    intermKeys = tbl.UniqueCarrier;
    [intermKeys, ~, idx] = unique(intermKeys);
    intermValues = num2cell(accumarray(idx, ones(size(idx))));
    results = cellfun( @(x,y) {x,y} , ...
        intermKeys, intermValues, ...
        'UniformOutput',false);
Эта вспомогательная функция передается в качестве указателя на функцию одному из методов в примере.

Примечание

Если вы используете Spark версии 1.6 или выше, вам нужно будет увеличить Java® размер кучки в MATLAB по меньшей мере до 512MB. Для получения информации об увеличении размера Java-кучи в MATLAB, смотрите Java Heap Настройки.

Местный

Локальный менеджер кластеров представляет кластер с поддержкой pseudo Spark и работает в распределенном режиме на одной машине. Он может быть сконфигурирован, чтобы использовать один рабочий поток, или на многоядерной машине, несколько рабочих потоков. В приложениях обозначается словом local. Локальный менеджер кластеров удобен для отладки вашего приложения перед полным развертыванием в кластере Hadoop с поддержкой Spark.

Необходимые условия

  1. Начните этот пример, создав новую рабочую папку, которая видна по путь поиска файлов MATLAB.

  2. Создайте вспомогательную функцию carrierToCount.m упомянутый выше.

Процедура

  1. Задайте свойства Spark.

    Использование containers.Map объект для задания свойств Spark.

    sparkProp = containers.Map(...
        {'spark.executor.cores',...
        'spark.matlab.worker.debug'},...
        {'1',...
        'true'});

    Свойства Spark указывают на окружение выполнения Spark развертываемого приложения. Каждое приложение должно быть сконфигурировано с определенными свойствами Spark в порядок для его развертывания.

    Для получения дополнительной информации о свойствах Spark разверните prop значение 'SparkProperties' Пара "имя-значение" в Входные параметры разделе SparkConf класс.

  2. Создайте SparkConf объект.

    Используйте matlab.compiler.mlspark.SparkConf классов для создания SparkConf объект. A SparkConf объект хранит параметры конфигурации развертываемого приложения в Spark. Параметры конфигурации приложения передаются в кластер Spark через SparkContext.

    conf = matlab.compiler.mlspark.SparkConf(...
        'AppName', 'mySparkAppDepLocal', ...
        'Master', 'local[1]', ...
        'SparkProperties', sparkProp );

    Для получения дополнительной информации о SparkConf смотрите matlab.compiler.mlspark.SparkConf.

  3. Создайте SparkContext объект.

    Используйте matlab.compiler.mlspark.SparkContext классов с SparkConf объект как вход для создания SparkContext объект.

    sc = matlab.compiler.mlspark.SparkContext(conf);

    A SparkContext объект служит точкой входа в Spark путем инициализации соединения с кластером Spark. Принимает SparkConf объект как входной параметр и использует параметры, указанные в этом объекте, для настройки внутренних сервисов, необходимых для установления соединения с окружением выполнения Spark.

    Для получения дополнительной информации о SparkContext смотрите matlab.compiler.mlspark.SparkContext.

  4. Создайте RDD объект из данных.

    Используйте функцию MATLAB datastore для создания datastore объект, указывающий на файл airlinesmall.csv. Затем используйте метод SparkContext datastoreToRDD для преобразования datastore объект в RDD Spark объект.

    % Create a MATLAB datastore (LOCAL)
    ds = datastore('airlinesmall.csv',...
        'TreatAsMissing','NA', ...
        'SelectedVariableNames','UniqueCarrier');
    % Convert MATLAB datastore to Spark RDD
    rdd = sc.datastoreToRDD(ds);

    В целом входные RDD могут быть созданы с помощью следующих методов SparkContext класс: parallelize, datastoreToRDD, и textFile.

  5. Выполните операции на RDD объект.

    Используйте метод Spark RDD, такой как flatMap применить функцию ко всем элементам RDD объект и выравнивание результатов. Функция carrierToCount который был создан ранее, служит функцией, которая будет применяться к элементам RDD. Указатель на функцию carrierToCount передается как входной параметр в flatMap способ.

    maprdd = rdd.flatMap(@carrierToCount);
    redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
    countdata = redrdd.collect();
    
    % Count and display carrier occurrences
    count = 0;
    for i=1:numel(countdata)
        count = count + countdata{i}{2};
        fprintf('\nCarrier Name: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
    end
    fprintf('\n Total count : %d\n', count);
    
    % Delete Spark Context
    delete(sc)

    В целом, вы предоставите указатели на функции MATLAB или анонимные функции в качестве входных параметров в методы Spark RDD, известные как transformations и actions. Эти указатели на функцию и анонимные функции выполняются работниками развернутого приложения.

    Список поддерживаемых преобразований и действий RDD см. в разделе «Преобразования и действия» в разделе «Методы» RDD класс.

    Для получения дополнительной информации о преобразованиях и действиях смотрите Apache Spark Basics.

  6. Создайте автономное приложение.

    Используйте mcc команда со -m флаг для создания автономного приложения. The -m флаг создает стандартный исполняемый файл, который можно запускать из командной строки. The -a флаг включает зависимый набор данных airlinesmall.csv из папки <matlabroot>/toolbox/matlab/demos. mcc команда автоматически захватывает зависимый файл carrierToCount.m пока он находится в одной рабочей папке.

    >> mcc -m deployToSparkMlApiLocal.m -a <matlabroot>/toolbox/matlab/demos/airlinesmall.csv

    mcc команда создает оболочечный скрипт run_deployToSparkMlApiLocal.sh чтобы запустить исполняемый файл deployToSparkMlApiLocal.

    Для получения дополнительной информации см. mcc.

  7. Запустите автономное приложение из интерпретатора Linux с помощью следующей команды:

    $ ./run_deployToSparkMlApiLocal.sh /share/MATLAB/MATLAB_Runtime/v91

    /share/MATLAB/MATLAB_Runtime/v91 является аргументом, указывающим местоположение MATLAB Runtime.

    Перед выполнением вышеуказанной команды убедитесь в javaclasspath.txt файл находится в той же папке, что и скрипт интерпретатора и исполняемый файл.

    Ваше приложение не будет выполнено, если оно не может найти файл javaclasspath.txt.

    Ваше приложение также может не выполняться, если необязательная линия, содержащая папку с файлами строения Hadoop, отсутствует. Чтобы выполнить ваше приложение на local диспетчер кластеров, эту линию необходимо прокомментировать. Эта линия должна быть без комментариев, если вы планируете запускать приложение с помощью yarn-client в качестве менеджера кластеров на кластере Hadoop с поддержкой Spark.

  8. Вы увидите следующий выход:

    Carrier Name: 9E, Count: 521
    Carrier Name: AA, Count: 14930
    Carrier Name: AQ, Count: 154
    Carrier Name: AS, Count: 2910
    Carrier Name: B6, Count: 806
    Carrier Name: CO, Count: 8138
    ...
    ...
    ...
    Carrier Name: US, Count: 13997
    Carrier Name: WN, Count: 15931
    Carrier Name: XE, Count: 2357
    Carrier Name: YV, Count: 849
     Total count : 123523

Код:

 deployToSparkMlApiLocal.m

Hadoop YARN

Менеджер кластера с пряжей представляет кластер Hadoop с поддержкой Spark. В Hadoop 2.0 был представлен менеджер кластеров YARN. Он обычно устанавливается на тех же узлах, что и HDFS™. Поэтому запуск Spark на YARN позволяет Spark легко получить доступ к данным HDFS. В приложениях обозначается с помощью слова yarn-client.

Начиная с шагов развертывания вашего приложения с помощью yarn-client поскольку ваш менеджер кластеров похож на использование локального менеджера кластеров, показанного выше, шаги представлены с минимальным обсуждением. Для подробного обсуждения каждого шага, проверьте Локальный случай выше.

Примечание

Можно следовать тем же инструкциям по развертыванию приложений Spark, созданных с помощью MATLAB API для Spark на Cloudera® CDH. Чтобы увидеть пример на Answers™ MATLAB, нажмите здесь.

Чтобы использовать зоны шифрования Cloudera CDH, добавьте файл JAR commons-codec-1.9.jar к статическому пути классов среды MATLAB Runtime. Расположение файла: $HADOOP_PREFIX/lib/commons-codec-1.9.jar, где $ HADOOP _ PREFIX является местом установки Hadoop.

Необходимые условия

  1. Начните этот пример, создав новую рабочую папку, которая видна по путь поиска файлов MATLAB.

  2. Установите среду MATLAB Runtime в папку, доступную каждому рабочему узлу кластера Hadoop. Этот пример использует /share/MATLAB/MATLAB_Runtime/v91 как расположение папки MATLAB Runtime.

    Если у вас нет MATLAB Runtime, вы можете скачать его с сайта в: https://www.mathworks.com/products/compiler/mcr.

  3. Скопируйте airlinesmall.csv в папку Hadoop Distributed File System (HDFS) /user/<username>/datasets. Вот <username> ссылается на ваше имя пользователя в HDFS.

    $ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets

Процедура

  1. Настройте переменную окружения, HADOOP_PREFIX указывать на папку установки Hadoop. Эти свойства необходимы для отправки заданий в кластер Hadoop.

    setenv('HADOOP_PREFIX','/share/hadoop/hadoop-2.6.0')

    The HADOOP_PREFIX переменная окружения должна быть установлена при использовании MATLAB datastore функция для указания данных на HDFS. Установка этого переменного окружения не имеет никакого отношения к Spark. Смотрите Отношения между Spark и Hadoop для получения дополнительной информации.

  2. Задайте свойства Spark.

    Использование containers.Map объект для задания свойств Spark.

    sparkProperties = containers.Map( ...
        {'spark.executor.cores',...
        'spark.executor.memory',...
        'spark.yarn.executor.memoryOverhead',...
        'spark.dynamicAllocation.enabled',...
        'spark.shuffle.service.enabled',...
        'spark.eventLog.enabled',...
        'spark.eventLog.dir'}, ...
        {'1',...
        '2g',...
        '1024',...
        'true',...
        'true',...
        'true',...
        'hdfs://hadoop01glnxa64:54310/user/<username>/sparkdeploy'});

    Для получения дополнительной информации о свойствах Spark разверните prop значение 'SparkProperties' Пара "имя-значение" в Входные параметры разделе SparkConf класс.

  3. Создайте SparkConf объект.

    Используйте matlab.compiler.mlspark.SparkConf классов для создания SparkConf объект.

    conf = matlab.compiler.mlspark.SparkConf( ...
        'AppName','myApp', ...
        'Master','yarn-client', ...
        'SparkProperties',sparkProperties);

    Для получения дополнительной информации о SparkConf смотрите matlab.compiler.mlspark.SparkConf.

  4. Создайте SparkContext объект.

    Используйте matlab.compiler.mlspark.SparkContext классов с SparkConf объект как вход для создания SparkContext объект.

    sc = matlab.compiler.mlspark.SparkContext(conf);

    Для получения дополнительной информации о SparkContext смотрите matlab.compiler.mlspark.SparkContext.

  5. Создайте RDD объект из данных.

    Используйте функцию MATLAB datastore для создания datastore объект, указывающий на файл airlinesmall.csv в HDFS. Затем используйте метод SparkContext datastoreToRDD для преобразования datastore объект в RDD Spark объект.

    % Create a MATLAB datastore (HADOOP)
    ds = datastore(...
        'hdfs:///user/<username>/datasets/airlinesmall.csv',...
        'TreatAsMissing','NA',...
        'SelectedVariableNames','UniqueCarrier');
    
    % Convert MATLAB datastore to Spark RDD
    rdd = sc.datastoreToRDD(ds);

    В целом входные RDD могут быть созданы с помощью следующих методов SparkContext класс: parallelize, datastoreToRDD, и textFile.

  6. Выполните операции на RDD объект.

    Используйте метод Spark RDD, такой как flatMap применить функцию ко всем элементам RDD объект и выравнивание результатов. Функция carrierToCount который был создан ранее, служит функцией, которая будет применяться к элементам RDD. Указатель на функцию carrierToCount передается как входной параметр в flatMap способ.

    maprdd = rdd.flatMap(@carrierToCount);
    redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
    countdata = redrdd.collect();
    
    % Count and display carrier occurrences
    count = 0;
    for i=1:numel(countdata)
        count = count + countdata{i}{2};
        fprintf('\nCarrier Code: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
    end
    fprintf('\n Total count : %d\n', count);
    
    % Save results to MAT file
    save('countdata.mat','countdata');
    
    % Delete Spark Context
    delete(sc);

    Список поддерживаемых преобразований и действий RDD см. в разделе «Преобразования и действия» в разделе «Методы» RDD класс.

    Для получения дополнительной информации о преобразованиях и действиях смотрите Apache Spark Basics.

  7. Создайте автономное приложение.

    Используйте mcc команда со -m флаг для создания автономного приложения. The -m флаг создает автономное приложение, которое можно запускать из командной строки. Вам не нужно присоединять набор данных airlinesmall.csv поскольку он находится в HDFS. mcc команда автоматически захватывает зависимый файл carrierToCount.m пока он находится в одной рабочей папке.

    >> mcc -m deployToSparkMlApiHadoop.m

    mcc команда создает оболочечный скрипт run_deployToSparkMlApiHadoop.sh чтобы запустить исполняемый файл deployToSparkMlApiHadoop.

    Для получения дополнительной информации см. mcc.

  8. Запустите автономное приложение из интерпретатора Linux с помощью следующей команды:

    $ ./run_deployToSparkMlApiHadoop.sh /share/MATLAB/MATLAB_Runtime/v91

    /share/MATLAB/MATLAB_Runtime/v91 является аргументом, указывающим местоположение MATLAB Runtime.

    Перед выполнением вышеуказанной команды убедитесь в javaclasspath.txt файл находится в той же папке, что и скрипт интерпретатора и исполняемый файл.

    Ваше приложение не будет выполнено, если оно не может найти файл javaclasspath.txt.

    Ваше приложение также может не выполняться, если необязательная линия, содержащая папку с файлами строения Hadoop, закомментирована. Чтобы выполнить приложение на диспетчере кластеров пряжи-клиента, эта линия должна быть без комментариев. Эту линию следует комментировать, только если вы планируете запускать приложение с помощью локального менеджера кластеров.

  9. Вы увидите следующий выход:

    Carrier Name: 9E, Count: 521
    Carrier Name: AA, Count: 14930
    Carrier Name: AQ, Count: 154
    Carrier Name: AS, Count: 2910
    Carrier Name: B6, Count: 806
    Carrier Name: CO, Count: 8138
    ...
    ...
    ...
    Carrier Name: US, Count: 13997
    Carrier Name: WN, Count: 15931
    Carrier Name: XE, Count: 2357
    Carrier Name: YV, Count: 849
     Total count : 123523

    Примечание

    Если развертываемое приложение является функцией MATLAB, в отличие от скрипта MATLAB, используйте следующий синтаксис выполнения:

    $ ./run_<applicationName>.sh \
      <MATLAB_Runtime_Location> \
      [Spark arguments] \
      [Application arguments]
    Для примера:
    $ ./run_deployToSparkMlApiHadoop.sh.sh \
       /usr/local/MATLAB/MATLAB_Runtime/v91 \
       yarn-client \
       hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \
       hdfs://host:54310/user/<username>/result

Код:

 deployToSparkMlApiHadoop.m