Разбивка и формат входных файлов

01Hadoop разбивает все входные файлы сначала на части, которые описываются одним из подклассов InputSplit, а затем, если это возможно, на записи, которые описываются парой ключ-значение и реализуют интерфейс Writable.

Для задания формата входных файлов в Hadoop необходимо переопределить класс InputFormat и реализовать методы getSplits и createRecordReader. Первый метод задает разбивку входных файлов на отдельные части, имеющие тип InputSplit, а второй создает объект для считывания этих частей. В качестве параметров шаблона этого класса выступают типы ключа и значения, которые будут переданы в функцию map.

Скелет реализации метода getSplits можно записать следующим образом.

for (Path path : FileInputFormat.getInputPaths(ctx)) {
    FileSystem fs = path.getFileSystem(ctx.getConfiguration());
    for (FileStatus file : fs.listStatus(path)) {
        ...
    }
}

Здесь с помощью двух вложенных циклов происходит итерация по всем файлам входных каталогов (без рекурсии). В теле внутреннего цикла обычно производится проверка, имеет ли найденный файл заданный формат. Затем он разбивается на части и записывается в результирующий список всех частей всех файлов.

Для описания частей в пакете org.apache.hadoop.mapreduce.lib.input определены вспомогательные классы. По умолчанию, используется класс FileSplit, который позволяет разделить файл побайтово на произвольное количество частей, имеющих произвольную позицию и размер внутри файла. Этот класс используется как для разбивки бинарных файлов на части, так и для разбивки текстовых файлов на строки. Если единица обработки хранится сразу в нескольких файлах (например, несколько связанных таблиц), то можно воспользоваться классом CombineFileSplit для разбивки множества входных файлов на подмножества, содержащие связанные друг с другом файлы.

Для того чтобы программа работала максимально эффективно, в этих классах необходимо задать узлы кластера, на которых хранятся конкретные части файла. Это можно сделать с помощью метода FileSystem.getFileBlockLocations. Когда единица обработки находится в нескольких файлах, то нагрузку между узлами следует распределить вручную:

// количество частей, которые будут обработаны на конкретном узле (вес узла)
Map<String,Integer> nodeWeights = new HashMap<>();
// цикл по всем частям, каждая из которых состоит из нескольких файлов
for (List<FileStatus> files : all_parts) {
  // множество узлов, на каждом из которых хранится хотя бы одна часть хотя бы одного входного файла
  Set<String> locations = new HashSet<>();
  // проходим по всем файлам, которые составляют одну часть
  // и заполняем множество узлов
  for (int i=0; i<nfiles; ++i) {
    ...
  }
  // ищем узел с минимальным весом
  for (String node : locations) {
    ...
  }
  // формируем список узлов для нашей части, в котором ставим найденный узел первым
  ...
  // обновляем вес первого узла
  ...
}

Задача, запущенная на конкретном узле будет читать файлы и с других узлов, поскольку каждая часть состоит из нескольких файлов, которые могут быть расположены на разных узлах.

Для считывания файлов заданного формата переопределяется класс RecordReader. В качестве параметров шаблона этого класса также выступают типы ключа и значения, которые будут переданы в функцию map. RecordReader выступает в роли итератора, попорядку считывающего записи, из которых состоит переданная ему часть файла или подмножество входных файлов. В метод initialize система передает одну из частей, на которые были разбиты входные данные, метод nextKeyValue используется для перехода к следующей записи, а методы getCurrentKey и getCurrentValue для получения текущего ключа и значения. Эти методы вызываются непосредственно перед вызовом функции map и передают в нее возвращаемые значения в качестве аргументов.

Таким образом, для задания нестандартного формата входных файлов необходимо

Задания

02Для выполнения заданий вам понадобятся следующие директории.

Исходный код/gfs/bigdata-course/code/hadoop/spectrum
/gfs/bigdata-course/code/spark/spectrum
Входные данные/gfs/bigdata-course/datasets/ndbc-small

Задание H6

03Произведите предварительную обработку данных NDBC, которые представляют собой записи частотно-направленных спектров морского волнения, полученных со специальных исследовательских буев и закодированных в пяти переменных.

Каждая запись содержит дату измерения и дискретные значения спектра для каждой из частот (диапазон частот и их количество фиксировано и одинаково во всех файлах). Каждый файл содержит записи только одного из буев за промежуток времени равный одному году.

Все файлы хранятся заархивированными в формат gzip в директории /gfs/bigdata-course/datasets/ndbc-small. Просмотреть их содержимое можно с помощью команды less, например,

$ less -S /gfs/bigdata-course/datasets/ndbc-small/41013w2012.txt.gz
#YY  MM DD hh mm  .0200  .0325  .0375  .0425  .0475  .0525
2010 01 01 00 00   0.00   0.00   0.00   0.00   0.00   0.00
2010 01 01 01 00   0.00   0.00   0.00   0.00   0.00   0.00
2010 01 01 02 00   0.00   0.00   0.00   0.00   0.00   0.00
2010 01 01 03 00   0.00   0.00   0.00   0.00   0.00   0.00
2010 01 01 04 00   0.00   0.00   0.00   0.00   0.00   0.00
...

В имени каждого файла закодирован номер станции, название переменной и год сбора данных: <номер><переменная><год>.txt.gz. Поскольку спектр закодирован с помощью пяти переменных и данные по каждой из переменных находятся в отдельных файлах, то перед обработкой данные из каждой пятерки файлов нужно объединить в один кортеж, реализовав InputFormat. Затем, записи внутри каждой пятерки необходимо соединить по дате измерения, реализовав RecordReader.

В выборке может не оказаться данных для какой-то из переменных или для определенных дат; неполные данные должны быть отсеяны, поскольку по ним невозможно восстановить спектр. Также, данные для некоторых дат могут дублироваться. Результатом работы программы должен стать список спектров с полным набором данных по всем переменным (функцию map переопределять не нужно).

Чтобы проверить себя, воспользуйтесь командой ./check-spec из каталога с исходным кодом. Например, так:

./check-spec path/to/your/output/dir

Команда выведет разницу между результатом работы вашей программы и правильным результатом. При этом необходимо убрать искусственное ограничение на вывод в маппере.

Задание S2

04Решите задачу H6 на Spark без использования возможностей Hadoop (реализованный формат данных и др.). Для работы с именами файлов используйте стандартный класс File и тот факт, что файлы параллельной файловой системы доступны как обычные файлы с префиксом /gfs. Проверить работу программы можно с помощью команды ./bin/check-spec.

Полезные ссылки