exponenta event banner

Простой подсчет данных с помощью MapReduce

В этом примере показано, как извлечь подмножество большого набора данных.

Существует два аспекта подстановки или выполнения запроса. Выбирается подмножество переменных (столбцов) в наборе данных. Другой - выбор подмножества наблюдений или строк.

В этом примере выбор переменных происходит в определении хранилища данных. (Функция отображения может выполнять дополнительный подвыбор переменных, но это не входит в объем данного примера). В этом примере роль функции карты заключается в выполнении выбора наблюдений. Роль функции уменьшения состоит в объединении поднабранных записей, извлеченных каждым вызовом функции отображения. Этот подход предполагает, что набор данных может помещаться в память после фазы карты.

Подготовка данных

Создание хранилища данных с помощью airlinesmall.csv набор данных. Этот 12-мегабайтный набор данных содержит 29 столбцов полетной информации для нескольких авиаперевозчиков, включая время прилета и вылета. В этом примере используется 15 переменных из 29, доступных в данных.

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = ds.VariableNames([1 2 5 9 12 13 15 16 17 ...
    18 20 21 25 26 27]);
ds.SelectedVariableNames
ans = 1x15 cell
  Columns 1 through 4

    {'Year'}    {'Month'}    {'DepTime'}    {'UniqueCarrier'}

  Columns 5 through 8

    {'ActualElapsedTime'}    {'CRSElapsedTime'}    {'ArrDelay'}    {'DepDelay'}

  Columns 9 through 13

    {'Origin'}    {'Dest'}    {'TaxiIn'}    {'TaxiOut'}    {'CarrierDelay'}

  Columns 14 through 15

    {'WeatherDelay'}    {'NASDelay'}

Хранилище данных обрабатывает 'NA' значения как отсутствующие и заменяет отсутствующие значения на NaN значения по умолчанию. Кроме того, SelectedVariableNames позволяет работать только с указанными интересующими переменными, которые можно проверить с помощью preview.

