ساده‌سازی داده‌ها و نسخه‌بندی با Confluent Schema Registry در Kafka

مقدمه
با رشد کسب‌وکارها، حفظ سازگاری بین فرمت‌های داده‌ای در هنگام استریم رویدادها اهمیت زیادی دارد. داده‌های ذخیره‌شده در موضوعات Apache Kafka تغییرناپذیر هستند و نمی‌توان آنها را به‌طور بازگشتی برای تطبیق با نیازهای فعلی تغییر داد. مقابله با این مشکل زمانی که تغییرات زیادی در اسکیما وجود داشته باشد، می‌تواند چالش‌برانگیز باشد.
Apache Avro یک کتابخانه سریال‌سازی داده است که برای پایپ‌لاین‌های استریم رویداد طراحی شده است. این کتابخانه به شما اجازه می‌دهد ساختارهای پیچیده (که به آنها اسکیما گفته می‌شود) برای داده‌های خود تعریف کنید و قابلیت‌های سریال‌سازی برای حمل و نقل و ذخیره‌سازی کارآمد را فراهم می‌آورد. برای پیگیری اسکیماها و نسخه‌های آن‌ها، Confluent Schema Registry ایجاد شده است. این سرویس به‌عنوان یک مخزن متمرکز برای اسکیماهای شما عمل می‌کند، ذخیره‌سازی آن‌ها را مدیریت می‌کند و سازگاری متقابل را تضمین می‌کند. این امکان را به شما می‌دهد تا بر روی داده‌ها تمرکز کنید به جای آنکه روش‌هایی برای تبدیل دستی نسخه‌های مختلف اسکیما ایجاد کنید.
در این آموزش، شما Confluent Schema Registry را با استفاده از Docker مستقر خواهید کرد و تولیدکننده و مصرف‌کننده Kafka که در آموزش‌های قبلی این سری ایجاد کرده‌اید را گسترش خواهید داد. شما این بخش‌ها را دوباره طراحی خواهید کرد تا اشیائی را که با اسکیماهایی که خودتان تعریف کرده‌اید مطابقت دارند، ایجاد و مصرف کنند. همچنین اسکیما را تغییر خواهید داد و یاد خواهید گرفت که چگونه آن را تکامل دهید بدون اینکه داده‌های متناسب با نسخه‌های قبلی شکسته شوند.
الزامات
برای دنبال کردن این آموزش، شما به موارد زیر نیاز دارید:
• نصب Docker بر روی دستگاه خود. برای سیستم‌عامل Ubuntu، به آموزش “How To Install and Use Docker on Ubuntu” مراجعه کنید. شما فقط نیاز به انجام مرحله اول و دوم این آموزش دارید.
• نصب Docker Compose بر روی دستگاه خود. برای سیستم‌عامل Ubuntu، مراحل اول و دوم آموزش “How To Install and Use Docker Compose on Ubuntu” را دنبال کنید.
• یک پروژه Java با Kafka producer که طبق راهنمای “How To Set Up a Kafka Producer to Source Data Through CLI” راه‌اندازی شده باشد.
• یک پروژه Java با Kafka consumer که طبق راهنمای “How To Set Up a Kafka Consumer to Receive Data Through CLI” راه‌اندازی شده باشد.
• نصب jq بر روی دستگاه خود. برای آشنایی بیشتر، به مقاله “How To Transform JSON Data with jq” مراجعه کنید.
• درک مفاهیم Kafka، از جمله موضوعات (topics)، تولیدکنندگان (producers) و مصرف‌کنندگان (consumers). برای اطلاعات بیشتر، به آموزش “Introduction to Kafka” مراجعه کنید.
مرحله 1 – راه‌اندازی Schema Registry با استفاده از Docker Compose
در این بخش، شما یاد خواهید گرفت که چگونه Confluent Schema Registry را با استفاده از Docker Compose راه‌اندازی کنید. برخلاف Kafka که می‌تواند به صورت مستقل و با استفاده از KRaft اجرا شود، Schema Registry به یک نمونه ZooKeeper برای عملکرد نیاز دارد.
به عنوان بخشی از پیش‌نیازها، شما Kafka را به عنوان یک سرویس systemd بر روی ماشین محلی خود مستقر کرده‌اید. در این مرحله، شما یک گره Kafka را از طریق Docker Compose مستقر خواهید کرد. ابتدا باید سرویس Kafka را با اجرای دستور زیر متوقف کنید:
sudo systemctl stop kafka
حال باید پیکربندی Docker Compose را در یک فایل به نام schema-registry-compose.yaml تعریف کنید. برای ایجاد و باز کردن آن برای ویرایش، دستور زیر را اجرا کنید:
nano schema-registry-compose.yaml
سطرهای زیر را اضافه کنید:
version: ‘3’
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      – zookeeper
    ports:
      – 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    depends_on:
      – zookeeper
      – kafka
    ports:
      – “8081:8081”
    environment:
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: ‘PLAINTEXT://kafka:29092’
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
در اینجا شما سه سرویس تعریف می‌کنید: zookeeper، kafka و kafka-schema-registry. تمامی این سه سرویس از جدیدترین تصاویر Docker که توسط Confluent ارائه شده‌اند استفاده می‌کنند. سرویس ZooKeeper در پورت 2181 و Kafka در پورت 9092 در دسترس خواهد بود. برای Kafka، در قسمت environment آدرس ZooKeeper را پیکربندی کرده و یک listener اضافی در پورت 29092 تعریف می‌کنید که Schema Registry از آن برای اتصال مستقیم به Kafka استفاده خواهد کرد. شما مشخص می‌کنید که سرویس Kafka ابتدا باید منتظر بماند تا ZooKeeper شروع به کار کند.
سپس، سرویس kafka-schema-registry را در پورت 8081 در دسترس قرار می‌دهید و آدرس اتصال به سرویس Kafka را در قسمت environment مشخص می‌کنید. همچنین تعیین می‌کنید که Schema Registry فقط زمانی شروع به کار کند که ZooKeeper و Kafka به طور کامل راه‌اندازی شده باشند.
فایل را ذخیره کرده و آن را ببندید، سپس دستور زیر را برای راه‌اندازی سرویس‌ها در پس‌زمینه اجرا کنید:
docker-compose -f schema-registry-compose.yaml up -d
خروجی در انتهای اجرای دستور مشابه خطوط زیر خواهد بود:
Creating root_zookeeper_1 … done
Creating root_kafka_1     … done
Creating kafka-schema-registry … done
شما می‌توانید با استفاده از دستور زیر، کانتینرهای در حال اجرا را مشاهده کنید:
docker ps
خروجی مشابه به این خواهد بود:
CONTAINER ID   IMAGE                              COMMAND                  CREATED          STATUS          PORTS                                       NAMES
6783568a74c8   confluentinc/cp-schema-registry    “/etc/confluent/dock…”   19 seconds ago   Up 17 seconds   0.0.0.0:8081->8081/tcp, :::8081->8081/tcp   kafka-schema-registry
6367df4b55f7   confluentinc/cp-kafka:latest       “/etc/confluent/dock…”   19 seconds ago   Up 18 seconds   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   root_kafka_1
a5f5b09984e0   confluentinc/cp-zookeeper:latest   “/etc/confluent/dock…”   19 seconds ago   Up 19 seconds   2181/tcp, 2888/tcp, 3888/tcp                root_zookeeper_1
در این مرحله، شما یک نمونه از Schema Registry را همراه با ZooKeeper و Kafka از طریق Docker Compose مستقر کرده‌اید. حالا شما یاد خواهید گرفت که چگونه در پروژه جاوای خود از Avro schemas استفاده کنید.
مرحله 2 – استفاده از Avro Schemas
در این بخش، شما به پروژه خود Avro را اضافه خواهید کرد و وابستگی‌های مربوطه را اضافه خواهید کرد. شما یاد خواهید گرفت که چگونه Avro schemas را تعریف کرده و کلاس‌های جاوا را برای انواع تعریف‌شده به طور خودکار ایجاد کنید. سپس، schema خود را به Schema Registry اضافه خواهید کرد.
اضافه کردن Avro به پروژه شما
ابتدا باید وابستگی org.apache.avro را اضافه کنید. به صفحه مخزن Maven برای کلاینت جاوا در مرورگر خود بروید و آخرین نسخه موجود را انتخاب کرده، سپس قطعه XML ارائه شده برای Maven را کپی کنید. در زمان نگارش این مقاله، آخرین نسخه کتابخانه کلاینت جاوا 1.11.3 بود.
وابستگی‌ها در فایل pom.xml که در ریشه پروژه شما قرار دارد اضافه می‌شوند. آن را برای ویرایش با اجرای دستور زیر باز کنید:
nano pom.xml
بخش <dependencies> را پیدا کرده و تعریف وابستگی را اضافه کنید:
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.11.3</version>
</dependency>
این کار Avro را در دسترس پروژه شما قرار می‌دهد. با این حال، برای این که بتوانید کلاس‌های جاوا را از Avro schemas تولید کنید، باید افزونه avro-maven-plugin را نیز از مخزن Maven به همان روش اضافه کنید.
پس از تعریف وابستگی‌ها، باید اطمینان حاصل کنید که پلاگین منابع را تولید می‌کند. بخش <build> فایل pom.xml را پیدا کرده و خطوط زیر را اضافه کنید:
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/java/com/dokafka/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
در اینجا، شما افزونه avro-maven-plugin را برای تولید منابع جاوا بر اساس schemas که در /src/main/java/com/dokafka قرار دارند و قرار دادن آن‌ها در /src/main/java پیکربندی می‌کنید. توجه داشته باشید که نباید بخش <plugins> موجود در زیر <pluginManagement> را تغییر دهید.
بعد از انجام این مراحل، فایل را ذخیره کرده و آن را ببندید.
برای بررسی اینکه همه‌چیز به درستی پیکربندی شده است، پروژه را بسازید:
mvn package
خروجی در انتهای اجرا مشابه این خواهد بود:
[INFO] ————————————————————————
[INFO] BUILD SUCCESS
[INFO] ————————————————————————
[INFO] Total time:  5.759 s
[INFO] Finished at: 2024-04-01T13:38:31Z
[INFO] ————————————————————————
تعریف Avro Schemas
در حال حاضر، شما یک schema به نام TempMeasurement ایجاد خواهید کرد که یک اندازه‌گیری دما در یک نقطه زمانی را توصیف می‌کند. شما آن را در کنار کلاس‌های ProducerDemo و ConsumerDemo که به عنوان بخشی از پیش‌نیازها ایجاد کرده‌اید، ذخیره خواهید کرد. برای ایجاد و ویرایش آن دستور زیر را اجرا کنید:
nano src/main/java/com/dokafka/TempMeasurement.avsc
سطرهای زیر را اضافه کنید:
{
  “namespace”: “com.dokafka”,
  “name”: “TempMeasurement”,
  “type”: “record”,
  “fields”: [
      {
          “name”: “measuredValue”,
          “type”: “double”
      },
      {
          “name”: “measurerName”,
          “type”: “string”
      }
  ]
}
فایل‌های Avro schema به صورت JSON نوشته می‌شوند و پسوند فایل آن‌ها .avsc است. ابتدا شما namespace schema را مشخص می‌کنید، که همچنین namespace کلاس‌های جاوای تولید شده خواهد بود. سپس نام آن را به TempMeasurement تنظیم کرده و نوع schema را به record تعیین می‌کنید، که نشان‌دهنده یک شیء Avro است.
سپس، شما فیلدهای schema خود را که شامل measuredValue و measurerName از نوع‌های double و string به ترتیب هستند، مشخص می‌کنید. Avro از انواع دیگری مانند int، long، float، boolean و bytes نیز پشتیبانی می‌کند.
فایل را ذخیره کرده و آن را ببندید، سپس پروژه را بسازید:
mvn package
حال، فایل‌ها را زیر دایرکتوری src/main/java/com/dokafka لیست کنید:
ls src/main/java/com/dokafka
شما مشاهده خواهید کرد که یک کلاس TempMeasurement ایجاد شده است:
ConsumerDemo.java  ProducerDemo.java  TempMeasurement.avsc  TempMeasurement.java
این کلاس کدی را برای ایجاد، سریال‌سازی و دِسریال‌سازی اشیاء TempMeasurement در خود دارد.
ذخیره Schema در Schema Registry
حالا که شما schema خود را تعریف کرده‌اید، می‌توانید آن را به Schema Registry اضافه کنید با اجرای دستور زیر:
curl -X POST -H “Content-Type: application/vnd.schemaregistry.v1+json” \
    -d “$(jq -R -s ‘{“schema”: .}’ < src/main/java/com/dokafka/TempMeasurement.avsc)” \
