چطور Apache Kafka را به‌صورت حرفه ای مدیریت کنیم ؟

مقدمه
Apache Kafka یک پلتفرم پردازش رویداد و جریان توزیع‌شده و متن‌باز است که با زبان جاوا نوشته شده است. این پلتفرم برای پردازش داده‌های زمان واقعی که نیاز به منابع زیادی دارند طراحی شده است و به‌طور ذاتی مقیاس‌پذیر است و از طریق توان عملیاتی و در دسترس بودن بالا بهره می‌برد. این پروژه اسکریپت‌های شل را برای تولید و مصرف پیام‌ها از یک کلاستر Kafka و انجام وظایف مدیریتی مانند مدیریت موضوعات و پارتیشن‌ها ارائه می‌دهد. در حالی که این اسکریپت‌ها برای کاوش و آزمایش مفید هستند، در برنامه‌های دنیای واقعی دسترسی به Kafka به‌صورت برنامه‌نویسی انجام می‌شود. برای این منظور، Kafka کتابخانه‌های کلاینت زیادی برای زبان‌های برنامه‌نویسی و محیط‌های widely-used فراهم کرده است.
در این آموزش، شما یاد خواهید گرفت که چگونه منابع را در یک کلاستر Kafka با استفاده از KafkaAdminClient API مدیریت کنید. همچنین یاد خواهید گرفت که چگونه اطلاعات مربوط به کلاستر را به‌صورت برنامه‌نویسی بازیابی کنید و چگونه موضوعات را ایجاد، فهرست و حذف کنید. در نهایت، شما با ابزار خط فرمان kcat آشنا خواهید شد که به شما این امکان را می‌دهد تا بدون وابستگی به جاوا به کلاستر Kafka خود دسترسی پیدا کنید.
در نهایت، شما یاد خواهید گرفت که چگونه Kafka Cruise Control را راه‌اندازی کنید. این یک دیمون است که به‌طور خودکار فرایندهای درونی یک کلاستر Kafka را بهینه می‌کند و در نتیجه کارایی و قابلیت اطمینان بالاتری به ارمغان می‌آورد.
پیش‌نیازها
برای تکمیل این آموزش، شما به موارد زیر نیاز دارید:
• یک ماشین با حداقل ۴ گیگابایت رم و ۲ CPU. در صورتی که از سرور Ubuntu استفاده می‌کنید، دستورالعمل‌های راه‌اندازی اولیه سرور را دنبال کنید.
• نصب Java Development Kit (JDK) نسخه ۱۱ بر روی Droplet یا ماشین محلی شما. برای راهنمای نصب جاوا روی Ubuntu، آموزش “How To Install Java with Apt on Ubuntu” را مشاهده کنید.
• یک کلاستر Apache Kafka با سه بروکر. شما می‌توانید برای دستورالعمل‌های راه‌اندازی، آموزش “How To Set Up a Multi-Node Kafka Cluster using KRaft” را دنبال کنید.
• یک پروژه جاوا با یک Kafka producer که مطابق با آموزش “How To Set Up a Kafka Producer to Source Data Through CLI” راه‌اندازی شده باشد.
• آشنایی با ساختار دایرکتوری استاندارد پروژه‌های جاوا. برای اطلاعات بیشتر، بخش “Introduction to the Standard Directory Layout” را در مستندات رسمی Maven مشاهده کنید.
• نصب Python 3 روی Droplet یا ماشین محلی شما، به همراه راه‌اندازی یک محیط مجازی جدید. در صورت استفاده از سرور Ubuntu، به آموزش “How To Install Python 3 and Set Up a Programming Environment on an Ubuntu Server” مراجعه کنید. شما فقط باید مراحل ۱ و ۲ را تکمیل کنید. در این آموزش، محیط مجازی به عنوان ~/venv در نظر گرفته می‌شود.
گام 1 – استفاده از Kafka AdminClient
در این گام، شما باید کلاسی به نام AdminClientDemo.java ایجاد کنید تا از کلاس AdminClient برای مدیریت برنامه‌نویسی خوشه Kafka استفاده کنید.
1. ایجاد کلاس AdminClientDemo.java
ابتدا به دایرکتوری‌ای که پروژه dokafka در آن قرار دارد بروید. اگر از ساختار پروژه اطلاع ندارید، معمولاً پروژه باید به این شکل باشد:
dokafka/
├── src/
│   └── main/
│       └── java/
│           └── com/
│               └── dokafka/
│                   └── AdminClientDemo.java
اکنون فایل AdminClientDemo.java را ایجاد کرده و آن را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/AdminClientDemo.java
سپس کد زیر را به فایل اضافه کنید:
package com.dokafka;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class AdminClientDemo {
  private static final Logger log = LoggerFactory.getLogger(AdminClientDemo.class);
  public static void main(String[] args) {
    String bootstrapServers = “kafka1.your_domain:9092”; // آدرس سرور Kafka خود را جایگزین کنید
    Properties properties = new Properties();
    properties.put(“bootstrap.servers”, bootstrapServers);
    final KafkaAdminClient client = AdminClient.create(properties);
    try {
      Collection<Node> nodes = client.describeCluster().nodes().get();
      if (nodes == null)
        log.info(“به نظر می‌رسد هیچ گره‌ای در خوشه وجود ندارد!”);
      else
        log.info(String.format(“تعداد گره‌ها: %s\n”, nodes.size()));
    } catch (Exception e) {
      log.error(“خطایی رخ داد”, e);
    }
  }
}
توضیح کد:
• Logger: از LoggerFactory.getLogger() برای ایجاد یک لاگر استفاده شده است که می‌توان از آن برای ثبت پیام‌ها در سطوح مختلف (اطلاعاتی، خطا و غیره) استفاده کرد.
• KafkaAdminClient: با استفاده از KafkaAdminClient می‌توان کارهای مدیریتی مختلفی را بر روی خوشه Kafka انجام داد (مانند شرح خوشه، ایجاد یا حذف موضوعات و غیره).
• bootstrap.servers: آدرس خوشه Kafka را مشخص می‌کند. به یاد داشته باشید که kafka1.your_domain:9092 را با آدرس واقعی سرور Kafka خود جایگزین کنید.
2. ایجاد اسکریپت run-adminclient.sh
حالا باید یک اسکریپت به نام run-adminclient.sh برای کامپایل و اجرای کلاس AdminClientDemo بسازید. این اسکریپت را ایجاد کرده و برای ویرایش باز کنید:
nano run-adminclient.sh
سپس کد زیر را به اسکریپت اضافه کنید:
#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.AdminClientDemo
توضیح کد اسکریپت:
• mvn clean: این دستور برای پاکسازی پروژه از فایل‌های کامپایل‌شده قبلی استفاده می‌شود.
• mvn package: این دستور برای کامپایل و بسته‌بندی پروژه استفاده می‌شود.
• java -cp: این دستور برای اجرای کلاس AdminClientDemo است که در بسته dokafka قرار دارد.
3. اعطای مجوز اجرایی به اسکریپت
بعد از ذخیره و بستن فایل، اسکریپت را قابل اجرا کنید:
chmod +x run-adminclient.sh
4. اجرای اسکریپت
در نهایت، اسکریپت را اجرا کنید:
./run-adminclient.sh
خروجی
[main] INFO com.dokafka.AdminClientDemo – تعداد گره‌ها: 3
AdminClientDemo با موفقیت به نصب Kafka شما متصل شده و تعداد گره‌ها را بازیابی کرده است.
ایجاد و فهرست کردن موضوعات
KafkaAdminClient متدهای createTopics() و listTopics() را فراهم می‌کند، که حالا شما از آن‌ها برای ایجاد یک موضوع و سپس فهرست کردن تمام موضوعات موجود در کلاستر استفاده خواهید کرد.
ابتدا برای ویرایش AdminClientDemo آن را باز کنید:
nano src/main/java/com/dokafka/AdminClientDemo.java
کد را به این شکل تغییر دهید:
nano src/main/java/com/dokafka/AdminClientDemo.java
try {
NewTopic newTopic = new NewTopic(“newTopic”, 1, (short) 1);
CreateTopicsResult result = client.createTopics(
Collections.singleton(newTopic)
);
result.all().get();
  ListTopicsOptions options = new ListTopicsOptions();
  options.listInternal(true);
  Collection<TopicListing> topics = client.listTopics(options).listings().get();
  for (TopicListing topic: topics) {
    log.info(“Topic: ” + topic.name());
  }
} catch (Exception e) {
  log.error(“An error occurred”, e);
}
در ابتدا یک نمونه از NewTopic ایجاد می‌کنید، که موضوعی است که باید در کلاستر ایجاد شود. نام آن (“newTopic”) و تعداد بخش‌ها و نسخه‌ها را وارد می‌کنید که باید به نوع short تبدیل شوند. متد createTopics() یک مجموعه از نمونه‌های NewTopic را می‌پذیرد، بنابراین شما یک لیست با یک عنصر که فقط newTopic را شامل می‌شود، می‌سازید.
چون این متد به صورت غیر همزمان است، جریان اجرا بلافاصله پس از فراخوانی به main() باز خواهد گشت، در حالی که در پس‌زمینه ادامه می‌دهد. از آنجایی که شما قصد دارید موضوعات را در ادامه فهرست کنید، باید منتظر بمانید تا این عملیات با result.all().get() تکمیل شود. result شامل یک لیست از موضوعات ایجاد شده است و برای دریافت همه‌ی آن‌ها باید عملیات تکمیل شود.
سپس یک نمونه از ListTopicsOptions را می‌سازید که پیکربندی برای فرایند بازیابی موضوعات را نگه می‌دارد. متد listInternal() را صدا می‌زنید و true را به آن می‌دهید تا موضوعاتی که Kafka آن‌ها را به عنوان موارد داخلی در نظر می‌گیرد نیز دریافت شوند. سپس آن را به client.listTopics() می‌دهید و فهرست‌ها را از طریق listings() بازیابی می‌کنید، که در آن حلقه می‌زنید و نام‌های موضوعات را ثبت می‌کنید.
پس از پایان کار، فایل را ذخیره کرده و آن را ببندید، سپس اسکریپت را اجرا کنید:
./run-adminclient.sh
تمام موضوعات موجود در کلاستر در انتهای خروجی فهرست خواهند شد:
خروجی
[main] INFO com.dokafka.AdminClientDemo – Topic: newTopic
[main] INFO com.dokafka.AdminClientDemo – Topic: java_demo
حذف موضوعات
برای حذف یک موضوع (یا چندین موضوع)، می‌توانید از متد deleteTopics() در KafkaAdminClient استفاده کنید. حالا از آن برای حذف newTopic که به تازگی ایجاد کرده‌اید استفاده خواهید کرد. برای ویرایش AdminClientDemo آن را باز کنید:
nano src/main/java/com/dokafka/AdminClientDemo.java
کد مربوط به ایجاد موضوع را با خطوط برجسته‌شده جایگزین کنید:
nano src/main/java/com/dokafka/AdminClientDemo.java
DeleteTopicsResult deleted = client.deleteTopics(Collections.singleton(“newTopic”));
deleted.all().get();
log.info(“موضوع newTopic حذف شد!”);
  ListTopicsOptions options = new ListTopicsOptions();
  options.listInternal(true);
  Collection<TopicListing> topics = client.listTopics(options).listings().get();
  for (TopicListing topic: topics) {
    log.info(“موضوع: ” + topic.name());
  }
