Потоковая обработка данных
01На каждом компьютере ведется журнал авторизации пользователей, в который записывается информация об удачных и неудачных попытках входа на компьютер. У компьютеров с публичным IP адресом этот журнал обновляется постоянно из-за интернет шума и попыток программ-ботов зайти на машину.
На современных Linux системах журнал можно получить в виде потока, обновляемого
в реальном времени, с помощью команды journalctl
. Поскольку на кластере журнал обновляется редко и доступ к нему требует прав администратора,
то вместо этой команды необходимо использовать скрипт-генератор
./generate
из каталога с заданием.
Для того чтобы обработать выходной поток из команды в Spark, необходимо
реализовать класс Receiver
. Следующий фрагмент кода определяет класс
MyReceiver
для получения строк.
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) { override def onStart() { // start a thread to process data } override def onStop() { // stop the thread } } // launch MyReceiver val conf = new SparkConf().setAppName("MyReceiver") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val stream = ssc.receiverStream(new MyReceiver)
Задания
02Для выполнения заданий вам понадобятся следующие директории.
Исходный код | /gfs/bigdata-course/code/spark/logs |
Входные данные | /gfs/bigdata-course/datasets/auth-logs-test |
Задание S3
03 Проанализируйте выходной поток скрипта-генератора и выведите имена пользователей и количество неудачных попыток входа для процесса sshd. Данные должны агрегироваться за 30 секунд.