Schema Registry در آدرس http://localhost:8081 در دسترس است و از طریق HTTP قابل دسترسی است. در این دستور، ابتدا متد HTTP را به POST تنظیم می‌کنید همراه با Content-Type مناسب که Registry آن را قبول می‌کند.
شما بدنه درخواست را به -d می‌دهید و از jq برای بسته‌بندی محتوای schema در یک فیلد به نام schema استفاده می‌کنید، زیرا این فرمت است که Schema Registry آن را می‌پذیرد. در نهایت، درخواست را به subjects/TempMeasurement/versions هدایت می‌کنید، جایی که شما مشخص می‌کنید که schema جدید چگونه باید نام‌گذاری شود.
خروجی به صورت زیر خواهد بود:
{“id”:1}
Schema Registry درخواست را پذیرفته و به آن ID شماره 1 اختصاص داده است.
برای مشاهده تمام schemas موجود، دستور زیر را اجرا کنید:
شما تنها یک schema موجود خواهید دید:
[“TempMeasurement”]
در این مرحله، شما وابستگی‌های لازم را به پروژه Maven خود اضافه کرده‌اید و کد خودکارسازی برای Avro schemas که تعریف می‌کنید را پیکربندی کرده‌اید. اکنون شما آماده هستید که داده‌ها را بر اساس schemas تولید و مصرف کنید.
مرحله 3 – تولید و مصرف داده‌ها طبق Schemas
Confluent یک کتابخانه Avro serializer برای Kafka به نام kafka-avro-serializer فراهم کرده است. در این مرحله، شما این کتابخانه را به پروژه خود اضافه خواهید کرد و producer و consumer خود را برای ارسال و دریافت اشیاء TempMeasurement پیکربندی می‌کنید.
افزودن وابستگی‌های لازم
برای افزودن این کتابخانه به پروژه خود، به مخزن Maven بروید و تعریف XML برای آخرین نسخه موجود را کپی کنید (نسخه‌ای که در زمان نگارش این مطلب به‌روزترین بود، نسخه 7.6.0 بود). سپس، فایل pom.xml را برای ویرایش باز کنید:
nano pom.xml
از آنجایی که بسته kafka-avro-serializer در مخزن Maven Confluent میزبانی می‌شود، باید آن را با افزودن خطوط زیر تعریف کنید:
<repositories>
  <repository>
    <id>confluent</id>
  </repository>
