Hadoop + HDFS + YARN = большие данные
Что такое большие данные?
In short, the term Big Data applies to information that can’t be processed or analysed using traditional processes or tools.
MapReduce
Алгоритм MapReduce
Алгоритм MapReduce
Алгоритм MapReduce
Алгоритм MapReduce
Алгоритм MapReduce
- Высокий параллелизм по данным.
- Автоматическое обеспечение отказоустойчивости.
- Достаточная универсальность для реализации часто используемых алгоритмов.
Hadoop vs. HPC
Big Data | HPC | ||
---|---|---|---|
Файловая система | Размер блока | 64МБ | 4Кб |
POSIX | нет | да | |
Произвольный доступ | нет | да | |
Репликация | да | да | |
Планировщик | Отказоустойчивость | отдельные процессы | приложения целиком |
Программный интерфейс | YARN API + Hadoop | ??? + MPI | |
Запуск приложений близко к данным | да | нет | |
Hadoop: частота повторения слов
public class MyMapper extends Mapper<LongWritable,Text, Text,IntWritable> { public void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(val.toString()); while (itr.hasMoreTokens()) { ctx.write(new Text(itr.nextToken()), new IntWritable(1)); } } } public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> { public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } ctx.write(key, new IntWritable(sum)); } }
Hadoop: запуск на кластере
Job j = Job.getInstance(); j.setJarByClass(Main.class); FileInputFormat.addInputPath(j, ...); FileOutputFormat.setOutputPath(j, ...); j.setMapperClass(MyMapper.class); j.setReducerClass(MyReducer.class); j.setMapOutputKeyClass(Text.class); j.setMapOutputValueClass(IntWritable.class); j.waitForCompletion(true);
Экосистема Hadoop
Apache Spark
Наш опыт
Объем данных | 144MB |
Объем несжатых данных | 770MB |
Кол-во станций | 24 |
Временной период | 3 года (2010–2012) |
Общее кол-во спектров | 445422 |
$ cd nm-local-dir/usercache/hadoop/appcache/application_1403952415373_0009 $ du -chd0 * ... 1,4G output ...На диск записывается почти в 10 раз больше промежуточных данных!
Spark vs. Hadoop
Aлгоритм работы Hadoop... while (problem is not solved) { ... read input data from HDFS run map in parallel write intermediate results to HDFS run sort in parallel write intermediate results to HDFS run reduce in parallel write results to HDFS ... } ... | Aлгоритм работы Sparkread input data from HDFS while (problem is not solved) { ... ... run map in parallel write log to HDFS sort in parallel write log to HDFS run reduce in parallel write log to HDFS ... } write results to HDFS |
Как Спарку это удается?
Resilient Distributed Datasets (RDD) — отказоустойчивые распределенные массивы данных.- Обрабатывают только локальные данные,
- поддерживают только крупно-зернистые преобразования,
- при выходе из строя узла пересчитывают потерявшиеся данные.
Преобразованияmap(func) filter(func) flatMap(func) mapPartitions(func) mapPartitionsWithIndex(func) sample(withReplacement, fraction, seed) union(otherDataset) intersection(otherDataset) distinct([numTasks]) groupByKey([numTasks]) reduceByKey(func, [numTasks]) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) sortByKey([ascending], [numTasks]) join(otherDataset, [numTasks]) ... | Действияreduce(func) collect() count() first() take(n) takeSample(withReplacement, num, [seed]) takeOrdered(n, [ordering]) saveAsTextFile(path) saveAsSequenceFile(path) saveAsObjectFile(path) countByKey() foreach(func) ... |
Spark: частота повторения слов
val file = sc.textFile("..."); file.flatMap(line => line.split("\W+")) .filter(word => !word.isEmpty()) .map(word => (word, 1)) .reduceByKey((a, b) => a + b) .map(pair => pair.swap) .sortByKey(false) .collect() |
Spark: потоковая обработка
val conf = new SparkConf().setAppName("NetWordCount") val ssc = new StreamingContext(conf, Seconds(1)) ssc.socketTextStream("127.0.0.1", 7777, StorageLevel.MEMORY_AND_DISK_SER) .flatMap(_.split(" ")) .map(x => (x, 1)) .reduceByKey(_ + _) .foreachRDD(rdd => { // RDD functions }) ssc.start() ssc.awaitTermination() |