Hadoop + HDFS + YARN = большие данные

Что такое большие данные?

In short, the term Big Data applies to information that can’t be processed or analysed using traditional processes or tools.

Zikopoulos P. et al. Understanding Big Data: Analytics for enterprise class Hadoop and streaming data. McGraw-Hill Osborne Media, 2011.

MapReduce

Dean, J., & Ghemawat, S. (2008). MapReduce: simplified data processing on large clusters. Communications of the ACM, 51(1), 107-113.

Алгоритм MapReduce

Алгоритм MapReduce

Алгоритм MapReduce

Алгоритм MapReduce

Алгоритм MapReduce

  • Высокий параллелизм по данным.
  • Автоматическое обеспечение отказоустойчивости.
  • Достаточная универсальность для реализации часто используемых алгоритмов.

Hadoop vs. HPC

Big DataHPC
Файловая системаРазмер блока64МБ4Кб
POSIXнетда
Произвольный доступнетда
Репликациядада
ПланировщикОтказоустойчивостьотдельные процессыприложения целиком
Программный интерфейсYARN API + Hadoop??? + MPI
Запуск приложений близко к даннымданет

Hadoop: частота повторения слов

public class MyMapper extends Mapper<LongWritable,Text, Text,IntWritable> {
  public void map(LongWritable key, Text val, Context ctx)
  throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(val.toString());
    while (itr.hasMoreTokens()) {
      ctx.write(new Text(itr.nextToken()), new IntWritable(1));
    }
  }
}

public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> vals, Context ctx)
  throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) { sum += val.get(); }
    ctx.write(key, new IntWritable(sum));
  }
}

Hadoop: запуск на кластере

Job j = Job.getInstance();
j.setJarByClass(Main.class);
FileInputFormat.addInputPath(j, ...);
FileOutputFormat.setOutputPath(j, ...);
j.setMapperClass(MyMapper.class);
j.setReducerClass(MyReducer.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(IntWritable.class);
j.waitForCompletion(true);

Экосистема Hadoop

Apache Spark

Spark: Cluster Computing with Working Sets. Matei Zaharia, Mosharaf Chowdhury, Michael J. Franklin, Scott Shenker, Ion Stoica. HotCloud 2010. June 2010.

Наш опыт

Объем данных144MB
Объем несжатых данных770MB
Кол-во станций24
Временной период3 года (2010–2012)
Общее кол-во спектров445422

$ cd nm-local-dir/usercache/hadoop/appcache/application_1403952415373_0009
$ du -chd0 *
...
1,4G   output
...
На диск записывается почти в 10 раз больше промежуточных данных!

Spark vs. Hadoop

Aлгоритм работы Hadoop
...
while (problem is not solved) {
    ...
    read input data from HDFS
    run map in parallel
    write intermediate results to HDFS
    run sort in parallel
    write intermediate results to HDFS
    run reduce in parallel
    write results to HDFS
    ...
}
...
Aлгоритм работы Spark
read input data from HDFS
while (problem is not solved) {
    ...
    ...
    run map in parallel
    write log to HDFS
    sort in parallel
    write log to HDFS
    run reduce in parallel
    write log to HDFS
    ...
}
write results to HDFS

Как Спарку это удается?

Resilient Distributed Datasets (RDD) — отказоустойчивые распределенные массивы данных.
  • Обрабатывают только локальные данные,
  • поддерживают только крупно-зернистые преобразования,
  • при выходе из строя узла пересчитывают потерявшиеся данные.
Преобразования
map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks])
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
...
Действия
reduce(func)
collect()
count()
first()
take(n)
takeSample(withReplacement, num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
foreach(func)
...

Spark: частота повторения слов

val file = sc.textFile("...");
file.flatMap(line => line.split("\W+"))
    .filter(word => !word.isEmpty())
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)
    .map(pair => pair.swap)
    .sortByKey(false)
    .collect()
Граф задач

Spark: потоковая обработка

val conf = new SparkConf().setAppName("NetWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.socketTextStream("127.0.0.1", 7777,
  StorageLevel.MEMORY_AND_DISK_SER)
    .flatMap(_.split(" "))
    .map(x => (x, 1))
    .reduceByKey(_ + _)
    .foreachRDD(rdd => {
      // RDD functions
    })
ssc.start()
ssc.awaitTermination()
Граф задач
Adam Drake. Command-line Tools can be 235x Faster than your Hadoop Cluster.