Пример на развертывающихся приложениях к Spark Используя MATLAB API для Spark

Поддерживаемая платформа: Linux® только.

Этот пример показывает вам, как развернуть автономное приложение на Spark™ с помощью MATLAB® API для Spark. Ваше приложение может быть развернуто против Spark с помощью одного из двух поддерживаемых кластерных менеджеров: локальный и Hadoop® YARN. Этот пример показывает вам, как развернуть ваше приложение с помощью обоих кластерных менеджеров. Для обсуждения кластерных менеджеров смотрите Кластерных менеджеров, Поддержанных Spark.

Цель: Считайте количество уникальных авиакомпаний в данном наборе данных.

Набор данных:airlinesmall.csv
Описание:

Отъезд авиакомпании и информация о прибытии от 1987-2008.

Местоположение :/usr/local/MATLAB/R2019a/toolbox/matlab/demos

Функция помощника

Создайте файл MATLAB под названием carrierToCount.m со следующим кодом:

function results = carrierToCount(input)
    tbl = input{1};
    intermKeys = tbl.UniqueCarrier;
    [intermKeys, ~, idx] = unique(intermKeys);
    intermValues = num2cell(accumarray(idx, ones(size(idx))));
    results = cellfun( @(x,y) {x,y} , ...
        intermKeys, intermValues, ...
        'UniformOutput',false);
Эта функция помощника передается в как указатель на функцию к одному из методов в примере.

Примечание

Если вы будете использовать версию 1.6 Spark или выше, необходимо будет увеличить размер кучи Java® в MATLAB по крайней мере к 512 МБ. Для получения информации о том, как увеличить размер кучи Java в MATLAB, смотрите Настройки Java Heap Memory (MATLAB).

Локальный

Локальный кластерный менеджер представляет псевдо Spark, включил кластер и работает в нераспределенном режиме над одной машиной. Это может быть сконфигурировано, чтобы использовать один рабочий поток, или на многожильной машине, нескольких рабочих потоках. В приложениях это обозначается словом local. Локальный кластерный менеджер удобен для отладки вашего приложения до полноценного развертывания на включенном кластере Hadoop Spark.

Предпосылки

  1. Запустите этот пример путем создания новой папки работы, которая видима к пути поиска файлов MATLAB.

  2. Создайте функцию помощника упомянутый выше carrierToCount.m.

