Этот пример показывает, как изменить пример MATLAB® составления длинной таблицы, чтобы работать на включенном кластере Hadoop® Spark®. Можно использовать эту длинную таблицу, чтобы создать длинные массивы и вычислить статистические свойства. Можно разработать код локально и затем масштабировать, чтобы использовать в своих интересах возможности, предлагаемые Parallel Computing Toolbox™ и MATLAB Parallel Server™, не имея необходимость переписывать алгоритм. См. также Рабочий процесс Больших данных с использованием высоких массивов и хранилищ данных и Сконфигурируйте Кластер Hadoop (MATLAB Parallel Server)
Во-первых, необходимо установить переменные окружения и кластерные свойства, когда подходящий для определенного Spark включил кластерную конфигурацию Hadoop. Смотрите своего системного администратора для значений для этих и других свойств, необходимых для представления заданий к вашему кластеру.
setenv('HADOOP_HOME', '/path/to/hadoop/install') setenv('SPARK_HOME', '/path/to/spark/install'); cluster = parallel.cluster.Hadoop; % Optionally, if you want to control the exact number of workers: cluster.SparkProperties('spark.executor.instances') = '16'; mapreducer(cluster);
На шаге настройки вы используете mapreducer
, чтобы установить кластерную среду выполнения. На следующем шаге вы создаете длинный массив. Если вы изменяете или удаляете кластерную среду выполнения после создания длинного массива, то длинный массив недопустим, и необходимо воссоздать его.
Если вы хотите разработать в сериале и не использовать локальных рабочих, введите следующую команду.
mapreducer(0);
После установки ваших переменных окружения и кластерных свойств, можно запуститься, пример длинной таблицы MATLAB (MATLAB) на Spark включил кластер Hadoop вместо на локальной машине. Создайте datastore и преобразуйте его в длинную таблицу. MATLAB автоматически запускает задание Spark, чтобы выполнить последующие вычисления на длинной таблице.
ds = datastore('airlinesmall.csv'); varnames = {'ArrDelay', 'DepDelay'}; ds.SelectedVariableNames = varnames; ds.TreatAsMissing = 'NA';
Составьте длинную таблицу tt
от datastore.
tt = tall(ds)
Starting a Spark Job on the Hadoop cluster. This could take a few minutes ...done. tt = M×2 tall table ArrDelay DepDelay ________ ________ 8 12 8 1 21 20 13 12 4 -1 59 63 3 -2 11 -1 : : : :
Отображение указывает, что количество строк, M
, еще не известно. M
является заполнителем, пока вычисление не завершается.
Извлеките задержку прибытия ArrDelay
от длинной таблицы. Это действие создает новую переменную длинного массива, чтобы использовать в последующих вычислениях.
a = tt.ArrDelay;
Можно задать ряд операций на длинном массиве, которые не выполняются, пока вы не вызываете gather
. Выполнение так позволяет вам обрабатывать в пакетном режиме команды, которые могут занять много времени. Как пример, вычислите среднее и стандартное отклонение задержки прибытия. Используйте эти значения, чтобы создать верхние и более низкие пороги для задержек, которые являются в 1 стандартном отклонении среднего значения.
m = mean(a,'omitnan'); s = std(a,'omitnan'); one_sigma_bounds = [m-s m m+s];
Используйте gather
, чтобы вычислить one_sigma_bounds
и загрузить ответ в память.
sig1 = gather(one_sigma_bounds)
Evaluating tall expression using the Spark Cluster: - Pass 1 of 1: Completed in 0.95 sec Evaluation completed in 1.3 sec sig1 = -23.4572 7.1201 37.6975
Можно задать несколько вводов и выводов к gather
, если вы хотите оценить несколько вещей целиком. Выполнение так быстрее, чем вызов gather
отдельно на каждом длинном массиве. Например, вычислите минимальную и максимальную задержку прибытия.
[max_delay, min_delay] = gather(max(a),min(a))
max_delay = 1014 min_delay = -64
Эти примеры занимают больше времени, чтобы завершиться в первый раз, если MATLAB запускается на кластерных рабочих.
При использовании длинных массивов на включенном кластере Hadoop Spark вычислите ресурсы из кластера Hadoop, будет зарезервирован в течение времени жизни mapreducer среды выполнения. Чтобы очистить эти ресурсы, необходимо удалить mapreducer:
delete(gcmr);
mapreducer(0);
datastore
| gather
| mapreducer
| parallel.cluster.Hadoop
| table
| tall