Потоковая обработка данных

01На каждом компьютере ведется журнал авторизации пользователей, в который записывается информация об удачных и неудачных попытках входа на компьютер. У компьютеров с публичным IP адресом этот журнал обновляется постоянно из-за интернет шума и попыток программ-ботов зайти на машину.

На современных Linux системах журнал можно получить в виде потока, обновляемого в реальном времени, с помощью команды journalctl. Поскольку на кластере журнал обновляется редко и доступ к нему требует прав администратора, то вместо этой команды необходимо использовать скрипт-генератор ./generate из каталога с заданием.

Для того чтобы обработать выходной поток из команды в Spark, необходимо реализовать класс Receiver. Следующий фрагмент кода определяет класс MyReceiver для получения строк.

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def onStart() {
    // start a thread to process data
  }
  override def onStop() {
    // stop the thread
  }
}

// launch MyReceiver
val conf = new SparkConf().setAppName("MyReceiver")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
val stream = ssc.receiverStream(new MyReceiver)

Задания

02Для выполнения заданий вам понадобятся следующие директории.

Исходный код/gfs/bigdata-course/code/spark/logs
Входные данные/gfs/bigdata-course/datasets/auth-logs-test

Задание S3

03 Проанализируйте выходной поток скрипта-генератора и выведите имена пользователей и количество неудачных попыток входа для процесса sshd. Данные должны агрегироваться за 30 секунд.

Полезные ссылки