preview(ds)
ans=8×15 table
    Year    Month    DepTime    UniqueCarrier    ActualElapsedTime    CRSElapsedTime    ArrDelay    DepDelay    Origin      Dest      TaxiIn    TaxiOut    CarrierDelay    WeatherDelay    NASDelay
    ____    _____    _______    _____________    _________________    ______________    ________    ________    _______    _______    ______    _______    ____________    ____________    ________

    1987     10        642         {'PS'}                53                 57              8          12       {'LAX'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1021         {'PS'}                63                 56              8           1       {'SJC'}    {'BUR'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       2055         {'PS'}                83                 82             21          20       {'SAN'}    {'SMF'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1332         {'PS'}                59                 58             13          12       {'BUR'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        629         {'PS'}                77                 72              4          -1       {'SMF'}    {'LAX'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1446         {'PS'}                61                 65             59          63       {'LAX'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        928         {'PS'}                84                 79              3          -2       {'SAN'}    {'SFO'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        859         {'PS'}               155                143             11          -1       {'SEA'}    {'LAX'}     NaN        NaN          NaN             NaN           NaN   

Запустить MapReduce

mapreduce функция требует функции отображения и функции уменьшения в качестве входных данных. Преобразователь принимает блоки данных и выводит промежуточные результаты. Редуктор считывает промежуточные результаты и выдает окончательный результат.

В этом примере сопоставитель получает таблицу с переменными, описанными SelectedVariableNames свойство в хранилище данных. Затем картограф извлекает полеты, которые имели высокую величину задержки после отталкивания от ворот. В частности, он идентифицирует полеты длительностью, превышающей в 2,5 раза запланированную продолжительность. Картографист игнорирует полеты до 1995 года, поскольку некоторые из переменных, представляющих интерес для этого примера, не были собраны до этого года.

Отображение файла функции карты.

function subsettingMapper(data, ~, intermKVStore)
  % Select flights from 1995 and later that had exceptionally long
  % elapsed flight times (including both time on the tarmac and time in 
  % the air).
  idx = data.Year > 1994 & (data.ActualElapsedTime - data.CRSElapsedTime)...
    > 1.50 * data.CRSElapsedTime;
  intermVal = data(idx,:);

  add(intermKVStore,'Null',intermVal);
end

Редуктор получает поднаборные наблюдения, полученные от преобразователя, и просто объединяет их в единую таблицу. Редуктор возвращает один ключ (который является относительно бессмысленным) и одно значение (конкатенированная таблица).

Просмотрите файл функции сокращения.

function subsettingReducer(~, intermValList, outKVStore)
  % get all intermediate results from the list
  outVal = {};

  while hasnext(intermValList)
    outVal = [outVal; getnext(intermValList)];
  end
  % Note that this approach assumes the concatenated intermediate values (the
  % subset of the whole data) fit in memory.
    
  add(outKVStore, 'Null', outVal);
end

Использовать mapreduce для применения карты и сокращения функций к хранилищу данных, ds.

result = mapreduce(ds, @subsettingMapper, @subsettingReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

mapreduce возвращает хранилище выходных данных, result, с файлами в текущей папке.

Показать результаты

Найдите шаблоны в первых 10 переменных, которые были извлечены из набора данных. Эти переменные определяют авиакомпанию, пункт назначения и аэропорты прибытия, а также некоторую базовую информацию о задержках.

r = readall(result);
tbl = r.Value{1};
tbl(:,1:10)
ans=37×10 table
    Year    Month    DepTime    UniqueCarrier    ActualElapsedTime    CRSElapsedTime    ArrDelay    DepDelay    Origin      Dest  
    ____    _____    _______    _____________    _________________    ______________    ________    ________    _______    _______

    1995      6       1601         {'US'}               162                 58            118          14       {'BWI'}    {'PIT'}
    1996      6       1834         {'CO'}               241                 75            220          54       {'IAD'}    {'EWR'}
    1997      1        730         {'DL'}               110                 43            137          70       {'ATL'}    {'GSP'}
    1997      4       1715         {'UA'}               152                 57            243         148       {'IND'}    {'ORD'}
    1997      9       2232         {'NW'}               143                 50            115          22       {'DTW'}    {'CMH'}
    1997     10       1419         {'CO'}               196                 58            157          19       {'DFW'}    {'IAH'}
    1998      3       2156         {'DL'}               139                 49            146          56       {'TYS'}    {'ATL'}
    1998     10       1803         {'NW'}               291                 81            213           3       {'MSP'}    {'ORD'}
    2000      5        830         {'WN'}               140                 55             85           0       {'DAL'}    {'HOU'}
    2000      8       1630         {'CO'}               357                123            244          10       {'EWR'}    {'CLT'}
    2002      6       1759         {'US'}               260                 67            192          -1       {'LGA'}    {'BOS'}
    2003      3       1214         {'XE'}               214                 84            124          -6       {'GPT'}    {'IAH'}
    2003      3        604         {'XE'}               175                 60            114          -1       {'LFT'}    {'IAH'}
    2003      4       1556         {'MQ'}               142                 52            182          92       {'PIA'}    {'ORD'}
    2003      5       1954         {'US'}               127                 48             78          -1       {'RDU'}    {'CLT'}
    2003      7       1250         {'FL'}               261                 95            166           0       {'ATL'}    {'IAD'}
      ⋮

Глядя на первую запись, рейс U.S. Air покинул ворота через 14 минут после запланированного времени вылета и прибыл с опозданием на 118 минут. Рейс испытал задержку в 104 минут после отталкивания от ворот, что является разницей между ActualElapsedTime и CRSElapsedTime.

Есть одна аномальная запись. В феврале 2006 года рейс JetBlue имел время вылета 3:24 утра и истекшее время полёта 1650 минут, но задержку прилёта всего 415 минут. Возможно, произошла ошибка ввода данных.

В противном случае не существует четких схем, касающихся того, когда и где совершаются эти исключительно задержанные рейсы. Ни одна авиакомпания, ни время года, ни время суток, ни один аэропорт не доминируют. Некоторые интуитивные модели, такие как O'Hare (ORD) в зимние месяцы, безусловно, присутствуют.

Шаблоны задержки

Начиная с 1995 года в системе авиаперевозчиков начались данные, включающие измерения того, сколько задержек имело место на фазах такси рейса. Затем, в 2003 году, данные также стали включать определенные причины задержки.

Изучите эти две переменные более подробно.

tbl(:,[1,7,8,11:end])
ans=37×8 table
    Year    ArrDelay    DepDelay    TaxiIn    TaxiOut    CarrierDelay    WeatherDelay    NASDelay
    ____    ________    ________    ______    _______    ____________    ____________    ________

    1995      118          14          7        101          NaN             NaN           NaN   
    1996      220          54         12        180          NaN             NaN           NaN   
    1997      137          70          2         12          NaN             NaN           NaN   
    1997      243         148          4         38          NaN             NaN           NaN   
    1997      115          22          4         98          NaN             NaN           NaN   
    1997      157          19          6         95          NaN             NaN           NaN   
    1998      146          56          9         47          NaN             NaN           NaN   
    1998      213           3         11        205          NaN             NaN           NaN   
    2000       85           0          5         51          NaN             NaN           NaN   
    2000      244          10          4        273          NaN             NaN           NaN   
    2002      192          -1          6        217          NaN             NaN           NaN   
    2003      124          -6         13        131          NaN             NaN           NaN   
    2003      114          -1          8        106          NaN             NaN           NaN   
    2003      182          92          9        106          NaN             NaN           NaN   
    2003       78          -1          5         90          NaN             NaN           NaN   
    2003      166           0         11        170            0               0           166   
      ⋮

Для этих исключительно задержанных рейсов большая часть задержек происходит во время выезда такси, на тармаке. Более того, основной причиной задержки является NASDelay. Задержки в работе НАН налагаются национальными авиационными властями на вылеты, направляющиеся в аэропорт, который, по прогнозам, не сможет справиться со всеми запланированными прибытиями на момент прибытия рейса. Программы задержки NAS, действующие в любой данный момент времени, проводятся по адресу https://www.fly.faa.gov/ois/.

Предпочтительно, когда накладываются задержки NAS, посадка самолета просто задерживается. Такая задержка проявится как задержка отправления. Однако для большинства рейсов, выбранных для этого примера, задержки происходили во многом после вылета из ворот, что приводило к задержке такси.

Повторный запуск MapReduce

Предыдущая функция сопоставления имела критерии подстановки, жестко подключенные к файлу функции. Новая функция карты должна быть написана для любого нового запроса, например, для рейсов, вылетающих из Сан-Франциско в определенный день.

Универсальный сопоставитель может быть более адаптивным путем выделения критериев подстановки из определения функции отображения и использования анонимной функции для настройки сопоставителя для каждого запроса. Этот универсальный сопоставитель использует четвертый входной аргумент, который предоставляет требуемую переменную запроса.

Отображение файла общей функции карты.

function subsettingMapperGeneric(data, ~, intermKVStore, subsetter)
  intermKey = 'Null';
  intermVal = data(subsetter(data), :);
  add(intermKVStore,intermKey,intermVal);
end

Создать анонимную функцию, выполняющую тот же выбор строк, который жестко закодирован в subsettingMapper.

inFlightDelay150percent = ...
   @(data) data.Year > 1994 & ...
   (data.ActualElapsedTime-data.CRSElapsedTime) > 1.50*data.CRSElapsedTime;

С момента mapreduce функция требует, чтобы карта и уменьшить функции, чтобы принять ровно три входа, используйте другую анонимную функцию, чтобы указать четвертый вход для отображения, subsettingMapperGeneric. Впоследствии эту анонимную функцию можно использовать для вызова subsettingMapperGeneric используя только три аргумента (четвертый неявный).

configuredMapper = ...
    @(data, info, intermKVStore) subsettingMapperGeneric(data, info, ...
    intermKVStore, inFlightDelay150percent);

Использовать mapreduce для применения общей функции сопоставления ко входному хранилищу данных.

result2 = mapreduce(ds, configuredMapper, @subsettingReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

mapreduce возвращает хранилище выходных данных, result2, с файлами в текущей папке.

Проверка результатов

Убедитесь, что общий сопоставитель получает тот же результат, что и в случае с логикой подмножества с жесткой проводкой.

r2 = readall(result2);
tbl2 = r2.Value{1};

if isequaln(tbl, tbl2)
    disp('Same results with the configurable mapper.')
else
    disp('Oops, back to the drawing board.')
end
Same results with the configurable mapper.

Локальные функции

Здесь перечислены функции карты и сокращения, которые mapreduce относится к данным.

function subsettingMapper(data, ~, intermKVStore)
  % Select flights from 1995 and later that had exceptionally long
  % elapsed flight times (including both time on the tarmac and time in 
  % the air).
  idx = data.Year > 1994 & (data.ActualElapsedTime - data.CRSElapsedTime)...
    > 1.50 * data.CRSElapsedTime;
  intermVal = data(idx,:);

  add(intermKVStore,'Null',intermVal);
end
%-------------------------------------------------------------------------
function subsettingReducer(~, intermValList, outKVStore)
  % get all intermediate results from the list
  outVal = {};

  while hasnext(intermValList)
    outVal = [outVal; getnext(intermValList)];
  end
  % Note that this approach assumes the concatenated intermediate values (the
  % subset of the whole data) fit in memory.
    
  add(outKVStore, 'Null', outVal);
end
%-------------------------------------------------------------------------
function subsettingMapperGeneric(data, ~, intermKVStore, subsetter)
  intermKey = 'Null';
  intermVal = data(subsetter(data), :);
  add(intermKVStore,intermKey,intermVal);
end
%-------------------------------------------------------------------------

См. также

|

Связанные темы