пятница, 17 июля 2020 г.

Создание таблицы учета времени на основе ELK и windows

Задача: Предоставить кадрам информацию по входам и выходам пользователей на рабочих станциях под управлением Windows AD.

Реализация: Реализовать решили используя следующие технологии:
Elasticsearch -  механизм индексирования и хранения логов, а также полнотекстового поиска по ней.
Kibana - визуальная часть системы Elasticsearch
Logstash - фильтрация и модификация поступаемых логов в Elasticsearch
winlogbeat - сбор логов по определенным правилам с windows систем.

Т.е. простыми словами, по порядку:
Ставим winlogbeat который собирает логи и передает их в Logstash который их фильтрует и модифицирует, а затем отправляет в Elasticsearch, а через интерфейс Kibana мы можем увидеть результат работы всей этой связки.

Дисклеймер: Я не являюсь специалистом по ELK, поэтому привожу ту реализацию, которую использую в работе. Так же, инструкция не содержит информации о установке пакетов и приложений - информации о том как что ставить, в сети навалом. Рекомендую перед выполнением конфигурирования, поставить все необходимые приложения. ELK на linux машину, и winlogbeat на машину откуда собираем логи.





1. Для начала необходимо настроить сбор логов входов и выходов пользователей на сервере АД. Наша реализация такова, что все пользовательские логи собирались на отдельном сервере, контроллер домена их пересылал на него.

2. Поставим на сервер, где собираются все эти логи, утилиту winlogbeat
Устанавливаем его сразу как службу. Открываем конфиг C:\Program Files\Winlogbeat\winlogbeat.yml и прописываем следующее
winlogbeat.event_logs:
  - name: ForwardedEvents
    event_id: 21, 22, 23, 24, 25, 4800, 4801 #перечисляем ID event которые буем собирать

tags: ["log-msk-01"] #указываем тэг с которым логи будут отправляться в logstash

output.logstash:
  hosts: ["1.1.1.1:5044"] #адрес сервера logstash

logging.level: info
logging.to_files: true
logging.files: #указываем, так же хранить логи в файлах локально.
  path: C:/Program Files/Winlogbeat/logs
  name: winlogbeat
  keepfiles: 7

xpack.monitoring: # эти параметры, по идее можно не указывать
  enabled: true
  elasticsearch:
    hosts: ["http://1.1.1.1:9200"]
Перезапускаем службу.
Советую так же запустить winlogbeat в командной строке, что бы увидеть вывод, корректно ли отрабатывает сбор и пересылка логов.

3. Идём конфигурировать ELK. Установите пакет программ, убедитесь, что можете зайти в Kibana в веб-браузере. Настраиваем logstash

Открываем /etc/logstash/conf.d/input.conf
input {
  beats {
    port => 5044
  }
}

/etc/logstash/conf.d/output.conf
output {
    if "log-msk-01" in [tags] { # тэг который прописывали в winlogbeat
        elasticsearch {
            hosts     => "localhost:9200" #адрес сервера Elasticsearch 
            index    => "log-msk-01" # индекс под которым будут писаться логи
        }
    }
    else { #если тэга нет, все остальные логи кидать в индекс unknown_messages
        elasticsearch {
            hosts     => "localhost:9200"
            index    => "unknown_messages"
        }
    }
        #stdout { codec => rubydebug }
}


/etc/logstash/conf.d/filter.conf
filter {
  date {
        match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ] #добавляем timestamp
  }

  if [winlog][event_id] == 24 { #если id события 24, то добавляем новый параметр со значением Выход
   mutate {
   add_field => { "info" => "Выход" }
  }
 }
  else if [winlog][event_id] == 25 { #если id события 25, то добавляем новый параметр со значением Вход
   mutate {
   add_field => { "info" => "Вход" }
  }
 }
  else if [winlog][event_id] == 4800 {
   mutate {
   add_field => { "info" => "Заблокирован" }
  }
 }
  else if [winlog][event_id] == 4801 {
   mutate {
   add_field => { "info" => "Разблокирован" }
  }
 }

  mutate {
    gsub => [
      "[winlog][user_data][User]", "DOMAIN", "", #удаляем имя домена перед именем пользователя
      "[winlog][user_data][User]", "[\\]", "" #удаляем косую черту разделяющую имя домена и юзернейм
    ]
  }

  mutate {
  copy => {"[winlog][event_data][TargetUserName]" => "uname"} # переносим значение из лога TargetUserName в строку uname
  copy => {"[winlog][user_data][User]" => "uname"} переносим значение из лога User в строку uname

  }
}

Тут требуется несколько пояснений. Формирование логов на windows не поддается никакому здравому смыслу и имя юзернейма может скакать из одного параметра в другой, в зависимости от метода подключения юзера (через РДП или локально). Нам же нужно составить таблицу где был бы столбец юзернеймов, поэтому нам необходимо объединить эти параметры в один. Проще всего это сделать передав оба значения в новую созданную нами строку uname.

4. Идём в kibana и ловим наши логи по пути Management - Index Patterns. Нажимаем кнопку Create index pattern  выбираем наш и log-msk-01 и добавляем.


5. После этого имеет смысл настроить ротацию, удаление ненужных индексов. По уму, нужно устаревшие логи сохранять в отдельный индекс, а уже безнадёжно устаревшие - удалять. Однако, у нас такой задачи нет, нам нужны логи за последние 60 дней, а остальные можно удалять. В последствие, что бы разрузить сервер ELK, будет иметь смысл разделить логи на разные индексы по времени, на каждые 30 дней.

Что бы реализовать удаление, мне помогла статья отсюда https://shiningapples.net/elasticsearch-%D0%BD%D0%B0%D1%81%D1%82%D1%80%D0%BE%D0%B9%D0%BA%D0%B0-ilm/

Опишу частично мои действия:
- Идём в Management -  Index Lifecycle Policies Создаем новую политику.




Указываем сверху имя политики и внизу включаем удаление на 60 дней.

- Далее идём на вкладку Dev Tools и выполняем следующий запрос:

PUT _template/log-msk-01
{
  "index_patterns": ["log-msk-01"],               
  "settings": {
    "index.lifecycle.name": "log-msk-01"
  }
}


- Идем на вкладку  Management - Index Management нажимаем на наш индекс log-msk-01 и в открывшемся сбоку окне нажимаем Manage - Add lifecycle policy и выбираем нашу политику.


На этом всё.

Комментариев нет:

Отправить комментарий