در اینجا، شما یک مجموعه از رشته‌ها که نمایانگر نام‌های موضوعات هستند را به client.deleteTopics() می‌دهید. سپس منتظر می‌مانید تا فرایند تکمیل شود با گرفتن deleted.all() که فقط زمانی برمی‌گردد که تمام موضوعات پردازش شده باشند. پس از پایان کار، فایل را ذخیره کرده و آن را ببندید، سپس اسکریپت run-adminclient.sh را اجرا کنید:
./run-adminclient.sh
خواهید دید که موضوع جدید در فهرست موضوعات موجود نیست:
خروجی
[main] INFO com.dokafka.AdminClientDemo – موضوع newTopic حذف شد!
[main] INFO com.dokafka.AdminClientDemo – موضوع: java_demo
در این مرحله، شما از KafkaAdminClient برای دسترسی به کلاستر خود و بازیابی اطلاعات آن به صورت برنامه‌نویسی استفاده کرده‌اید. همچنین نحوه فهرست کردن، ایجاد و حذف موضوعات در کلاستر را مشاهده کرده‌اید. حالا خواهید آموخت که چگونه از ابزار خط فرمان kcat برای دسترسی به کلاستر خود استفاده کنید.
مرحله 2 – استفاده از kcat برای مدیریت کلاستر
در این مرحله، شما یاد خواهید گرفت که چگونه kcat، ابزاری خط فرمان برای دسترسی و پیکربندی کلاسترهای Kafka را دانلود و نصب کنید، بدون نیاز به وابستگی به Java.
ابتدا باید بسته مناسب برای سیستم‌عامل خود را نصب کنید. اگر از MacOS استفاده می‌کنید، می‌توانید kcat را با Brew دانلود کنید:
brew install kcat
در سیستم‌های Debian و Ubuntu، می‌توانید آن را به‌طور معمول از طریق apt نصب کنید:
sudo apt install kafkacat
kafkacat نام قدیمی برای kcat است، اما برای سازگاری در مدیر بسته نگه‌داشته شده است. برای دستورالعمل‌های نصب در توزیع‌های دیگر، به مستندات رسمی مراجعه کنید.
تولید و مصرف پیام‌ها
یکی از عملیات‌های پایه‌ای که اسکریپت‌های همراه Kafka اجازه می‌دهند، پخش داده‌ها از یک موضوع است. با kcat، می‌توانید از چندین موضوع به‌طور هم‌زمان پخش کنید:
kcat -b your_broker_address:9092 first_topic second_topic …
اجرای دستور زیر پیام‌ها را از java_demotopic در کنسول پخش می‌کند:
kcat -b kafka1.your_domain:9092 java_demo
اسکریپت Kafka-console-consumer.sh همچنین به شما این امکان را می‌دهد که تمام پیام‌های موجود در یک موضوع را از ابتدا بخوانید. برای دستیابی به همین هدف با kcat، باید -t را وارد کنید:
kcat -b kafka1.your_domain:9092 -t java_demo
خروجی شامل پیام‌هایی خواهد بود که شما در بخش پیش‌نیازها تولید کرده‌اید:
خروجی
% انتخاب خودکار حالت مصرف‌کننده (برای لغو از -P یا -C استفاده کنید)
% به انتهای موضوع java_demo [1] در افست 0 رسید
% به انتهای موضوع java_demo [2] در افست 0 رسید
Hello World!
% به انتهای موضوع java_demo [0] در افست 0 رسید
% به انتهای موضوع java_demo [3] در افست 0 رسید
% به انتهای موضوع java_demo [4] در افست 1 رسید
% به انتهای موضوع java_demo [5] در افست 0 رسید
همچنین می‌توانید پیام‌های مصرف شده را به‌صورت JSON خروجی کنید با وارد کردن -J:
kcat -b kafka1.your_domain:9092 -t java_demo -J
خروجی شبیه به این خواهد بود:
خروجی
% انتخاب خودکار حالت مصرف‌کننده (برای لغو از -P یا -C استفاده کنید)
% به انتهای موضوع java_demo [2] در افست 0 رسید
% به انتهای موضوع java_demo [0] در افست 0 رسید
% به انتهای موضوع java_demo [1] در افست 0 رسید
{“topic”:“java_demo”,“partition”:4,“offset”:0,“tstype”:“create”,“ts”:1714922509999,“broker”:1,“key”:null,“payload”:“Hello World!”}
% به انتهای موضوع java_demo [3] در افست 0 رسید
% به انتهای موضوع java_demo [5] در افست 0 رسید
% به انتهای موضوع java_demo [4] در افست 1 رسید
برای تولید پیام‌ها به یک موضوع، باید -P را وارد کنید تا به حالت تولیدکننده تغییر وضعیت دهید:
kcat -b kafka1.your_domain:9092 -t java_demo -P
مانند kafka-console-producer.sh، kcat پیام‌ها را که با ENTER جدا شده‌اند، می‌پذیرد. برای خروج از پرامپت، می‌توانید CTRL + C را فشار دهید و سپس ENTER را بزنید.
شما می‌توانید از قابلیت الگوهای kcat برای خروجی گرفتن اطلاعات بیشتر در مورد یک پیام همراه با آن استفاده کنید. برای وارد کردن قالب دلخواه خود، از -f مانند دستور زیر استفاده کنید:
kcat -b kafka1.your_domain:9092 -t java_demo -f ‘Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n’
در اینجا، شما به آن دستور می‌دهید که تمام پیام‌ها را از ابتدا از java_demotopic بخواند و نام موضوع (%t)، شماره پارتیشن (%p)، افست (%o)، کلید پیام (%k)، طول مقدار (%S) و خود پیام (%s) را خروجی کند.
خروجی شبیه به این خواهد بود:
% انتخاب خودکار حالت مصرف‌کننده (برای لغو از -P یا -C استفاده کنید)
% به انتهای موضوع java_demo [2] در افست 0 رسید
% به انتهای موضوع java_demo [1] در افست 0 رسید
Topic java_demo[4], offset: 0, key: , payload: 12 bytes: Hello World!
% به انتهای موضوع java_demo [0] در افست 0 رسید
% به انتهای موضوع java_demo [3] در افست 0 رسید
% به انتهای موضوع java_demo [4] در افست 1 رسید
% به انتهای موضوع java_demo [5] در افست 0 رسید
kcat از قالب رشته‌ای استفاده کرد و اطلاعات اضافی در مورد رکورد پیام را خروجی کرد.
فهرست‌کردن متاداده کلاستر
شما می‌توانید متاداده کلاستر را به‌طور منظم با وارد کردن -L فهرست کنید:
kcat -b kafka1.your_domain:9092 -L
خروجی تمام بروکرهای کلاستر را به‌همراه تمام موضوع‌ها و پارتیشن‌هایشان فهرست می‌کند:
خروجی
3 بروکر:
بروکر 1 در kafka1.your_domain:9092
بروکر 2 در kafka2.your_domain:9092 (کنترل‌کننده)
بروکر 3 در kafka3.your_domain:9092
1 موضوع:
موضوع “java_demo” با 6 پارتیشن:
پارتیشن 0، رهبر 3، نسخه‌ها: 3,1، نمایندگان هم‌زمان: 3,1
پارتیشن 1، رهبر 1، نسخه‌ها: 1,2، نمایندگان هم‌زمان: 1,2
پارتیشن 2، رهبر 2، نسخه‌ها: 2,3، نمایندگان هم‌زمان: 2,3
پارتیشن 3، رهبر 2، نسخه‌ها: 2,1، نمایندگان هم‌زمان: 2,1
پارتیشن 4، رهبر 1، نسخه‌ها: 1,3، نمایندگان هم‌زمان: 1,3
پارتیشن 5، رهبر 3، نسخه‌ها: 3,2، نمایندگان هم‌زمان: 3,2
به‌طور مشابه با اسکریپت Kafka-topics.sh، kcat رهبران پارتیشن‌ها، نسخه‌ها و مجموعه‌های نسخه‌های هم‌زمان را فهرست می‌کند.
همچنین می‌توانید با وارد کردن -J خروجی را به‌صورت JSON دریافت کنید:
kcat -b kafka1.your_domain:9092 -L -J
خروجی شبیه به این خواهد بود:
خروجی
{
“originating_broker”: {
“id”: 2,
“name”: “kafka2.your_domain:9092/2”
},
“query”: {
“topic”: “*”
},
“controllerid”: 3,
“brokers”: [
{
“id”: 1,
“name”: “kafka1.your_domain:9092”
},
{
“id”: 2,
“name”: “kafka2.your_domain:9092”
},
{
“id”: 3,
“name”: “kafka3.your_domain:9092”
}
],
“topics”: [
{
“topic”: “java_demo”,
“partitions”: [
{
“partition”: 0,
“leader”: 3,
“replicas”: [
{
“id”: 3
}
],
“isrs”: [
{
“id”: 3
}
]
},
]
}
]
}
در این مرحله، شما kcat را نصب کرده‌اید، ابزاری برای دسترسی و مدیریت آسان کلاسترهای Kafka بدون نیاز به Java. شما یاد گرفتید که چگونه اطلاعاتی در مورد کلاستر و موضوع‌های آن به‌دست آورید، همچنین چگونگی تولید و مصرف پیام‌ها را آموختید. اکنون خواهید آموخت که چگونه با Cruise Control، تعادل مجدد مناسب درون کلاستر را خودکار کنید.
گام 3 – خودکارسازی تغییرات تعادل با Kafka Cruise Control
Cruise Control یک پروژه متن‌باز است که توسط LinkedIn توسعه داده شده و به‌طور مداوم فعالیت‌های بروکرهای Kafka در یک کلاستر را نظارت می‌کند و بار کاری را دوباره تعادل می‌کند تا مصرف منابع و کارایی را بهینه‌سازی کند. در این گام، شما یاد خواهید گرفت که چگونه آن را برای کلاستر خود فعال کنید و عملیات‌های آن را نظارت کنید.
به‌طور پیش‌فرض، Cruise Control اهداف معقولی برای اطمینان از عملکرد بهینه کلاستر دارد که به آن‌ها “goal” گفته می‌شود. برخی از اهداف اصلی شامل اطمینان از داشتن تعداد مناسب رپلیکای هر موضوع، حفظ تعادل مصرف CPU، شبکه و دیسک و محدود کردن آن‌ها تحت مقادیر مشخص، و داشتن رپلیکای کافی برای پارتیشن‌های موضوع است. پیاده‌سازی اهداف سفارشی نیز پشتیبانی می‌شود.
شما باید Cruise Control را از منبع کامپایل کنید. ابتدا، مخزن رسمی Git را با استفاده از دستور زیر کلون کنید:
به آن بروید:
cd cruise-control
سپس از Gradle برای کامپایل پروژه استفاده کنید:
./gradlew jar
فرآیند ساخت زمان‌بر خواهد بود. انتهای خروجی مشابه این خواهد بود:
BUILD SUCCESSFUL in 2m 41s
17 actionable tasks: 17 executed
اکنون Cruise Control به همراه گزارش‌دهنده متریک‌های آن کامپایل شده است که در مسیر cruise-control-metrics-reporter/build/libs/ در یک فایل JAR قرار دارد. این گزارش‌دهنده متریک‌هایی در مورد یک بروکر به یک موضوع در کلاستر ارسال می‌کند که Cruise Control می‌تواند آن‌ها را نظارت کند.
سپس، دستور زیر را برای کپی کردن تمام وابستگی‌ها به دایرکتوری هدف اجرا کنید:
./gradlew jar copyDependantLibs
خروجی به شکل زیر خواهد بود:
BUILD SUCCESSFUL in 15s
17 actionable tasks: 1 executed, 16 up-to-date
در مرحله بعد، شما باید هر بروکر Kafka در کلاستر را برای استفاده از گزارش‌دهنده متریک جدید پیکربندی کنید. آن را به دایرکتوری libs/ که در آن Kafka نصب شده است کپی کنید:
cp cruise-control-metrics-reporter/build/libs/* /home/kafka/kafka/libs/
سپس، شما باید پیکربندی بروکر Kafka را برای استفاده از گزارش‌دهنده جدید تغییر دهید. فایل server.properties مربوط به بروکر را برای ویرایش باز کنید:
nano /home/kafka/kafka/config/kraft/server.properties
این خط را در انتهای فایل اضافه کنید:
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
پس از اتمام، فایل را ذخیره و بسته کنید. بروکر را با دستور زیر مجدداً راه‌اندازی کنید:
sudo systemctl restart kafka
تمام بروکرهای کلاستر باید گزارش‌دهنده متریک Cruise Control را فعال کرده باشند، بنابراین مراحل بالا را در سایر گره‌ها نیز تکرار کنید تا گزارش‌دهنده متریک JAR را دریافت کنید. پس از چند دقیقه، می‌توانید با استفاده از دستور kcat موضوعات موجود در کلاستر را فهرست کنید تا تأیید کنید که گزارش‌دهنده متریک شروع به کار کرده است:
Output
  topic “__CruiseControlMetrics” with 6 partitions:
    partition 0, leader 3, replicas: 3,2 isrs: 3,2
    partition 1, leader 2, replicas: 2,3 isrs: 2,3
    partition 2, leader 3, replicas: 3,2 isrs: 3,2
    partition 3, leader 2, replicas: 2,3 isrs: 2,3
    partition 4, leader 2, replicas: 2,3 isrs: 2,3
    partition 5, leader 3, replicas: 3,2 isrs: 3,2
برای بهینه‌سازی صحیح بروکرها، Cruise Control نیاز دارد که از مشخصات سخت‌افزاری هر بروکر آگاه باشد. این پیکربندی در فایلی به نام capacity.json که در پوشه config قرار دارد ذخیره شده است. آن را برای ویرایش باز کنید:
nano config/capacity.json
در حالت پیش‌فرض، فایل به این شکل است:
{
  “brokerCapacities”:[
    {
      “brokerId”: “-1”,
      “capacity”: {
        “DISK”: “100000”,
        “CPU”: “100”,
        “NW_IN”: “10000”,
        “NW_OUT”: “10000”
      },
      “doc”: “This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.”
    },
    {
      “brokerId”: “0”,
      “capacity”: {
        “DISK”: “500000”,
        “CPU”: “100”,
        “NW_IN”: “50000”,
        “NW_OUT”: “50000”
      },
      “doc”: “This overrides the capacity for broker 0.”
    }
  ]
}
آرایه brokerCapacities شامل عناصری است که به شناسه‌های بروکر مرتبط هستند. هرکدام شامل فیلدهایی برای فضای دیسک، تخصیص CPU و ازدیاد شبکه به و از بروکر است.
شما سه بروکر Kafka را در مراحل قبلی استقرار داده‌اید. فایل را با افزودن مشخصات صحیح برای هر یک از آن‌ها تغییر دهید:
{
  “brokerCapacities”:[
    {
      “brokerId”: “-1”,
      “capacity”: {
        “DISK”: “100000”,
        “CPU”: “100”,
        “NW_IN”: “10000”,
        “NW_OUT”: “10000”
      },
      “doc”: “This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB.”
    },
    {
      “brokerId”: “1”,
      “capacity”: {
        “DISK”: “100000”,
        “CPU”: “100”,
        “NW_IN”: “10000”,
        “NW_OUT”: “10000”
      },
      “doc”: “”
    },
    {
      “brokerId”: “2”,
      “capacity”: {
        “DISK”: “100000”,
        “CPU”: “100”,
        “NW_IN”: “10000”,
        “NW_OUT”: “10000”
      },
      “doc”: “”
    },
    {
      “brokerId”: “3”,
      “capacity”: {
        “DISK”: “100000”,
        “CPU”: “100”,
        “NW_IN”: “10000”,
        “NW_OUT”: “10000”
      },
      “doc”: “”
    }
  ]
}
ظرفیت‌های دیسک را طبق اندازه Droplet‌های خود تنظیم کنید، سپس فایل را ذخیره و بسته کنید.
در ادامه، Cruise Control را پیکربندی می‌کنید تا به کلاستر Kafka شما در حالت KRaft متصل شود. باید فایل cruisecontrol.properties را در پوشه config ویرایش کنید. برای باز کردن آن دستور زیر را اجرا کنید:
nano config/cruisecontrol.properties
خطوط زیر را پیدا کنید:
# The Kafka cluster to control.
bootstrap.servers=localhost:9092
پارامتر bootstrap.servers مشخص می‌کند Cruise Control به کدام بروکر Kafka در نتیجه کدام کلاستر) متصل شود. آن را به شکل زیر تغییر دهید:
# The Kafka cluster to control.
bootstrap.servers=kafka1.your_domain:9092
# Switch to KRaft mode
kafka.broker.failure.detection.enable=true
پارامتر kafka.broker.failure.detection.enable=true به Cruise Control می‌گوید که تنظیمات ZooKeeper را نادیده بگیرد و به‌صورت بومی از حالت KRaft استفاده کند. عبارت kafka1.your_domain را با نام دامنه یا آدرس بروکر Kafka خود جایگزین کنید.
سپس خطوط زیر را پیدا کنید:
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
#capacity.config.file=config/capacity.json
capacity.config.file=config/capacityJBOD.json
و آن‌ها را به شکل زیر تغییر دهید تا فایل پیکربندی اصلی ظرفیت‌ها فعال شود:
# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
capacity.config.file=config/capacity.json
#capacity.config.file=config/capacityJBOD.json
در اینجا، فایل capacity.json (واقع در پوشه config/) به عنوان مرجع ظرفیت بروکرها تنظیم شده است.
پس از اتمام، فایل را ذخیره کرده و ببندید.
اکنون که Cruise Control پیکربندی شده است، آن را در یک ترمینال جداگانه با دستور زیر اجرا کنید:
./kafka-cruise-control-start.sh config/cruisecontrol.properties
پس از اجرای دستور، خروجی‌های متعددی در ترمینال نمایش داده خواهد شد، چرا که Cruise Control به‌صورت مداوم کلاستر Kafka شما را نظارت و بهینه‌سازی می‌کند.
استفاده از Cruise Control CLI
Cruise Control یک API مبتنی بر REST روی پورت ۹۰۹۰ ارائه می‌دهد که برای پیکربندی و کارهای مدیریتی استفاده می‌شود. اما پروژه‌ی Cruise Control یک ابزار خط فرمان به نام cccli نیز دارد که با زبان Python نوشته شده و به‌صورت ساده‌تر، این API را در قالب دستورات ترمینال در اختیار شما قرار می‌دهد.
فعال‌سازی محیط مجازی پایتون
ابتدا وارد محیط مجازی پایتونی شوید که در مراحل قبلی ایجاد کرده‌اید:
cd ~/venv
با دستور زیر محیط مجازی را فعال کنید:
./bin/activate
سپس با استفاده از pip، ابزار Cruise Control CLI را نصب کنید:
pip install cruise-control-client
خروجی نهایی چیزی شبیه به این خواهد بود:
Installing collected packages: urllib3, idna, charset-normalizer, certifi, requests, cruise-control-client
Successfully installed …
با این نصب، دستور cccli در ترمینال در دسترس قرار می‌گیرد.
مشاهده‌ی وضعیت فعلی کلاستر Kafka
برای مشاهده اطلاعات مربوط به وضعیت بار (load) فعلی کلاستر Kafka، دستور زیر را اجرا کنید (به جای localhost:9090 آدرس Cruise Control خود را قرار دهید):
cccli -a localhost:9090 load
خروجی شامل اطلاعاتی مانند استفاده از دیسک، پردازنده، پهنای باند شبکه، تعداد لیدرها و غیره برای هر بروکر است:
              HOST         BROKER       …       CPU(%)       NW_IN_CAP       …
kafka1.your_domain    1                …         0.999         10000.000       …
فعال‌سازی تعمیر خودکار (Self-Healing)
Cruise Control قابلیت «تعمیر خودکار» دارد؛ یعنی در صورت خرابی بروکر یا نقض هدف‌های مشخص‌شده (Goals)، خودش به صورت خودکار وضعیت را اصلاح می‌کند. برای فعال‌سازی این ویژگی در زمان خرابی بروکر، دستور زیر را بزنید:
cccli -a localhost:9090 admin –enable-self-healing-for broker_failure
خروجی به صورت زیر خواهد بود و نشان می‌دهد که تنظیمات تغییر کرده‌اند:
{
  selfHealingEnabledBefore: {BROKER_FAILURE=false},
  selfHealingEnabledAfter: {BROKER_FAILURE=true}
}
نتیجه‌گیری
در این مرحله، شما:
• متریک‌ریپورتر Cruise Control را روی هر نود Kafka نصب کردید.
• فایل‌های پیکربندی را ویرایش و Cruise Control را اجرا کردید.
• ابزار cccli را نصب کردید و نحوه‌ی استفاده از آن برای مانیتورینگ و مدیریت کلاستر Kafka را یاد گرفتید.
برای اطلاعات بیشتر، می‌توانید به مستندات رسمی Cruise Control مراجعه کنید.
اکنون شما توانایی مدیریت کلاستر Kafka را از طریق کد با استفاده از KafkaAdminClient و ابزارهای جانبی مانند kcat و Cruise Control دارید.
برای امتیاز به این نوشته کلیک کنید!
[کل: 0 میانگین: 0]

نظرات کاربران

دیدگاهی بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *