مقدمه
Apache Kafka یک پلتفرم پردازش رویداد و جریان توزیعشده و متنباز است که با زبان جاوا نوشته شده است. این پلتفرم برای پردازش دادههای زمان واقعی که نیاز به منابع زیادی دارند طراحی شده است و بهطور ذاتی مقیاسپذیر است و از طریق توان عملیاتی و در دسترس بودن بالا بهره میبرد. این پروژه اسکریپتهای شل را برای تولید و مصرف پیامها از یک کلاستر Kafka و انجام وظایف مدیریتی مانند مدیریت موضوعات و پارتیشنها ارائه میدهد. در حالی که این اسکریپتها برای کاوش و آزمایش مفید هستند، در برنامههای دنیای واقعی دسترسی به Kafka بهصورت برنامهنویسی انجام میشود. برای این منظور، Kafka کتابخانههای کلاینت زیادی برای زبانهای برنامهنویسی و محیطهای widely-used فراهم کرده است.
در این آموزش، شما یاد خواهید گرفت که چگونه منابع را در یک کلاستر Kafka با استفاده از KafkaAdminClient API مدیریت کنید. همچنین یاد خواهید گرفت که چگونه اطلاعات مربوط به کلاستر را بهصورت برنامهنویسی بازیابی کنید و چگونه موضوعات را ایجاد، فهرست و حذف کنید. در نهایت، شما با ابزار خط فرمان kcat آشنا خواهید شد که به شما این امکان را میدهد تا بدون وابستگی به جاوا به کلاستر Kafka خود دسترسی پیدا کنید.
در نهایت، شما یاد خواهید گرفت که چگونه Kafka Cruise Control را راهاندازی کنید. این یک دیمون است که بهطور خودکار فرایندهای درونی یک کلاستر Kafka را بهینه میکند و در نتیجه کارایی و قابلیت اطمینان بالاتری به ارمغان میآورد.
پیشنیازها
برای تکمیل این آموزش، شما به موارد زیر نیاز دارید:
• یک ماشین با حداقل ۴ گیگابایت رم و ۲ CPU. در صورتی که از سرور Ubuntu استفاده میکنید، دستورالعملهای راهاندازی اولیه سرور را دنبال کنید.
• نصب Java Development Kit (JDK) نسخه ۱۱ بر روی Droplet یا ماشین محلی شما. برای راهنمای نصب جاوا روی Ubuntu، آموزش “How To Install Java with Apt on Ubuntu” را مشاهده کنید.
• یک کلاستر Apache Kafka با سه بروکر. شما میتوانید برای دستورالعملهای راهاندازی، آموزش “How To Set Up a Multi-Node Kafka Cluster using KRaft” را دنبال کنید.
• یک پروژه جاوا با یک Kafka producer که مطابق با آموزش “How To Set Up a Kafka Producer to Source Data Through CLI” راهاندازی شده باشد.
• آشنایی با ساختار دایرکتوری استاندارد پروژههای جاوا. برای اطلاعات بیشتر، بخش “Introduction to the Standard Directory Layout” را در مستندات رسمی Maven مشاهده کنید.
• نصب Python 3 روی Droplet یا ماشین محلی شما، به همراه راهاندازی یک محیط مجازی جدید. در صورت استفاده از سرور Ubuntu، به آموزش “How To Install Python 3 and Set Up a Programming Environment on an Ubuntu Server” مراجعه کنید. شما فقط باید مراحل ۱ و ۲ را تکمیل کنید. در این آموزش، محیط مجازی به عنوان ~/venv در نظر گرفته میشود.
گام 1 – استفاده از Kafka AdminClient
در این گام، شما باید کلاسی به نام AdminClientDemo.java ایجاد کنید تا از کلاس AdminClient برای مدیریت برنامهنویسی خوشه Kafka استفاده کنید.
1. ایجاد کلاس AdminClientDemo.java
ابتدا به دایرکتوریای که پروژه dokafka در آن قرار دارد بروید. اگر از ساختار پروژه اطلاع ندارید، معمولاً پروژه باید به این شکل باشد:
dokafka/
├── src/
│ └── main/
│ └── java/
│ └── com/
│ └── dokafka/
│ └── AdminClientDemo.java
اکنون فایل AdminClientDemo.java را ایجاد کرده و آن را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/AdminClientDemo.java
سپس کد زیر را به فایل اضافه کنید:
package com.dokafka;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class AdminClientDemo {
private static final Logger log = LoggerFactory.getLogger(AdminClientDemo.class);
public static void main(String[] args) {
String bootstrapServers = “kafka1.your_domain:9092”; // آدرس سرور Kafka خود را جایگزین کنید
Properties properties = new Properties();
properties.put(“bootstrap.servers”, bootstrapServers);
final KafkaAdminClient client = AdminClient.create(properties);
try {
Collection<Node> nodes = client.describeCluster().nodes().get();
if (nodes == null)
log.info(“به نظر میرسد هیچ گرهای در خوشه وجود ندارد!”);
else
log.info(String.format(“تعداد گرهها: %s\n”, nodes.size()));
} catch (Exception e) {
log.error(“خطایی رخ داد”, e);
}
}
}
توضیح کد:
• Logger: از LoggerFactory.getLogger() برای ایجاد یک لاگر استفاده شده است که میتوان از آن برای ثبت پیامها در سطوح مختلف (اطلاعاتی، خطا و غیره) استفاده کرد.
• KafkaAdminClient: با استفاده از KafkaAdminClient میتوان کارهای مدیریتی مختلفی را بر روی خوشه Kafka انجام داد (مانند شرح خوشه، ایجاد یا حذف موضوعات و غیره).
• bootstrap.servers: آدرس خوشه Kafka را مشخص میکند. به یاد داشته باشید که kafka1.your_domain:9092 را با آدرس واقعی سرور Kafka خود جایگزین کنید.
2. ایجاد اسکریپت run-adminclient.sh
حالا باید یک اسکریپت به نام run-adminclient.sh برای کامپایل و اجرای کلاس AdminClientDemo بسازید. این اسکریپت را ایجاد کرده و برای ویرایش باز کنید:
nano run-adminclient.sh
سپس کد زیر را به اسکریپت اضافه کنید:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.AdminClientDemo
توضیح کد اسکریپت:
• mvn clean: این دستور برای پاکسازی پروژه از فایلهای کامپایلشده قبلی استفاده میشود.
• mvn package: این دستور برای کامپایل و بستهبندی پروژه استفاده میشود.
• java -cp: این دستور برای اجرای کلاس AdminClientDemo است که در بسته dokafka قرار دارد.
3. اعطای مجوز اجرایی به اسکریپت
بعد از ذخیره و بستن فایل، اسکریپت را قابل اجرا کنید:
chmod +x run-adminclient.sh
4. اجرای اسکریپت
در نهایت، اسکریپت را اجرا کنید:
./run-adminclient.sh
خروجی
…
[main] INFO com.dokafka.AdminClientDemo – تعداد گرهها: 3
AdminClientDemo با موفقیت به نصب Kafka شما متصل شده و تعداد گرهها را بازیابی کرده است.
ایجاد و فهرست کردن موضوعات
KafkaAdminClient متدهای createTopics() و listTopics() را فراهم میکند، که حالا شما از آنها برای ایجاد یک موضوع و سپس فهرست کردن تمام موضوعات موجود در کلاستر استفاده خواهید کرد.
ابتدا برای ویرایش AdminClientDemo آن را باز کنید:
nano src/main/java/com/dokafka/AdminClientDemo.java
کد را به این شکل تغییر دهید:
nano src/main/java/com/dokafka/AdminClientDemo.java
…
try {
NewTopic newTopic = new NewTopic(“newTopic”, 1, (short) 1);
CreateTopicsResult result = client.createTopics(
Collections.singleton(newTopic)
);
result.all().get();
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
Collection<TopicListing> topics = client.listTopics(options).listings().get();
for (TopicListing topic: topics) {
log.info(“Topic: ” + topic.name());
}
} catch (Exception e) {
log.error(“An error occurred”, e);
}
…
در ابتدا یک نمونه از NewTopic ایجاد میکنید، که موضوعی است که باید در کلاستر ایجاد شود. نام آن (“newTopic”) و تعداد بخشها و نسخهها را وارد میکنید که باید به نوع short تبدیل شوند. متد createTopics() یک مجموعه از نمونههای NewTopic را میپذیرد، بنابراین شما یک لیست با یک عنصر که فقط newTopic را شامل میشود، میسازید.
چون این متد به صورت غیر همزمان است، جریان اجرا بلافاصله پس از فراخوانی به main() باز خواهد گشت، در حالی که در پسزمینه ادامه میدهد. از آنجایی که شما قصد دارید موضوعات را در ادامه فهرست کنید، باید منتظر بمانید تا این عملیات با result.all().get() تکمیل شود. result شامل یک لیست از موضوعات ایجاد شده است و برای دریافت همهی آنها باید عملیات تکمیل شود.
سپس یک نمونه از ListTopicsOptions را میسازید که پیکربندی برای فرایند بازیابی موضوعات را نگه میدارد. متد listInternal() را صدا میزنید و true را به آن میدهید تا موضوعاتی که Kafka آنها را به عنوان موارد داخلی در نظر میگیرد نیز دریافت شوند. سپس آن را به client.listTopics() میدهید و فهرستها را از طریق listings() بازیابی میکنید، که در آن حلقه میزنید و نامهای موضوعات را ثبت میکنید.
پس از پایان کار، فایل را ذخیره کرده و آن را ببندید، سپس اسکریپت را اجرا کنید:
./run-adminclient.sh
تمام موضوعات موجود در کلاستر در انتهای خروجی فهرست خواهند شد:
خروجی
…
[main] INFO com.dokafka.AdminClientDemo – Topic: newTopic
[main] INFO com.dokafka.AdminClientDemo – Topic: java_demo
حذف موضوعات
برای حذف یک موضوع (یا چندین موضوع)، میتوانید از متد deleteTopics() در KafkaAdminClient استفاده کنید. حالا از آن برای حذف newTopic که به تازگی ایجاد کردهاید استفاده خواهید کرد. برای ویرایش AdminClientDemo آن را باز کنید:
nano src/main/java/com/dokafka/AdminClientDemo.java
کد مربوط به ایجاد موضوع را با خطوط برجستهشده جایگزین کنید:
nano src/main/java/com/dokafka/AdminClientDemo.java
…
DeleteTopicsResult deleted = client.deleteTopics(Collections.singleton(“newTopic”));
deleted.all().get();
log.info(“موضوع newTopic حذف شد!”);
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
Collection<TopicListing> topics = client.listTopics(options).listings().get();
for (TopicListing topic: topics) {
log.info(“موضوع: ” + topic.name());
}
…
در اینجا، شما یک مجموعه از رشتهها که نمایانگر نامهای موضوعات هستند را به client.deleteTopics() میدهید. سپس منتظر میمانید تا فرایند تکمیل شود با گرفتن deleted.all() که فقط زمانی برمیگردد که تمام موضوعات پردازش شده باشند. پس از پایان کار، فایل را ذخیره کرده و آن را ببندید، سپس اسکریپت run-adminclient.sh را اجرا کنید:
./run-adminclient.sh
خواهید دید که موضوع جدید در فهرست موضوعات موجود نیست:
خروجی
[main] INFO com.dokafka.AdminClientDemo – موضوع newTopic حذف شد!
[main] INFO com.dokafka.AdminClientDemo – موضوع: java_demo
در این مرحله، شما از KafkaAdminClient برای دسترسی به کلاستر خود و بازیابی اطلاعات آن به صورت برنامهنویسی استفاده کردهاید. همچنین نحوه فهرست کردن، ایجاد و حذف موضوعات در کلاستر را مشاهده کردهاید. حالا خواهید آموخت که چگونه از ابزار خط فرمان kcat برای دسترسی به کلاستر خود استفاده کنید.
مرحله 2 – استفاده از kcat برای مدیریت کلاستر
در این مرحله، شما یاد خواهید گرفت که چگونه kcat، ابزاری خط فرمان برای دسترسی و پیکربندی کلاسترهای Kafka را دانلود و نصب کنید، بدون نیاز به وابستگی به Java.
ابتدا باید بسته مناسب برای سیستمعامل خود را نصب کنید. اگر از MacOS استفاده میکنید، میتوانید kcat را با Brew دانلود کنید:
brew install kcat
در سیستمهای Debian و Ubuntu، میتوانید آن را بهطور معمول از طریق apt نصب کنید:
sudo apt install kafkacat
kafkacat نام قدیمی برای kcat است، اما برای سازگاری در مدیر بسته نگهداشته شده است. برای دستورالعملهای نصب در توزیعهای دیگر، به مستندات رسمی مراجعه کنید.
تولید و مصرف پیامها
یکی از عملیاتهای پایهای که اسکریپتهای همراه Kafka اجازه میدهند، پخش دادهها از یک موضوع است. با kcat، میتوانید از چندین موضوع بهطور همزمان پخش کنید:
kcat -b your_broker_address:9092 first_topic second_topic …
اجرای دستور زیر پیامها را از java_demotopic در کنسول پخش میکند:
kcat -b kafka1.your_domain:9092 java_demo
اسکریپت Kafka-console-consumer.sh همچنین به شما این امکان را میدهد که تمام پیامهای موجود در یک موضوع را از ابتدا بخوانید. برای دستیابی به همین هدف با kcat، باید -t را وارد کنید:
kcat -b kafka1.your_domain:9092 -t java_demo
خروجی شامل پیامهایی خواهد بود که شما در بخش پیشنیازها تولید کردهاید:
خروجی
% انتخاب خودکار حالت مصرفکننده (برای لغو از -P یا -C استفاده کنید)
% به انتهای موضوع java_demo [1] در افست 0 رسید
% به انتهای موضوع java_demo [2] در افست 0 رسید
Hello World!
% به انتهای موضوع java_demo [0] در افست 0 رسید
% به انتهای موضوع java_demo [3] در افست 0 رسید
% به انتهای موضوع java_demo [4] در افست 1 رسید
% به انتهای موضوع java_demo [5] در افست 0 رسید
همچنین میتوانید پیامهای مصرف شده را بهصورت JSON خروجی کنید با وارد کردن -J:
kcat -b kafka1.your_domain:9092 -t java_demo -J
خروجی شبیه به این خواهد بود:
خروجی
% انتخاب خودکار حالت مصرفکننده (برای لغو از -P یا -C استفاده کنید)
% به انتهای موضوع java_demo [2] در افست 0 رسید
% به انتهای موضوع java_demo [0] در افست 0 رسید
% به انتهای موضوع java_demo [1] در افست 0 رسید
{“topic”:“java_demo”,“partition”:4,“offset”:0,“tstype”:“create”,“ts”:1714922509999,“broker”:1,“key”:null,“payload”:“Hello World!”}
% به انتهای موضوع java_demo [3] در افست 0 رسید
% به انتهای موضوع java_demo [5] در افست 0 رسید
% به انتهای موضوع java_demo [4] در افست 1 رسید
برای تولید پیامها به یک موضوع، باید -P را وارد کنید تا به حالت تولیدکننده تغییر وضعیت دهید:
kcat -b kafka1.your_domain:9092 -t java_demo -P
مانند kafka-console-producer.sh، kcat پیامها را که با ENTER جدا شدهاند، میپذیرد. برای خروج از پرامپت، میتوانید CTRL + C را فشار دهید و سپس ENTER را بزنید.
شما میتوانید از قابلیت الگوهای kcat برای خروجی گرفتن اطلاعات بیشتر در مورد یک پیام همراه با آن استفاده کنید. برای وارد کردن قالب دلخواه خود، از -f مانند دستور زیر استفاده کنید:
kcat -b kafka1.your_domain:9092 -t java_demo -f ‘Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n’
در اینجا، شما به آن دستور میدهید که تمام پیامها را از ابتدا از java_demotopic بخواند و نام موضوع (%t)، شماره پارتیشن (%p)، افست (%o)، کلید پیام (%k)، طول مقدار (%S) و خود پیام (%s) را خروجی کند.
خروجی شبیه به این خواهد بود:
% انتخاب خودکار حالت مصرفکننده (برای لغو از -P یا -C استفاده کنید)
% به انتهای موضوع java_demo [2] در افست 0 رسید
% به انتهای موضوع java_demo [1] در افست 0 رسید
Topic java_demo[4], offset: 0, key: , payload: 12 bytes: Hello World!
% به انتهای موضوع java_demo [0] در افست 0 رسید
% به انتهای موضوع java_demo [3] در افست 0 رسید
% به انتهای موضوع java_demo [4] در افست 1 رسید
% به انتهای موضوع java_demo [5] در افست 0 رسید
kcat از قالب رشتهای استفاده کرد و اطلاعات اضافی در مورد رکورد پیام را خروجی کرد.
فهرستکردن متاداده کلاستر
شما میتوانید متاداده کلاستر را بهطور منظم با وارد کردن -L فهرست کنید:
kcat -b kafka1.your_domain:9092 -L
خروجی تمام بروکرهای کلاستر را بههمراه تمام موضوعها و پارتیشنهایشان فهرست میکند:
خروجی
3 بروکر:
بروکر 1 در kafka1.your_domain:9092
بروکر 2 در kafka2.your_domain:9092 (کنترلکننده)
بروکر 3 در kafka3.your_domain:9092
1 موضوع:
موضوع “java_demo” با 6 پارتیشن:
پارتیشن 0، رهبر 3، نسخهها: 3,1، نمایندگان همزمان: 3,1
پارتیشن 1، رهبر 1، نسخهها: 1,2، نمایندگان همزمان: 1,2
پارتیشن 2، رهبر 2، نسخهها: 2,3، نمایندگان همزمان: 2,3
پارتیشن 3، رهبر 2، نسخهها: 2,1، نمایندگان همزمان: 2,1
پارتیشن 4، رهبر 1، نسخهها: 1,3، نمایندگان همزمان: 1,3
پارتیشن 5، رهبر 3، نسخهها: 3,2، نمایندگان همزمان: 3,2
بهطور مشابه با اسکریپت Kafka-topics.sh، kcat رهبران پارتیشنها، نسخهها و مجموعههای نسخههای همزمان را فهرست میکند.
همچنین میتوانید با وارد کردن -J خروجی را بهصورت JSON دریافت کنید:
kcat -b kafka1.your_domain:9092 -L -J
خروجی شبیه به این خواهد بود:
خروجی
{
“originating_broker”: {
“id”: 2,
“name”: “kafka2.your_domain:9092/2”
},
“query”: {
“topic”: “*”
},
“controllerid”: 3,
“brokers”: [
{
“id”: 1,
“name”: “kafka1.your_domain:9092”
},
{
“id”: 2,
“name”: “kafka2.your_domain:9092”
},
{
“id”: 3,
“name”: “kafka3.your_domain:9092”
}
],
“topics”: [
{
“topic”: “java_demo”,
“partitions”: [
{
“partition”: 0,
“leader”: 3,
“replicas”: [
{
“id”: 3
}
],
“isrs”: [
{
“id”: 3
}
]
},
…
]
}
]
}
در این مرحله، شما kcat را نصب کردهاید، ابزاری برای دسترسی و مدیریت آسان کلاسترهای Kafka بدون نیاز به Java. شما یاد گرفتید که چگونه اطلاعاتی در مورد کلاستر و موضوعهای آن بهدست آورید، همچنین چگونگی تولید و مصرف پیامها را آموختید. اکنون خواهید آموخت که چگونه با Cruise Control، تعادل مجدد مناسب درون کلاستر را خودکار کنید.
گام 3 – خودکارسازی تغییرات تعادل با Kafka Cruise Control
Cruise Control یک پروژه متنباز است که توسط LinkedIn توسعه داده شده و بهطور مداوم فعالیتهای بروکرهای Kafka در یک کلاستر را نظارت میکند و بار کاری را دوباره تعادل میکند تا مصرف منابع و کارایی را بهینهسازی کند. در این گام، شما یاد خواهید گرفت که چگونه آن را برای کلاستر خود فعال کنید و عملیاتهای آن را نظارت کنید.
بهطور پیشفرض، Cruise Control اهداف معقولی برای اطمینان از عملکرد بهینه کلاستر دارد که به آنها “goal” گفته میشود. برخی از اهداف اصلی شامل اطمینان از داشتن تعداد مناسب رپلیکای هر موضوع، حفظ تعادل مصرف CPU، شبکه و دیسک و محدود کردن آنها تحت مقادیر مشخص، و داشتن رپلیکای کافی برای پارتیشنهای موضوع است. پیادهسازی اهداف سفارشی نیز پشتیبانی میشود.
شما باید Cruise Control را از منبع کامپایل کنید. ابتدا، مخزن رسمی Git را با استفاده از دستور زیر کلون کنید:
به آن بروید:
cd cruise-control
سپس از Gradle برای کامپایل پروژه استفاده کنید:
./gradlew jar
فرآیند ساخت زمانبر خواهد بود. انتهای خروجی مشابه این خواهد بود:
BUILD SUCCESSFUL in 2m 41s
17 actionable tasks: 17 executed
اکنون Cruise Control به همراه گزارشدهنده متریکهای آن کامپایل شده است که در مسیر cruise-control-metrics-reporter/build/libs/ در یک فایل JAR قرار دارد. این گزارشدهنده متریکهایی در مورد یک بروکر به یک موضوع در کلاستر ارسال میکند که Cruise Control میتواند آنها را نظارت کند.
سپس، دستور زیر را برای کپی کردن تمام وابستگیها به دایرکتوری هدف اجرا کنید:
./gradlew jar copyDependantLibs
خروجی به شکل زیر خواهد بود:
BUILD SUCCESSFUL in 15s
17 actionable tasks: 1 executed, 16 up-to-date
در مرحله بعد، شما باید هر بروکر Kafka در کلاستر را برای استفاده از گزارشدهنده متریک جدید پیکربندی کنید. آن را به دایرکتوری libs/ که در آن Kafka نصب شده است کپی کنید:
cp cruise-control-metrics-reporter/build/libs/* /home/kafka/kafka/libs/
سپس، شما باید پیکربندی بروکر Kafka را برای استفاده از گزارشدهنده جدید تغییر دهید. فایل server.properties مربوط به بروکر را برای ویرایش باز کنید:
nano /home/kafka/kafka/config/kraft/server.properties
این خط را در انتهای فایل اضافه کنید:
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
پس از اتمام، فایل را ذخیره و بسته کنید. بروکر را با دستور زیر مجدداً راهاندازی کنید:
sudo systemctl restart kafka
تمام بروکرهای کلاستر باید گزارشدهنده متریک Cruise Control را فعال کرده باشند، بنابراین مراحل بالا را در سایر گرهها نیز تکرار کنید تا گزارشدهنده متریک JAR را دریافت کنید. پس از چند دقیقه، میتوانید با استفاده از دستور kcat موضوعات موجود در کلاستر را فهرست کنید تا تأیید کنید که گزارشدهنده متریک شروع به کار کرده است:
Output
…
topic “__CruiseControlMetrics” with 6 partitions:
partition 0, leader 3, replicas: 3,2 isrs: 3,2
partition 1, leader 2, replicas: 2,3 isrs: 2,3
partition 2, leader 3, replicas: 3,2 isrs: 3,2
partition 3, leader 2, replicas: 2,3 isrs: 2,3
partition 4, leader 2, replicas: 2,3 isrs: 2,3
partition 5, leader 3, replicas: 3,2 isrs: 3,2
برای بهینهسازی صحیح بروکرها، Cruise Control نیاز دارد که از مشخصات سختافزاری هر بروکر آگاه باشد. این پیکربندی در فایلی به نام capacity.json که در پوشه config قرار دارد ذخیره شده است. آن را برای ویرایش باز کنید:
nano config/capacity.json
در حالت پیشفرض، فایل به این شکل است:
{
“brokerCapacities”:[
{
“brokerId”: “-1”,
“capacity”: {
“DISK”: “100000”,
“CPU”: “100”,
“NW_IN”: “10000”,
“NW_OUT”: “10000”
},
“doc”: “This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.”
},
{
“brokerId”: “0”,
“capacity”: {
“DISK”: “500000”,
“CPU”: “100”,
“NW_IN”: “50000”,
“NW_OUT”: “50000”
},
“doc”: “This overrides the capacity for broker 0.”
}
]
}
آرایه brokerCapacities شامل عناصری است که به شناسههای بروکر مرتبط هستند. هرکدام شامل فیلدهایی برای فضای دیسک، تخصیص CPU و ازدیاد شبکه به و از بروکر است.
شما سه بروکر Kafka را در مراحل قبلی استقرار دادهاید. فایل را با افزودن مشخصات صحیح برای هر یک از آنها تغییر دهید:
{
“brokerCapacities”:[
{
“brokerId”: “-1”,
“capacity”: {
“DISK”: “100000”,
“CPU”: “100”,
“NW_IN”: “10000”,
“NW_OUT”: “10000”
},
“doc”: “This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.”
},
{
“brokerId”: “1”,
“capacity”: {
“DISK”: “100000”,
“CPU”: “100”,
“NW_IN”: “10000”,
“NW_OUT”: “10000”
},
“doc”: “”
},
{
“brokerId”: “2”,
“capacity”: {
“DISK”: “100000”,
“CPU”: “100”,
“NW_IN”: “10000”,
“NW_OUT”: “10000”
},
“doc”: “”
},
{
“brokerId”: “3”,
“capacity”: {
“DISK”: “100000”,
“CPU”: “100”,
“NW_IN”: “10000”,
“NW_OUT”: “10000”
},
“doc”: “”
}
]
}
ظرفیتهای دیسک را طبق اندازه Dropletهای خود تنظیم کنید، سپس فایل را ذخیره و بسته کنید.
در ادامه، Cruise Control را پیکربندی میکنید تا به کلاستر Kafka شما در حالت KRaft متصل شود. باید فایل cruisecontrol.properties را در پوشه config ویرایش کنید. برای باز کردن آن دستور زیر را اجرا کنید:
nano config/cruisecontrol.properties
خطوط زیر را پیدا کنید:
# The Kafka cluster to control.
bootstrap.servers=localhost:9092
پارامتر bootstrap.servers مشخص میکند Cruise Control به کدام بروکر Kafka (و در نتیجه کدام کلاستر) متصل شود. آن را به شکل زیر تغییر دهید:
# The Kafka cluster to control.
bootstrap.servers=kafka1.your_domain:9092
# Switch to KRaft mode
kafka.broker.failure.detection.enable=true
پارامتر kafka.broker.failure.detection.enable=true به Cruise Control میگوید که تنظیمات ZooKeeper را نادیده بگیرد و بهصورت بومی از حالت KRaft استفاده کند. عبارت kafka1.your_domain را با نام دامنه یا آدرس بروکر Kafka خود جایگزین کنید.
سپس خطوط زیر را پیدا کنید:
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
#capacity.config.file=config/capacity.json
capacity.config.file=config/capacityJBOD.json
و آنها را به شکل زیر تغییر دهید تا فایل پیکربندی اصلی ظرفیتها فعال شود:
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
capacity.config.file=config/capacity.json
#capacity.config.file=config/capacityJBOD.json
در اینجا، فایل capacity.json (واقع در پوشه config/) به عنوان مرجع ظرفیت بروکرها تنظیم شده است.
پس از اتمام، فایل را ذخیره کرده و ببندید.
اکنون که Cruise Control پیکربندی شده است، آن را در یک ترمینال جداگانه با دستور زیر اجرا کنید:
./kafka-cruise-control-start.sh config/cruisecontrol.properties
پس از اجرای دستور، خروجیهای متعددی در ترمینال نمایش داده خواهد شد، چرا که Cruise Control بهصورت مداوم کلاستر Kafka شما را نظارت و بهینهسازی میکند.
استفاده از Cruise Control CLI
Cruise Control یک API مبتنی بر REST روی پورت ۹۰۹۰ ارائه میدهد که برای پیکربندی و کارهای مدیریتی استفاده میشود. اما پروژهی Cruise Control یک ابزار خط فرمان به نام cccli نیز دارد که با زبان Python نوشته شده و بهصورت سادهتر، این API را در قالب دستورات ترمینال در اختیار شما قرار میدهد.
فعالسازی محیط مجازی پایتون
ابتدا وارد محیط مجازی پایتونی شوید که در مراحل قبلی ایجاد کردهاید:
cd ~/venv
با دستور زیر محیط مجازی را فعال کنید:
./bin/activate
سپس با استفاده از pip، ابزار Cruise Control CLI را نصب کنید:
pip install cruise-control-client
خروجی نهایی چیزی شبیه به این خواهد بود:
Installing collected packages: urllib3, idna, charset-normalizer, certifi, requests, cruise-control-client
Successfully installed …
با این نصب، دستور cccli در ترمینال در دسترس قرار میگیرد.
مشاهدهی وضعیت فعلی کلاستر Kafka
برای مشاهده اطلاعات مربوط به وضعیت بار (load) فعلی کلاستر Kafka، دستور زیر را اجرا کنید (به جای localhost:9090 آدرس Cruise Control خود را قرار دهید):
cccli -a localhost:9090 load
خروجی شامل اطلاعاتی مانند استفاده از دیسک، پردازنده، پهنای باند شبکه، تعداد لیدرها و غیره برای هر بروکر است:
HOST BROKER … CPU(%) NW_IN_CAP …
kafka1.your_domain 1 … 0.999 10000.000 …
…
فعالسازی تعمیر خودکار (Self-Healing)
Cruise Control قابلیت «تعمیر خودکار» دارد؛ یعنی در صورت خرابی بروکر یا نقض هدفهای مشخصشده (Goals)، خودش به صورت خودکار وضعیت را اصلاح میکند. برای فعالسازی این ویژگی در زمان خرابی بروکر، دستور زیر را بزنید:
cccli -a localhost:9090 admin –enable-self-healing-for broker_failure
خروجی به صورت زیر خواهد بود و نشان میدهد که تنظیمات تغییر کردهاند:
{
selfHealingEnabledBefore: {BROKER_FAILURE=false},
selfHealingEnabledAfter: {BROKER_FAILURE=true}
}
نتیجهگیری
در این مرحله، شما:
• متریکریپورتر Cruise Control را روی هر نود Kafka نصب کردید.
• فایلهای پیکربندی را ویرایش و Cruise Control را اجرا کردید.
• ابزار cccli را نصب کردید و نحوهی استفاده از آن برای مانیتورینگ و مدیریت کلاستر Kafka را یاد گرفتید.
برای اطلاعات بیشتر، میتوانید به مستندات رسمی Cruise Control مراجعه کنید.
اکنون شما توانایی مدیریت کلاستر Kafka را از طریق کد با استفاده از KafkaAdminClient و ابزارهای جانبی مانند kcat و Cruise Control دارید.
برای امتیاز به این نوشته کلیک کنید!
[کل: 0 میانگین: 0]
نظرات کاربران