Apache Kafka – популярный механизм обмена сообщениями, позволяющий осуществлять их передачу с высокой пропускной способностью. Эта система используется для потоковой аналитики, логирования, передачи данных в хранилище и т.п. Apache Kafka работает как кластер из нескольких брокеров сообщений. Каждый брокер обрабатывает свою определённую часть общей нагрузки. В руководстве мы рассмотрим как установить и начать использовать Apache Kafka на CentOS Stream.
Установка OpenJDK 8
Первым шагом необходимо произвести установку OpenJDK 8 на ваш VPS. Для этого подключитесь к системе с помощью учётной записи пользователя, имеющего привилегии sudo, и запустите команду:
$ sudo dnf install java-1.8.0-openjdk
Чтобы проверить версию установленного пакета, наберите:
$ java -version
Создание учётной записи для Apache Kafka
На этом этапе необходимо создать специального пользователя для Apache Kafka. Для создания нового пользователя (назовём его kafka
) наберите следующую команду:
$ sudo useradd kafka -m
Опция -m
обеспечит нам создание для новой учётной записи своего домашнего каталога /home/kafka
.
Далее, установите пароль для пользователя kafka
:
$ sudo passwd kafka
Ещё, необходимо добавить пользователя kafka
в группу wheel
. Сделать это можно при помощи команды adduser
:
$ sudo usermod -aG wheel kafka
Теперь наш новый пользователь готов. Поэтому с его помощью можно подключиться к системе:
$ su -l kafka
После создания специального пользователя для Kafka мы можем перейти к скачиванию и распаковке файлов Apache Kafka.
Загрузка Apache Kafka
На этом этапе мы загрузим и распакуем файлы Kafka в специальный каталог, который мы создадим в домашней папке пользователя kafka
.
В чистой операционной системе CentOS Stream не установлен архиватор tar
. Если вы не инсталлировали его ранее, то сейчас – самое время сделать это:
$ sudo dnf install tar
В домашней папке пользователя kafka
необходимо создать каталог, в который мы загрузим файл с архивом Kafka. Для этого подключитесь к системе пользователем kafka
и выполните следующие команды:
$ cd ~
$ mkdir Downloads
Используя программу curl
, запустите скачивание архива Kafka:
$ curl "https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz" -o ~/Downloads/kafka.tgz
Теперь в домашнем каталоге создайте папку kafka
. Её мы будем использовать как основу для инсталляции Kafka:
$ mkdir ~/kafka
$ cd ~/kafka
И далее, разархивируйте полученный архив в созданный каталог:
$ tar -xvzf ~/Downloads/kafka.tgz --strip 1
Здесь, опция --strip 1
определяет каталог для разархивируемых файлов именно как /kafka/
без дополнительных подкаталогов.
Настройка конфигурации Kafka
На этом шаге мы изменим дефолтные настройки Kafka, которые не позволяют удалять категории, темы и группы, в которых могут публиковаться сообщения. Чтобы это исправить, необходимо внести коррективы в конфигурационный файл Kafka.
Для этого, используйте штатный текстовый редактор vi
:
$ cd ~/kafka/config
$ sudo vi server.properties
В конец этого файла вставьте следующую строку:
delete.topic.enable = true
После чего, закройте файл server.properties
, сохранив изменения.
Создание unit-файлов и запуск сервера Kafka
В этом разделе настоящего руководства мы создадим специальные unit-файлы для службы Kafka. Это позволит вам выполнять действия, которые выполняются при работе с обычными сервисами – запуск, перезапуск, остановка.
Для управления состоянием и конфигурацией Kafka используется служба Zookeeper. Этот сервис также широко применяется и во многих других распределённых системах в качестве интегрированного компонента.
Сначала, необходимо создать unit-файл для Zookeeper:
$ cd /etc/systemd/system
$ sudo vi zookeeper.service
В этот файл добавьте следующие строки:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Здесь, секция [Unit]
указывает на то, что на вашем сервере и сеть, и файловая система должны быть запущены до запуска службы. Секция [Service]
указывает на то, что systemd должен использовать файловые оболочки zookeeper-server-start.sh
и zookeeper-server-stop.sh
для управления службой Zookeeper (запуск, остановка). Также, в этой секции есть указание на перезапуск сервиса при каком-либо его нештатном состоянии.
Закройте файл с сохранением внесённых изменений.
Далее, создайте такой же unit-файл для службы kafka
:
$ cd /etc/systemd/system
$ sudo vi kafka.service
В файл kafka.service
вставьте следующий текст:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Здесь, секция [Unit]
указывает на то, что этот файл зависит от файла zookeeper.service
. Это обеспечит автоматический запуск сервиса Zookeeper вместе со службой Kafka. Секция [Service]
указывает на то, что systemd должен использовать файловые оболочки
и kafka-server-stop.sh
для запуска и остановки службы Kafka. Как и в файле kafka-server-start.sh
zookeeper.service
, здесь присутствует указание на перезапуск сервиса при каком-либо его нештатном состоянии.
Выйдите из файла с сохранением изменений.
Теперь службу Kafka необходимо запустить. Для чего используйте команду:
$ sudo systemctl start kafka
Чтобы удостовериться, что служба запустилась успешно, проверьте журнал событий Kafka:
$ journalctl -u kafka
Вывод команды должен выглядеть примерно так:
Если ошибок нет, значит, ваш Kafka запущен и слушает порт 9092.
Теперь осталось только добавить Kafka в автозагрузку:
$ sudo systemctl enable kafka
Проверка установки
В предыдущих разделах вы узнали, как установить и настроить Apache Kafka на вашем CentOS Stream, поэтому настало время увидеть, как это работает.
На этом этапе давайте опубликуем сообщение “Hello, World” для того, чтобы убедиться, что Kafka ведёт себя корректно.
Публикация сообщения в Kafka требует:
- Издатель, который создаёт публикацию записи и данные заголовка.
- Потребитель, который читает сообщения и данные заголовков.
Во-первых, создайте заголовок. В него внесите, например, MyTopic
:
$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic MyTopic
Теперь нужно создать издателя. Для чего используйте скрипт kafka-console-producer.sh
. В аргументах нужно указать имя хоста Kafka, порт и заголовок:
$ echo "Hello, My Friend" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyTopic > /dev/null
Далее, создайте потребителя при помощи скрипта kafka-console-consumer.sh
. Здесь в аргументах укажите имя хоста ZooKeeper, порт и заголовок.
Следующая инструкция подписывает на сообщения от MyTopic
. Здесь, опция --from-beginning
разрешает поучение сообщений, которые были опубликованы до того, как потребитель был подключён:
$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyTopic --from-beginning
Выводом этой команды будет появление в командной строку текста полученного сообщения:
Скрипт будет продолжать выполняться принимая сообщения от издателя. Для проверки вы можете открыть ещё один терминал, подключиться к системе пользователем kafka
и от вашего издателя отправьте ещё несколько сообщений. Все они должны отразиться в терминале с вашим потребителем. Для завершения сеанса приёма сообщений нажмите Ctrl-C
.
Настройка многоузлового кластера (опционально)
Если вы хотите создать мультиузловой кластер Kafka, который будет использовать несколько машин, работающих на CentOS Stream, вы должны будете повторить некоторые этапы установки сервиса на каждой из новых систем. А именно, вы должны проделать работы, описанные в разделах:
- Создание учётной записи для Apache Kafka.
- Создание unit-файлов и запуск сервера Kafka.
- Проверка установки.
Дополнительно, вам необходимо будет внести некоторые изменеия в файл server.properties
на каждом из серверов:
- Значение параметра
broker.id
должно быть уникальным для каждой системы внутри кластера. Оно определяет название каждого из серверов (server1
,server2
,server3
и так далее). - Значение параметра
zookeeper.connect
на каждой из машин должно указывать на один и тот же узел ZooKeeper. Формат записи должен соответствовать форме<HOSTNAME/IP_ADDRESS>:<PORT>
, например,10.10.10.1:2181
,10.10.10.2:2181
,10.10.10.3:218
1 и так далее.
Если вы хотите установить несколько узлов ZooKeeper в вашем кластере, вы должны в zookeeper.connect
перечислить IP-адрес и порт каждого из них, разделяя их через запятую.
Ограничение прав пользователя Kafka
По завершении настройки Apache Kafka было бы разумным лишить пользователя kafka
администраторских полномочий.
Для этого подключитесь к системе под именем другой учётной записи, имеющей привилегии sudo, и исключите учётную запись kafka
из группы wheel
:
$ sudo gpasswd -d kafka wheel
В целях повышения уровня безопасности вашей системы следует заблоуировать пароль пользователя kafka
, чтобы исключить возможность подключения к вашему серверу под именем учётной записи пользователя kafka
. Сделать это можно набрав:
$ sudo passwd kafka -l
После выполнения этой команды пользователем kafka
можно будет подключиться только учётной записи root
или любой другой, но только если она имеет привилегии sudo
.
Для разблокировки пароля можно применить команду:
$ sudo passwd kafka -u
Заключение
Таким образом, используя данное руководство вы сможете установить сервис Apache Kafka на ваш сервер с CentOS Stream. Больше информации об Apache Kafka вы найдёте на официальной странице проекта.