</repositories>
سپس، تعریف کتابخانه را به بخش <dependencies> اضافه کنید:
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.6.0</version>
</dependency>
برای اینکه مشتری Kafka بتواند با Schema Registry ارتباط برقرار کند، همچنین باید kafka-schema-registry-client را به‌عنوان یک وابستگی اضافه کنید.
پس از انجام این تغییرات، فایل را ذخیره کرده و بسته را کامپایل کنید.
به‌روزرسانی کلاس‌های ProducerDemo و ConsumerDemo
به‌روزرسانی ProducerDemo
کلاس ProducerDemo را برای اتصال به Schema Registry و تولید اشیاء از نوع TempMeasurement در یک topic به‌روزرسانی کنید. ابتدا آن را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/ProducerDemo.java
کتابخانه‌های KafkaAvroSerializer و KafkaAvroSerializerConfig را وارد کنید:
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
سپس، بخش اول متد main را به صورت زیر تغییر دهید:
String topicName = “java_demo_avro”;
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, http://localhost:8081“);
KafkaProducer<String, TempMeasurement> producer = new KafkaProducer<>(properties);
ProducerRecord<String, TempMeasurement> producerRecord =
  new ProducerRecord<>(topicName, new TempMeasurement(Double.parseDouble(args[0]), args[1]));
