В этом примере показано, как изменить MATLAB® пример создания длинная таблица для запуска на Spark® включенный Hadoop® кластер. Можно использовать эта длинная таблица для создания длинные массивы и вычисления статистических свойств. Можно разрабатывать код локально, а затем масштабировать, чтобы воспользоваться возможностями, предлагаемыми Parallel Computing Toolbox™ и MATLAB Parallel Server™, не переписывая алгоритм. Смотрите также рабочий процесс Больших данных с использованием высоких массивов и хранилищ данных и настройте кластер Hadoop (MATLAB Parallel Server)
Во-первых, необходимо задать переменные окружения и свойства кластера как соответствующие для конкретного строения кластера Hadoop с поддержкой Spark. Значения этих и других свойств, необходимых для отправки заданий в кластер, см. у системного администратора.
setenv('HADOOP_HOME', '/path/to/hadoop/install') setenv('SPARK_HOME', '/path/to/spark/install'); cluster = parallel.cluster.Hadoop; % Optionally, if you want to control the exact number of workers: cluster.SparkProperties('spark.executor.instances') = '16'; mapreducer(cluster);
Примечание
В шаге настройки вы используете mapreducer
для установки окружения выполнения кластера. На следующем шаге вы создаете длинный массив. Если вы изменяете или удаляете окружение выполнения кластера после создания длинный массив, то длинный массив является недопустимым, и вы должны воссоздать его.
Примечание
Если вы хотите разрабатывать последовательно и не использовать локальные рабочие места, введите следующую команду.
mapreducer(0);
После установки переменных окружения и свойств кластера можно запустить пример длинная таблица MATLAB на кластере Hadoop с поддержкой Spark вместо на локальном компьютере. Создайте datastore и преобразуйте его в длинная таблица. MATLAB автоматически запускает задание Spark, чтобы запустить последующие вычисления на длинная таблица.
ds = datastore('airlinesmall.csv'); varnames = {'ArrDelay', 'DepDelay'}; ds.SelectedVariableNames = varnames; ds.TreatAsMissing = 'NA';
Создайте длинную таблицу tt
из datastore.
tt = tall(ds)
Starting a Spark Job on the Hadoop cluster. This could take a few minutes ...done. tt = M×2 tall table ArrDelay DepDelay ________ ________ 8 12 8 1 21 20 13 12 4 -1 59 63 3 -2 11 -1 : : : :
Отображение указывает, что количество строк, M
, пока не известно. M
является заполнителем до завершения вычисления.
Извлеките задержку прибытия ArrDelay
из длинная таблица. Это действие создает новую переменную длинный массив для использования в последующих вычислениях.
a = tt.ArrDelay;
Можно задать ряд операций на ваш длинный массив, которые не выполняются до вызова gather
. Это позволяет вам пакетировать команды, которые могут занять много времени. В качестве примера вычислите среднее и стандартное отклонение задержки прибытия. Используйте эти значения, чтобы создать верхний и нижний пороги для задержек, которые находятся в пределах 1 стандартного отклонения от среднего.
m = mean(a,'omitnan'); s = std(a,'omitnan'); one_sigma_bounds = [m-s m m+s];
Использовать gather
для вычисления one_sigma_bounds
, и принести ответ в память.
sig1 = gather(one_sigma_bounds)
Evaluating tall expression using the Spark Cluster: - Pass 1 of 1: Completed in 0.95 sec Evaluation completed in 1.3 sec sig1 = -23.4572 7.1201 37.6975
Можно задать несколько входов и выходов, gather
если вы хотите оценить сразу несколько вещей. Сделать это быстрее, чем позвонить gather
отдельно на каждый длинный массив. Например, вычислите минимальную и максимальную задержку прибытия.
[max_delay, min_delay] = gather(max(a),min(a))
max_delay = 1014 min_delay = -64
Примечание
Эти примеры занимают больше времени, чтобы завершить первое время, если MATLAB запускается на рабочих кластерах.
При использовании длинных массивов в кластере Hadoop с поддержкой Spark вычислительные ресурсы из кластера Hadoop будут зарезервированы на время жизни окружения выполнения mapreducer. Чтобы удалить эти ресурсы, вы должны удалить mapreducer:
delete(gcmr);
mapreducer(0);
datastore
| gather
| mapreducer
| parallel.cluster.Hadoop
| table
| tall