Процедура

  1. Свойства Specify Spark.

    Используйте объект containers.Map задать свойства Spark.

    sparkProp = containers.Map(...
        {'spark.executor.cores',...
        'spark.matlab.worker.debug'},...
        {'1',...
        'true'});

    Свойства Spark указывают на среду выполнения Spark приложения, которое развертывается. Каждое приложение должно быть сконфигурировано с определенными свойствами Spark для него, чтобы быть развернутым.

    Для получения дополнительной информации о свойствах Spark расширьте значение prop пары "имя-значение" 'SparkProperties' в разделе Input Arguments класса SparkConf.

  2. Создайте объект SparkConf.

    Используйте класс matlab.compiler.mlspark.SparkConf, чтобы создать объект SparkConf. SparkConf объектно-ориентированная память параметры конфигурации приложения, развертываемого на Spark. Параметры конфигурации приложения передаются на кластер Spark через SparkContext.

    conf = matlab.compiler.mlspark.SparkConf(...
        'AppName', 'mySparkAppDepLocal', ...
        'Master', 'local[1]', ...
        'SparkProperties', sparkProp );

    Для получения дополнительной информации о SparkConf смотрите matlab.compiler.mlspark.SparkConf.

  3. Создайте объект SparkContext.

    Используйте класс matlab.compiler.mlspark.SparkContext с объектом SparkConf как вход, чтобы создать объект SparkContext.

    sc = matlab.compiler.mlspark.SparkContext(conf);

    Объект SparkContext служит точкой входа к Spark путем инициализации связи с кластером Spark. Это принимает объект SparkConf как входной параметр и использует параметры, заданные в том объекте настроить внутренние сервисы, необходимые, чтобы установить связь со средой выполнения Spark.

    Для получения дополнительной информации о SparkContext смотрите matlab.compiler.mlspark.SparkContext.

  4. Создайте объект RDD из данных.

    Используйте функцию MATLAB datastore, чтобы создать объект datastore, указывающий на файл airlinesmall.csv. Затем используйте метод SparkContext datastoreToRDD, чтобы преобразовать объект datastore в объект Spark RDD.

    % Create a MATLAB datastore (LOCAL)
    ds = datastore('airlinesmall.csv',...
        'TreatAsMissing','NA', ...
        'SelectedVariableNames','UniqueCarrier');
    % Convert MATLAB datastore to Spark RDD
    rdd = sc.datastoreToRDD(ds);

    В целом вход RDDs может быть создан с помощью следующих методов класса SparkContext: parallelize, datastoreToRDD и textFile.

  5. Выполните операции на объекте RDD.

    Используйте Spark метод RDD, такой как flatMap, чтобы применить функцию ко всем элементам объекта RDD и сгладить результаты. Функциональный carrierToCount, который был создан ранее, служит функцией, которая будет прикладной к элементам RDD. Указатель на функцию к функциональному carrierToCount передается как входной параметр методу flatMap.

    maprdd = rdd.flatMap(@carrierToCount);
    redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
    countdata = redrdd.collect();
    
    % Count and display carrier occurrences
    count = 0;
    for i=1:numel(countdata)
        count = count + countdata{i}{2};
        fprintf('\nCarrier Name: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
    end
    fprintf('\n Total count : %d\n', count);
    
    % Delete Spark Context
    delete(sc)

    В целом вы обеспечите указатели функций MATLAB или анонимные функции как входные параметры к Spark методы RDD, известные как transformations и actions. Эти указатели на функцию и анонимные функции выполняются на рабочих по развертываемому приложению.

    Для списка поддерживаемых преобразований RDD и действий, смотрите Преобразования и Действия в разделе Methods класса RDD.

    Для получения дополнительной информации о преобразованиях и действиях, смотрите Основы Spark Apache.

  6. Создавание автономного приложения.

    Используйте команду mcc с флагом -m, чтобы создать автономное приложение. Флаг -m создает стандартный исполняемый файл, который может быть запущен из командной строки. Флаг -a включает зависимый набор данных airlinesmall.csv от папки <matlabroot>/toolbox/matlab/demos. Команда mcc автоматически берет зависимый файл carrierToCount.m, пока это находится в том же самом, работают папка.

    >> mcc -m deployToSparkMlApiLocal.m -a <matlabroot>/toolbox/matlab/demos/airlinesmall.csv

    Команда mcc создает сценарий оболочки run_deployToSparkMlApiLocal.sh, чтобы запустить исполняемый файл deployToSparkMlApiLocal.

    Для получения дополнительной информации смотрите mcc.

  7. Запустите автономное приложение от интерпретатора Linux с помощью следующей команды:

    $ ./run_deployToSparkMlApiLocal.sh /share/MATLAB/MATLAB_Runtime/v91

    /share/MATLAB/MATLAB_Runtime/v91 является аргументом, указывающим на местоположение MATLAB Runtime.

    До выполнения вышеупомянутой команды убедитесь, что файл javaclasspath.txt находится в той же папке как сценарий оболочки и исполняемый файл.

    Ваше приложение не выполнится, если оно не может найти файл javaclasspath.txt.

    Ваше приложение также может не выполниться, если дополнительная строка, содержащая местоположение папки конфигурационных файлов Hadoop, не прокомментирована. Чтобы выполнить ваше приложение на менеджере по кластеру local, эта строка должна быть прокомментирована. Эта строка должна только быть не прокомментирована, если вы планируете выполнение вашего приложения с помощью yarn-client в качестве кластерного менеджера на включенном кластере Hadoop Spark.

  8. Вы будете видеть следующий вывод:

    Carrier Name: 9E, Count: 521
    Carrier Name: AA, Count: 14930
    Carrier Name: AQ, Count: 154
    Carrier Name: AS, Count: 2910
    Carrier Name: B6, Count: 806
    Carrier Name: CO, Count: 8138
    ...
    ...
    ...
    Carrier Name: US, Count: 13997
    Carrier Name: WN, Count: 15931
    Carrier Name: XE, Count: 2357
    Carrier Name: YV, Count: 849
     Total count : 123523

Код:

 deployToSparkMlApiLocal.m

Hadoop YARN

Менеджер по кластеру клиента пряжи представляет включенный кластер Hadoop Spark. Менеджер по кластеру YARN был введен в Hadoop 2.0. Это обычно устанавливается на тех же узлах как HDFS™. Поэтому под управлением Spark на YARN позволяет доступу к Spark данные HDFS легко. В приложениях это обозначается с помощью слова yarn-client.

Поскольку шаги для развертывания вашего приложения с помощью yarn-client в качестве кластерного менеджера подобны использованию локального кластерного менеджера, показанного выше, шагам дарят минимальное обсуждение. Для детального обсуждения каждого шага проверяйте Локальный случай выше.

Примечание

Можно следовать тем же инструкциям, чтобы развернуться, приложения Spark создали использование MATLAB API для Spark к Cloudera® CDH. Чтобы видеть пример на MATLAB Answers™, щелкнуть здесь.

Чтобы использовать зоны шифрования Cloudera CDH, добавьте файл JAR commons-codec-1.9.jar в статический путь к классу MATLAB Runtime. Местоположение файла: $HADOOP_PREFIX/lib/commons-codec-1.9.jar, где $HADOOP_PREFIX является местоположением, где Hadoop установлен.

Предпосылки

  1. Запустите этот пример путем создания новой папки работы, которая видима к пути поиска файлов MATLAB.

  2. Установите MATLAB Runtime в папке, которая доступна каждым узлом рабочего в кластере Hadoop. Этот пример использует /share/MATLAB/MATLAB_Runtime/v91 в качестве местоположения папки MATLAB Runtime.

    Если у вас нет MATLAB Runtime, можно загрузить его с веб-сайта в: https://www.mathworks.com/products/compiler/mcr.

  3. Скопируйте airlinesmall.csv в папку Hadoop Distributed File System (HDFS) /user/<username>/datasets. Здесь <username> относится к вашему имени пользователя в HDFS.

    $ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets

Процедура

  1. Настройте переменную окружения, HADOOP_PREFIX, чтобы указать на вашу папку установки Hadoop. Эти свойства необходимы для представления заданий к вашему кластеру Hadoop.

    setenv('HADOOP_PREFIX','/share/hadoop/hadoop-2.6.0')

    Переменная окружения HADOOP_PREFIX должна собираться при использовании функции datastore MATLAB указать на данные по HDFS. Установка этой переменной окружения не имеет никакого отношения к Spark. Смотрите Отношение Между Spark и Hadoop для получения дополнительной информации.

  2. Свойства Specify Spark.

    Используйте объект containers.Map задать свойства Spark.

    sparkProperties = containers.Map( ...
        {'spark.executor.cores',...
        'spark.executor.memory',...
        'spark.yarn.executor.memoryOverhead',...
        'spark.dynamicAllocation.enabled',...
        'spark.shuffle.service.enabled',...
        'spark.eventLog.enabled',...
        'spark.eventLog.dir'}, ...
        {'1',...
        '2g',...
        '1024',...
        'true',...
        'true',...
        'true',...
        'hdfs://hadoop01glnxa64:54310/user/<username>/sparkdeploy'});

    Для получения дополнительной информации о свойствах Spark расширьте значение prop пары "имя-значение" 'SparkProperties' в разделе Input Arguments класса SparkConf.

  3. Создайте объект SparkConf.

    Используйте класс matlab.compiler.mlspark.SparkConf, чтобы создать объект SparkConf.

    conf = matlab.compiler.mlspark.SparkConf( ...
        'AppName','myApp', ...
        'Master','yarn-client', ...
        'SparkProperties',sparkProperties);

    Для получения дополнительной информации о SparkConf смотрите matlab.compiler.mlspark.SparkConf.

  4. Создайте объект SparkContext.

    Используйте класс matlab.compiler.mlspark.SparkContext с объектом SparkConf как вход, чтобы создать объект SparkContext.

    sc = matlab.compiler.mlspark.SparkContext(conf);

    Для получения дополнительной информации о SparkContext смотрите matlab.compiler.mlspark.SparkContext.

  5. Создайте объект RDD из данных.

    Используйте функцию MATLAB datastore, чтобы создать объект datastore, указывающий на файл airlinesmall.csv в HDFS. Затем используйте метод SparkContext datastoreToRDD, чтобы преобразовать объект datastore в объект Spark RDD.

    % Create a MATLAB datastore (HADOOP)
    ds = datastore(...
        'hdfs:///user/<username>/datasets/airlinesmall.csv',...
        'TreatAsMissing','NA',...
        'SelectedVariableNames','UniqueCarrier');
    
    % Convert MATLAB datastore to Spark RDD
    rdd = sc.datastoreToRDD(ds);

    В целом вход RDDs может быть создан с помощью следующих методов класса SparkContext: parallelize, datastoreToRDD и textFile.

  6. Выполните операции на объекте RDD.

    Используйте Spark метод RDD, такой как flatMap, чтобы применить функцию ко всем элементам объекта RDD и сгладить результаты. Функциональный carrierToCount, который был создан ранее, служит функцией, которая будет прикладной к элементам RDD. Указатель на функцию к функциональному carrierToCount передается как входной параметр методу flatMap.

    maprdd = rdd.flatMap(@carrierToCount);
    redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
    countdata = redrdd.collect();
    
    % Count and display carrier occurrences
    count = 0;
    for i=1:numel(countdata)
        count = count + countdata{i}{2};
        fprintf('\nCarrier Code: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
    end
    fprintf('\n Total count : %d\n', count);
    
    % Save results to MAT file
    save('countdata.mat','countdata');
    
    % Delete Spark Context
    delete(sc);

    Для списка поддерживаемых преобразований RDD и действий, смотрите Преобразования и Действия в разделе Methods класса RDD.

    Для получения дополнительной информации о преобразованиях и действиях, смотрите Основы Spark Apache.

  7. Создавание автономного приложения.

    Используйте команду mcc с флагом -m, чтобы создать автономное приложение. Флаг -m создает автономное приложение, которое может быть запущено из командной строки. Вы не должны присоединять набор данных airlinesmall.csv, поскольку он находится на HDFS. Команда mcc автоматически берет зависимый файл carrierToCount.m, пока это находится в том же самом, работают папка.

    >> mcc -m deployToSparkMlApiHadoop.m

    Команда mcc создает сценарий оболочки run_deployToSparkMlApiHadoop.sh, чтобы запустить исполняемый файл deployToSparkMlApiHadoop.

    Для получения дополнительной информации смотрите mcc.

  8. Запустите автономное приложение от интерпретатора Linux с помощью следующей команды:

    $ ./run_deployToSparkMlApiHadoop.sh /share/MATLAB/MATLAB_Runtime/v91

    /share/MATLAB/MATLAB_Runtime/v91 является аргументом, указывающим на местоположение MATLAB Runtime.

    До выполнения вышеупомянутой команды убедитесь, что файл javaclasspath.txt находится в той же папке как сценарий оболочки и исполняемый файл.

    Ваше приложение не выполнится, если оно не может найти файл javaclasspath.txt.

    Ваше приложение также может не выполниться, если дополнительная строка, содержащая местоположение папки конфигурационных файлов Hadoop, прокомментирована. Чтобы выполнить ваше приложение на менеджере по кластеру клиента пряжи, эта строка должна быть не прокомментирована. Эта строка должна только быть прокомментирована, если вы планируете выполнение вашего приложения с помощью локального кластерного менеджера.

  9. Вы будете видеть следующий вывод:

    Carrier Name: 9E, Count: 521
    Carrier Name: AA, Count: 14930
    Carrier Name: AQ, Count: 154
    Carrier Name: AS, Count: 2910
    Carrier Name: B6, Count: 806
    Carrier Name: CO, Count: 8138
    ...
    ...
    ...
    Carrier Name: US, Count: 13997
    Carrier Name: WN, Count: 15931
    Carrier Name: XE, Count: 2357
    Carrier Name: YV, Count: 849
     Total count : 123523

    Примечание

    Если развертываемым приложением является функция MATLAB в противоположность скрипту MATLAB, используйте следующий синтаксис выполнения:

    $ ./run_<applicationName>.sh \
      <MATLAB_Runtime_Location> \
      [Spark arguments] \
      [Application arguments]
    Например:
    $ ./run_deployToSparkMlApiHadoop.sh.sh \
       /usr/local/MATLAB/MATLAB_Runtime/v91 \
       yarn-client \
       hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \
       hdfs://host:54310/user/<username>/result

Код:

 deployToSparkMlApiHadoop.m