در اینجا:
• نام topic را به java_demo_avro تنظیم می‌کنید. Kafka این topic را اگر قبلاً وجود نداشته باشد، ایجاد خواهد کرد.
• کلاس serializer برای مقدار را از StringSerializer به KafkaAvroSerializer تغییر می‌دهید و آدرس Schema Registry را با استفاده از SCHEMA_REGISTRY_URL_CONFIG مشخص می‌کنید.
• تعریف مقدار قبلی که از نوع String بود را به TempMeasurement تغییر می‌دهید و برای producerRecord یک نمونه جدید از TempMeasurement با دو پارامتر measurementValue و measurerName از آرگومان‌های خط فرمان ایجاد می‌کنید.
پس از انجام این تغییرات، فایل را ذخیره کرده و آن را ببندید.
برای اینکه بتوانید دو پارامتر را به اسکریپت run-producer.sh ارسال کنید، آن را برای ویرایش باز کنید:
nano run-producer.sh
پارامتر زیر را به دستور اضافه کنید:
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1 $2
سپس فایل را ذخیره کرده و آن را ببندید. حالا می‌توانید یک نمونه TempMeasurement تولید کنید با اجرای دستور زیر:
./run-producer.sh 100 sammy
در این دستور، 100 به‌عنوان measurementValue و sammy به‌عنوان measurerName ارسال می‌شود.
به‌روزرسانی ConsumerDemo
برای دریافت شیء Avro که به تازگی تولید کرده‌اید، باید کلاس ConsumerDemo را به‌روزرسانی کنید. ابتدا آن را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/ConsumerDemo.java
کتابخانه‌های KafkaAvroDeserializer و KafkaAvroDeserializerConfig را وارد کنید:
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
سپس، متد main را به صورت زیر تغییر دهید:
String topic = “java_demo_avro”;
String groupId = “group1”;
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”);
properties.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, http://localhost:8081“);
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, “true”);
final KafkaConsumer<String, TempMeasurement> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
try {
  while (true) {
    ConsumerRecords<String, TempMeasurement> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, TempMeasurement> record: records) {
      log.info(String.format(“measuredValue: %s, measurerName: %s\n”,
        record.value().getMeasuredValue(),
        record.value().getMeasurerName()));
    }
  }
} catch (Exception e) {
  log.error(“An error occurred”, e);
} finally {
  consumer.close();
}
در اینجا:
• topic و آدرس Schema Registry را به‌روزرسانی می‌کنید.
• deserializer را به KafkaAvroDeserializer تغییر می‌دهید.
• با تنظیم SPECIFIC_AVRO_READER_CONFIG به true، شما به deserializer می‌گویید که اشیاء واقعی از نوع TempMeasurement برگرداند. در غیر این صورت، یک GenericRecord Avro که هنوز شامل تمام فیلدها است، اما نوع‌گذاری مشخص ندارد، برگردانده خواهد شد.
حالا می‌توانید داده‌های Avro تولید شده را دریافت کنید.
سپس، نوع مقدار TempMeasurement را در سراسر کد باقی‌مانده انتشار می‌دهید. در داخل حلقه for، فراخوانی متد logging را به‌روزرسانی می‌کنید تا measurementTemp و measurementValue را به‌درستی نمایش دهد.
با توجه به یکپارچگی با Schema Registry، تولیدکننده (producer) شیء را همراه با شناسایی‌کننده (identifier) اسکیمای آن به topic ارسال می‌کند، نه اینکه خود اسکیمای شیء را ارسال کند. مصرف‌کننده (consumer) سپس آن اسکیمای کامل را از Schema Registry دریافت کرده و آن را deserializes می‌کند.
پس از انجام تغییرات، فایل را ذخیره کرده و آن را ببندید. سپس مصرف‌کننده را اجرا کنید:
./run-consumer.sh
خروجی آخر مشابه این خواهد بود:
[main] INFO com.dokafka.ConsumerDemo – measuredValue: 100.0, measurerName: sammy
این نشان‌دهنده این است که مصرف‌کننده Kafka پیام Avro را به‌درستی deserialized کرده است، همانطور که از پیام log مشخص است.
در این مرحله، شما کلاس‌های تولیدکننده و مصرف‌کننده خود را به‌روزرسانی کردید تا از اشیاء Avro استفاده کنند. در مرحله بعد، شما یاد خواهید گرفت که چگونه اسکیمای خود را به‌روزرسانی کرده و سازگاری آن را با Schema Registry پیگیری کنید.
مرحله ۴ – تکامل اسکیمای داده و سازگاری
در این مرحله، شما یاد خواهید گرفت که چگونه اسکیمای موجود را به‌روزرسانی کنید و این تغییرات چگونه بر سازگاری با نسخه‌ها و مشتریان موجود تأثیر می‌گذارند.
علاوه بر ذخیره و نسخه‌بندی اسکیمای داده‌ها از طریق زمان، Schema Registry برای فعال‌سازی تکامل اسکیمای داده بسیار مهم است. اسکیمای داده می‌تواند در طول عمر یک پروژه تغییر کند، در حالی که داده‌های تولیدشده قبلاً نمی‌توانند تغییر کنند.
Schema Registry مسئول مدیریت سازگاری بین نسخه‌های مختلف اسکیمای داده است و به مصرف‌کننده اجازه می‌دهد تا داده‌ها را مطابق با نسخه داخلی اسکیمای خود تجزیه کند. این قابلیت به تولیدکنندگان و مصرف‌کنندگان اجازه می‌دهد که در ارتباط با نسخه دقیق اسکیمای خود همگام نباشند، زیرا آنها می‌توانند در کدهای مختلف قرار داشته باشند.
استراتژی‌های اصلی سازگاری که Schema Registry ارائه می‌دهد عبارتند از:
• BACKWARD: اطمینان می‌دهد که مصرف‌کنندگان با استفاده از اسکیمای جدید می‌توانند داده‌هایی را که بر اساس نسخه قبلی ایجاد شده‌اند، بخوانند.
• FORWARD: به این معنی که مصرف‌کنندگان با استفاده از اسکیمای جدید می‌توانند داده‌هایی را که بر اساس اسکیمای جدید ایجاد شده‌اند، بخوانند (بدون تضمین برای نسخه‌های قبلی).
• FULL: ترکیبی از دو استراتژی قبلی است.
• NONE: به این معنی که بررسی‌های سازگاری غیرفعال هستند.
سه استراتژی اول دارای همتایان ترانزیتی (مثل BACKWARD_TRANSITIVE) هستند که ملزم می‌کند اسکیمای جدید با تمام نسخه‌های قبلی اسکیمای داده سازگار باشد، نه فقط نسخه قبلی.
استراتژی پیش‌فرض BACKWARD است.
به‌روزرسانی اسکیمای TempMeasurement
در این مرحله، شما اسکیمای TempMeasurement را به‌روزرسانی می‌کنید تا یک فیلد جدید به نام measurementDate برای ذخیره تاریخ اندازه‌گیری اضافه شود.
۱. فایل اسکیمای TempMeasurement را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/TempMeasurement.avsc
۲. آن را به شکل زیر تغییر دهید:
{
  “namespace”: “com.dokafka”,
  “name”: “TempMeasurement”,
  “type”: “record”,
  “fields”: [
      {
          “name”: “measuredValue”,
          “type”: “double”
      },
      {
          “name”: “measurerName”,
          “type”: “string”
      },
      {
          “name”: “measurementDate”,
          “type”: “string”
      }
  ]
}
۳. شما یک فیلد جدید به نام measurementDate تعریف کرده‌اید که تاریخ اندازه‌گیری را به‌صورت متنی ذخیره خواهد کرد. فایل را ذخیره و بسته کنید.
۴. برای ایجاد نسخه جدید از اسکیمای به‌روزرسانی‌شده در Schema Registry، دستور زیر را اجرا کنید:
curl -X POST -H “Content-Type: application/vnd.schemaregistry.v1+json” \
    -d “$(jq -R -s ‘{“schema”: .}’ < src/main/java/com/dokafka/TempMeasurement.avsc)” \
