Обработка текстов

01Простейшая программа Hadoop состоит хотя бы из одной функции map и задания для планировщика, суть которого состоит в вызове этой функции параллельно для каждого элемента данных. Использование функции reduce опционально, и при ее отсутствии выводом программы является результат применения функции map. В общем случае программа Hadoop может состоять из нескольких заданий, между которыми устанавливаются информационные зависимости, позволяющие использовать выходные данные одного множества заданий в качестве входных данных для некоторого другого множества заданий. Таким образом создается конвейер, полностью определяющий все этапы обработки данных, начиная от очистки сырых данных и заканчивая сбором искомой статистики.

На данный момент различают две версии Hadoop (1.x и 2.x/3.x), которые используют разные интерфейсы для описания функций map/reduce и заданий для планирощика (пакеты org.apache.hadoop.mapred и org.apache.hadoop.mapreduce соответственно). Далее будут использоваться примеры, написанные для нового интерфейса. Если имя класса указано без пакета, то предполагается пакет org.apache.hadoop.mapreduce.

Для определения функции map наследуется класс Mapper и реализуется метод map:

public class NewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text val, Context ctx)
    throws IOException, InterruptedException {
        ...
    }
}

Первая пара параметров шаблона определяет типы ключей и значений входного набора данных, эти же типы имеют соответственно первый и второй аргументы функции map. По умолчанию, Hadoop считает все входные файлы текстовыми, в качестве ключа используется номер строки, а в качестве значения сама строка. Вторая пара параметров определяет типы ключей и значений выходного набора данных. Эти же типы имеют аргументы метода Context.write, используемого для отправки данных на следующий шаг вычислений (в фукнцию reduce или выходной файл).

Для определения функции reduce наследуется класс Reducer и реализовать метод reduce:

public class NewReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> vals, Context ctx)
    throws IOException, InterruptedException {
        ...
    }
}

Здесь параметры шаблона имеют те же самые значения, что и в методе map, и выходные типы метода map должны соответствовать входным типам метода reduce, для того чтобы их можно было использовать совместно. Первый аргумент метода reduce задает ключ, второй аргумент задает итератор значений, соответствующих этому ключу, а третий — контекст для записи выходных данных.

Вместо стандартных типов языка Java в Hadoop используются их обертки, а также некоторые специальные типы (см. таблицу ниже). Если какой-либо ключ или значение не используются в расчетах, то в качестве типа можно указать Object или NullWritable.

Базовый тип JavaБазовый тип Hadoop
StringText
BooleanBooleanWritable
ByteByteWritable
ShortShortWritable
IntegerIntWritable
LongLongWritable
DoubleDoubleWritable
FloatFloatWritable
Object[]ArrayWritable
MapMapWritable
Базовые типы Java и Hadoop.

Для определения задания для планировщика необходимо создать объект типа Job, инициализировать некоторые из его полей и отправить задачу планировщику на исполнение. Минимальный набор команд представлен ниже.

Job job = Job.getInstance();
job.setJarByClass(HadoopDriver.class);
FileInputFormat.addInputPath(job, new Path(...));
FileOutputFormat.setOutputPath(job, new Path(...));
job.setMapperClass(NewMapper.class);
job.setReducerClass(NewReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.waitForCompletion(true)

Здесь MapOutputKeyClass и MapOutputValueClass должны соответствовать выходным типам метода map или reduce, в зависимости от того, вызывается ли метод reduce или нет. Метод waitForCompletion отправляет задачу на обработку планировщику и ждет ее завершения.

Таким образом, программа Hadoop состоит из набора функций map и reduce, отправляемых на исполнение планировщику в виде заданий, а информационные зависимости между заданиями описываются вручную: путем отправки заданий в очередь и ожидания их завершения.

Более гибкая настройка обработки ключей и значений, поступающих на вход функции map, осуществляется переопределением метода run в классе Mapper. По умолчанию он имеет следующую реализацию.

public void run(Context ctx) throws IOException, InterruptedException {
    setup(ctx);
    try {
        while (ctx.nextKeyValue()) {
            map(ctx.getCurrentKey(), ctx.getCurrentValue(), ctx);
        }
    } finally {
        cleanup(ctx);
    }
}

В теле метода функция map вызывается по порядку для каждой пары ключ-значение, а в начале и конце метода вызываются методы setup и cleanup (по умолчанию они не производят никаких действий, и их тоже можно переопределить при необходимости). При обработке текстов метод run переопределяют для соединения строк друг с другом, чтобы анализировать текст не по строкам, а по предложениям или словосочетаниям.

Задания

02Для выполнения заданий вам понадобятся следующие директории.

Исходный код/gfs/bigdata-course/code/hadoop/wordcount
/gfs/bigdata-course/code/spark/wordcount
Входные данные/gfs/bigdata-course/datasets/text

Задание H1

03Перепишите программу, подсчитывающую частоту использования слов в тексте, используя библиотеку Lucene: учтите все возможные разделители слов и исключите стоп-слова из вывода. Для этого воспользуйтесь стандартным анализатором.

Задание H2

04Измените программу из задания H1, так чтобы посчитать количество слов определенной длины в тексте. Результатом работы программы должна стать таблица следующего вида. Ключ должен иметь целочисленный тип.

длина словаколичество слов такой длины
......
Пример вывода.

Задание H3

05Измените программу из задания H1, так чтобы для каждого слова вывести слово, чаще всего следующее за ним. Учтите знаки препинания, являющиеся разделителями для предложений, чтобы исключить из вывода слова, идущие друг за другом, но находящиеся в разных предложениях.

Задание H4

06Измените программу из предыдущего задания так, чтобы дополнить вывод частотой встречи слов (количество раз, которое слово-значение следует за словом-ключом). Для этого создайте новый выходной тип с соответствующими полями, реализовав интерфейс Writable.

Задание H5

07Измените программу из предыдущего задания так, чтобы упорядочить вывод по убыванию частоты встречи слов. Для этого необходимо реализовать интерфейс WritableComparable у созданного вами типа и дать планировщику дополнительное задание, которое отсортирует данные.

Задание S1

08Найдите самую часто встречающуюся пару идущих друг за другом слов в тексте.

Полезные ссылки