۵. در این مرحله، خطای زیر به شما نمایش داده خواهد شد:
{
  “error_code”: 409,
  “message”: “Schema being registered is incompatible with an earlier schema for subject \”TempMeasurement\”, details: [{errorType:’READER_FIELD_MISSING_DEFAULT_VALUE’, description:’The field ‘measurementDate’ at path ‘/fields/2’ in the new schema has no default value and is missing in the old schema’, additionalInfo:’measurementDate’}, {oldSchemaVersion: 1}, {oldSchema: ‘{\”type\”:\”record\”,\”name\”:\”TempMeasurement\”,\”namespace\”:\”com.dokafka\”,\”fields\”:[{\”name\”:\”measuredValue\”,\”type\”:\”double\”},{\”name\”:\”measurerName\”,\”type\”:\”string\”}]}’}, {validateFields: ‘false’, compatibility: ‘BACKWARD’}]”
}
این خطا نشان می‌دهد که اسکیمای جدید با نسخه قبلی سازگار نیست زیرا فیلد measurementDate جدید است و مقداری پیش‌فرض ندارد. این رفتار باعث می‌شود که مصرف‌کنندگان با اسکیمای جدید نتوانند داده‌هایی را که با اسکیمای قبلی ایجاد شده‌اند، بخوانند.
۶. برای رفع این مشکل، فایل را دوباره باز کنید و خطی برای مقدار پیش‌فرض به فیلد measurementDate اضافه کنید:
{
  “namespace”: “com.dokafka”,
  “name”: “TempMeasurement”,
  “type”: “record”,
  “fields”: [
      {
          “name”: “measuredValue”,
          “type”: “double”
      },
      {
          “name”: “measurerName”,
          “type”: “string”
      },
      {
          “name”: “measurementDate”,
          “type”: “string”,
          “default”: “”
      }
  ]
}
۷. پس از ذخیره و بستن فایل، دوباره دستور ارسال اسکیمای جدید را اجرا کنید. خروجی به‌صورت زیر خواهد بود:
{“id”:2}
Schema Registry نسخه دوم اسکیمای داده را پذیرفته و شناسه ۲ را به آن اختصاص داده است.
۸. برای دریافت نسخه اول، دستور زیر را اجرا کنید:
۹. خروجی به‌صورت زیر خواهد بود:
{
  “subject”: “TempMeasurement”,
  “version”: 1,
  “id”: 1,
  “schema”: “{\”type\”:\”record\”,\”name\”:\”TempMeasurement\”,\”namespace\”:\”com.dokafka\”,\”fields\”:[{\”name\”:\”measuredValue\”,\”type\”:\”double\”},{\”name\”:\”measurerName\”,\”type\”:\”string\”}]}”
}
۱۰. برای مشاهده همه نسخه‌ها، دستور زیر را اجرا کنید:
خروجی به‌صورت زیر خواهد بود:
[1, 2]
تغییر استراتژی سازگاری
برای تغییر استراتژی سازگاری، دستور زیر را اجرا کنید:
curl -X PUT -H “Content-Type: application/vnd.schemaregistry.v1+json” \
       –data ‘{“compatibility”: “BACKWARD_TRANSITIVE”}’ \
خروجی به‌صورت زیر خواهد بود:
{“compatibility”:”BACKWARD_TRANSITIVE”}
پاک‌سازی منابع Docker Compose
برای از بین بردن منابع Docker Compose، دستور زیر را اجرا کنید:
docker-compose -f schema-registry-compose.yaml down
در این مرحله، شما اسکیمای TempMeasurement را مطابق با استراتژی سازگاری BACKWARD به‌روزرسانی کرده و آن را در Schema Registry منتشر کردید.
نتیجه‌گیری
در این مقاله، شما کلاس‌های ProducerDemo و ConsumerDemo خود را گسترش دادید تا اشیای TempMeasurement را که با استفاده از Apache Avro سریالیزه شده‌اند، تولید و مصرف کنید. همچنین یاد گرفتید که چگونه از Schema Registry برای ذخیره‌سازی و تکامل اسکیمای داده استفاده کنید و مشتریان Kafka خود را به آن متصل کنید.
برای امتیاز به این نوشته کلیک کنید!
[کل: 0 میانگین: 0]

نظرات کاربران

دیدگاهی بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *