مقدمه
با رشد کسبوکارها، حفظ سازگاری بین فرمتهای دادهای در هنگام استریم رویدادها اهمیت زیادی دارد. دادههای ذخیرهشده در موضوعات Apache Kafka تغییرناپذیر هستند و نمیتوان آنها را بهطور بازگشتی برای تطبیق با نیازهای فعلی تغییر داد. مقابله با این مشکل زمانی که تغییرات زیادی در اسکیما وجود داشته باشد، میتواند چالشبرانگیز باشد.
Apache Avro یک کتابخانه سریالسازی داده است که برای پایپلاینهای استریم رویداد طراحی شده است. این کتابخانه به شما اجازه میدهد ساختارهای پیچیده (که به آنها اسکیما گفته میشود) برای دادههای خود تعریف کنید و قابلیتهای سریالسازی برای حمل و نقل و ذخیرهسازی کارآمد را فراهم میآورد. برای پیگیری اسکیماها و نسخههای آنها، Confluent Schema Registry ایجاد شده است. این سرویس بهعنوان یک مخزن متمرکز برای اسکیماهای شما عمل میکند، ذخیرهسازی آنها را مدیریت میکند و سازگاری متقابل را تضمین میکند. این امکان را به شما میدهد تا بر روی دادهها تمرکز کنید به جای آنکه روشهایی برای تبدیل دستی نسخههای مختلف اسکیما ایجاد کنید.
در این آموزش، شما Confluent Schema Registry را با استفاده از Docker مستقر خواهید کرد و تولیدکننده و مصرفکننده Kafka که در آموزشهای قبلی این سری ایجاد کردهاید را گسترش خواهید داد. شما این بخشها را دوباره طراحی خواهید کرد تا اشیائی را که با اسکیماهایی که خودتان تعریف کردهاید مطابقت دارند، ایجاد و مصرف کنند. همچنین اسکیما را تغییر خواهید داد و یاد خواهید گرفت که چگونه آن را تکامل دهید بدون اینکه دادههای متناسب با نسخههای قبلی شکسته شوند.
الزامات
برای دنبال کردن این آموزش، شما به موارد زیر نیاز دارید:
• نصب Docker بر روی دستگاه خود. برای سیستمعامل Ubuntu، به آموزش “How To Install and Use Docker on Ubuntu” مراجعه کنید. شما فقط نیاز به انجام مرحله اول و دوم این آموزش دارید.
• نصب Docker Compose بر روی دستگاه خود. برای سیستمعامل Ubuntu، مراحل اول و دوم آموزش “How To Install and Use Docker Compose on Ubuntu” را دنبال کنید.
• یک پروژه Java با Kafka producer که طبق راهنمای “How To Set Up a Kafka Producer to Source Data Through CLI” راهاندازی شده باشد.
• یک پروژه Java با Kafka consumer که طبق راهنمای “How To Set Up a Kafka Consumer to Receive Data Through CLI” راهاندازی شده باشد.
• نصب jq بر روی دستگاه خود. برای آشنایی بیشتر، به مقاله “How To Transform JSON Data with jq” مراجعه کنید.
• درک مفاهیم Kafka، از جمله موضوعات (topics)، تولیدکنندگان (producers) و مصرفکنندگان (consumers). برای اطلاعات بیشتر، به آموزش “Introduction to Kafka” مراجعه کنید.
مرحله 1 – راهاندازی Schema Registry با استفاده از Docker Compose
در این بخش، شما یاد خواهید گرفت که چگونه Confluent Schema Registry را با استفاده از Docker Compose راهاندازی کنید. برخلاف Kafka که میتواند به صورت مستقل و با استفاده از KRaft اجرا شود، Schema Registry به یک نمونه ZooKeeper برای عملکرد نیاز دارد.
به عنوان بخشی از پیشنیازها، شما Kafka را به عنوان یک سرویس systemd بر روی ماشین محلی خود مستقر کردهاید. در این مرحله، شما یک گره Kafka را از طریق Docker Compose مستقر خواهید کرد. ابتدا باید سرویس Kafka را با اجرای دستور زیر متوقف کنید:
sudo systemctl stop kafka
حال باید پیکربندی Docker Compose را در یک فایل به نام schema-registry-compose.yaml تعریف کنید. برای ایجاد و باز کردن آن برای ویرایش، دستور زیر را اجرا کنید:
nano schema-registry-compose.yaml
سطرهای زیر را اضافه کنید:
version: ‘3’
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
– zookeeper
ports:
– 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka-schema-registry:
image: confluentinc/cp-schema-registry
hostname: kafka-schema-registry
container_name: kafka-schema-registry
depends_on:
– zookeeper
– kafka
ports:
– “8081:8081”
environment:
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: ‘PLAINTEXT://kafka:29092’
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
در اینجا شما سه سرویس تعریف میکنید: zookeeper، kafka و kafka-schema-registry. تمامی این سه سرویس از جدیدترین تصاویر Docker که توسط Confluent ارائه شدهاند استفاده میکنند. سرویس ZooKeeper در پورت 2181 و Kafka در پورت 9092 در دسترس خواهد بود. برای Kafka، در قسمت environment آدرس ZooKeeper را پیکربندی کرده و یک listener اضافی در پورت 29092 تعریف میکنید که Schema Registry از آن برای اتصال مستقیم به Kafka استفاده خواهد کرد. شما مشخص میکنید که سرویس Kafka ابتدا باید منتظر بماند تا ZooKeeper شروع به کار کند.
سپس، سرویس kafka-schema-registry را در پورت 8081 در دسترس قرار میدهید و آدرس اتصال به سرویس Kafka را در قسمت environment مشخص میکنید. همچنین تعیین میکنید که Schema Registry فقط زمانی شروع به کار کند که ZooKeeper و Kafka به طور کامل راهاندازی شده باشند.
فایل را ذخیره کرده و آن را ببندید، سپس دستور زیر را برای راهاندازی سرویسها در پسزمینه اجرا کنید:
docker-compose -f schema-registry-compose.yaml up -d
خروجی در انتهای اجرای دستور مشابه خطوط زیر خواهد بود:
Creating root_zookeeper_1 … done
Creating root_kafka_1 … done
Creating kafka-schema-registry … done
شما میتوانید با استفاده از دستور زیر، کانتینرهای در حال اجرا را مشاهده کنید:
docker ps
خروجی مشابه به این خواهد بود:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6783568a74c8 confluentinc/cp-schema-registry “/etc/confluent/dock…” 19 seconds ago Up 17 seconds 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp kafka-schema-registry
6367df4b55f7 confluentinc/cp-kafka:latest “/etc/confluent/dock…” 19 seconds ago Up 18 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp root_kafka_1
a5f5b09984e0 confluentinc/cp-zookeeper:latest “/etc/confluent/dock…” 19 seconds ago Up 19 seconds 2181/tcp, 2888/tcp, 3888/tcp root_zookeeper_1
در این مرحله، شما یک نمونه از Schema Registry را همراه با ZooKeeper و Kafka از طریق Docker Compose مستقر کردهاید. حالا شما یاد خواهید گرفت که چگونه در پروژه جاوای خود از Avro schemas استفاده کنید.
مرحله 2 – استفاده از Avro Schemas
در این بخش، شما به پروژه خود Avro را اضافه خواهید کرد و وابستگیهای مربوطه را اضافه خواهید کرد. شما یاد خواهید گرفت که چگونه Avro schemas را تعریف کرده و کلاسهای جاوا را برای انواع تعریفشده به طور خودکار ایجاد کنید. سپس، schema خود را به Schema Registry اضافه خواهید کرد.
اضافه کردن Avro به پروژه شما
ابتدا باید وابستگی org.apache.avro را اضافه کنید. به صفحه مخزن Maven برای کلاینت جاوا در مرورگر خود بروید و آخرین نسخه موجود را انتخاب کرده، سپس قطعه XML ارائه شده برای Maven را کپی کنید. در زمان نگارش این مقاله، آخرین نسخه کتابخانه کلاینت جاوا 1.11.3 بود.
وابستگیها در فایل pom.xml که در ریشه پروژه شما قرار دارد اضافه میشوند. آن را برای ویرایش با اجرای دستور زیر باز کنید:
nano pom.xml
بخش <dependencies> را پیدا کرده و تعریف وابستگی را اضافه کنید:
…
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
…
این کار Avro را در دسترس پروژه شما قرار میدهد. با این حال، برای این که بتوانید کلاسهای جاوا را از Avro schemas تولید کنید، باید افزونه avro-maven-plugin را نیز از مخزن Maven به همان روش اضافه کنید.
پس از تعریف وابستگیها، باید اطمینان حاصل کنید که پلاگین منابع را تولید میکند. بخش <build> فایل pom.xml را پیدا کرده و خطوط زیر را اضافه کنید:
…
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.3</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/java/com/dokafka/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
…
در اینجا، شما افزونه avro-maven-plugin را برای تولید منابع جاوا بر اساس schemas که در /src/main/java/com/dokafka قرار دارند و قرار دادن آنها در /src/main/java پیکربندی میکنید. توجه داشته باشید که نباید بخش <plugins> موجود در زیر <pluginManagement> را تغییر دهید.
بعد از انجام این مراحل، فایل را ذخیره کرده و آن را ببندید.
برای بررسی اینکه همهچیز به درستی پیکربندی شده است، پروژه را بسازید:
mvn package
خروجی در انتهای اجرا مشابه این خواهد بود:
[INFO] ————————————————————————
[INFO] BUILD SUCCESS
[INFO] ————————————————————————
[INFO] Total time: 5.759 s
[INFO] Finished at: 2024-04-01T13:38:31Z
[INFO] ————————————————————————
تعریف Avro Schemas
در حال حاضر، شما یک schema به نام TempMeasurement ایجاد خواهید کرد که یک اندازهگیری دما در یک نقطه زمانی را توصیف میکند. شما آن را در کنار کلاسهای ProducerDemo و ConsumerDemo که به عنوان بخشی از پیشنیازها ایجاد کردهاید، ذخیره خواهید کرد. برای ایجاد و ویرایش آن دستور زیر را اجرا کنید:
nano src/main/java/com/dokafka/TempMeasurement.avsc
سطرهای زیر را اضافه کنید:
{
“namespace”: “com.dokafka”,
“name”: “TempMeasurement”,
“type”: “record”,
“fields”: [
{
“name”: “measuredValue”,
“type”: “double”
},
{
“name”: “measurerName”,
“type”: “string”
}
]
}
فایلهای Avro schema به صورت JSON نوشته میشوند و پسوند فایل آنها .avsc است. ابتدا شما namespace schema را مشخص میکنید، که همچنین namespace کلاسهای جاوای تولید شده خواهد بود. سپس نام آن را به TempMeasurement تنظیم کرده و نوع schema را به record تعیین میکنید، که نشاندهنده یک شیء Avro است.
سپس، شما فیلدهای schema خود را که شامل measuredValue و measurerName از نوعهای double و string به ترتیب هستند، مشخص میکنید. Avro از انواع دیگری مانند int، long، float، boolean و bytes نیز پشتیبانی میکند.
فایل را ذخیره کرده و آن را ببندید، سپس پروژه را بسازید:
mvn package
حال، فایلها را زیر دایرکتوری src/main/java/com/dokafka لیست کنید:
ls src/main/java/com/dokafka
شما مشاهده خواهید کرد که یک کلاس TempMeasurement ایجاد شده است:
ConsumerDemo.java ProducerDemo.java TempMeasurement.avsc TempMeasurement.java
این کلاس کدی را برای ایجاد، سریالسازی و دِسریالسازی اشیاء TempMeasurement در خود دارد.
ذخیره Schema در Schema Registry
حالا که شما schema خود را تعریف کردهاید، میتوانید آن را به Schema Registry اضافه کنید با اجرای دستور زیر:
curl -X POST -H “Content-Type: application/vnd.schemaregistry.v1+json” \
-d “$(jq -R -s ‘{“schema”: .}’ < src/main/java/com/dokafka/TempMeasurement.avsc)” \
Schema Registry در آدرس http://localhost:8081 در دسترس است و از طریق HTTP قابل دسترسی است. در این دستور، ابتدا متد HTTP را به POST تنظیم میکنید همراه با Content-Type مناسب که Registry آن را قبول میکند.
شما بدنه درخواست را به -d میدهید و از jq برای بستهبندی محتوای schema در یک فیلد به نام schema استفاده میکنید، زیرا این فرمت است که Schema Registry آن را میپذیرد. در نهایت، درخواست را به subjects/TempMeasurement/versions هدایت میکنید، جایی که شما مشخص میکنید که schema جدید چگونه باید نامگذاری شود.
خروجی به صورت زیر خواهد بود:
{“id”:1}
Schema Registry درخواست را پذیرفته و به آن ID شماره 1 اختصاص داده است.
برای مشاهده تمام schemas موجود، دستور زیر را اجرا کنید:
شما تنها یک schema موجود خواهید دید:
[“TempMeasurement”]
در این مرحله، شما وابستگیهای لازم را به پروژه Maven خود اضافه کردهاید و کد خودکارسازی برای Avro schemas که تعریف میکنید را پیکربندی کردهاید. اکنون شما آماده هستید که دادهها را بر اساس schemas تولید و مصرف کنید.
مرحله 3 – تولید و مصرف دادهها طبق Schemas
Confluent یک کتابخانه Avro serializer برای Kafka به نام kafka-avro-serializer فراهم کرده است. در این مرحله، شما این کتابخانه را به پروژه خود اضافه خواهید کرد و producer و consumer خود را برای ارسال و دریافت اشیاء TempMeasurement پیکربندی میکنید.
افزودن وابستگیهای لازم
برای افزودن این کتابخانه به پروژه خود، به مخزن Maven بروید و تعریف XML برای آخرین نسخه موجود را کپی کنید (نسخهای که در زمان نگارش این مطلب بهروزترین بود، نسخه 7.6.0 بود). سپس، فایل pom.xml را برای ویرایش باز کنید:
nano pom.xml
از آنجایی که بسته kafka-avro-serializer در مخزن Maven Confluent میزبانی میشود، باید آن را با افزودن خطوط زیر تعریف کنید:
<repositories>
<repository>
<id>confluent</id>
</repository>
</repositories>
سپس، تعریف کتابخانه را به بخش <dependencies> اضافه کنید:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
برای اینکه مشتری Kafka بتواند با Schema Registry ارتباط برقرار کند، همچنین باید kafka-schema-registry-client را بهعنوان یک وابستگی اضافه کنید.
پس از انجام این تغییرات، فایل را ذخیره کرده و بسته را کامپایل کنید.
بهروزرسانی کلاسهای ProducerDemo و ConsumerDemo
بهروزرسانی ProducerDemo
کلاس ProducerDemo را برای اتصال به Schema Registry و تولید اشیاء از نوع TempMeasurement در یک topic بهروزرسانی کنید. ابتدا آن را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/ProducerDemo.java
کتابخانههای KafkaAvroSerializer و KafkaAvroSerializerConfig را وارد کنید:
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
سپس، بخش اول متد main را به صورت زیر تغییر دهید:
String topicName = “java_demo_avro”;
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, “http://localhost:8081“);
KafkaProducer<String, TempMeasurement> producer = new KafkaProducer<>(properties);
ProducerRecord<String, TempMeasurement> producerRecord =
new ProducerRecord<>(topicName, new TempMeasurement(Double.parseDouble(args[0]), args[1]));
در اینجا:
• نام topic را به java_demo_avro تنظیم میکنید. Kafka این topic را اگر قبلاً وجود نداشته باشد، ایجاد خواهد کرد.
• کلاس serializer برای مقدار را از StringSerializer به KafkaAvroSerializer تغییر میدهید و آدرس Schema Registry را با استفاده از SCHEMA_REGISTRY_URL_CONFIG مشخص میکنید.
• تعریف مقدار قبلی که از نوع String بود را به TempMeasurement تغییر میدهید و برای producerRecord یک نمونه جدید از TempMeasurement با دو پارامتر measurementValue و measurerName از آرگومانهای خط فرمان ایجاد میکنید.
پس از انجام این تغییرات، فایل را ذخیره کرده و آن را ببندید.
برای اینکه بتوانید دو پارامتر را به اسکریپت run-producer.sh ارسال کنید، آن را برای ویرایش باز کنید:
nano run-producer.sh
پارامتر زیر را به دستور اضافه کنید:
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo $1 $2
سپس فایل را ذخیره کرده و آن را ببندید. حالا میتوانید یک نمونه TempMeasurement تولید کنید با اجرای دستور زیر:
./run-producer.sh 100 sammy
در این دستور، 100 بهعنوان measurementValue و sammy بهعنوان measurerName ارسال میشود.
بهروزرسانی ConsumerDemo
برای دریافت شیء Avro که به تازگی تولید کردهاید، باید کلاس ConsumerDemo را بهروزرسانی کنید. ابتدا آن را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/ConsumerDemo.java
کتابخانههای KafkaAvroDeserializer و KafkaAvroDeserializerConfig را وارد کنید:
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
سپس، متد main را به صورت زیر تغییر دهید:
String topic = “java_demo_avro”;
String groupId = “group1”;
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”);
properties.setProperty(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, “http://localhost:8081“);
properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, “true”);
final KafkaConsumer<String, TempMeasurement> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
ConsumerRecords<String, TempMeasurement> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, TempMeasurement> record: records) {
log.info(String.format(“measuredValue: %s, measurerName: %s\n”,
record.value().getMeasuredValue(),
record.value().getMeasurerName()));
}
}
} catch (Exception e) {
log.error(“An error occurred”, e);
} finally {
consumer.close();
}
در اینجا:
• topic و آدرس Schema Registry را بهروزرسانی میکنید.
• deserializer را به KafkaAvroDeserializer تغییر میدهید.
• با تنظیم SPECIFIC_AVRO_READER_CONFIG به true، شما به deserializer میگویید که اشیاء واقعی از نوع TempMeasurement برگرداند. در غیر این صورت، یک GenericRecord Avro که هنوز شامل تمام فیلدها است، اما نوعگذاری مشخص ندارد، برگردانده خواهد شد.
حالا میتوانید دادههای Avro تولید شده را دریافت کنید.
سپس، نوع مقدار TempMeasurement را در سراسر کد باقیمانده انتشار میدهید. در داخل حلقه for، فراخوانی متد logging را بهروزرسانی میکنید تا measurementTemp و measurementValue را بهدرستی نمایش دهد.
با توجه به یکپارچگی با Schema Registry، تولیدکننده (producer) شیء را همراه با شناساییکننده (identifier) اسکیمای آن به topic ارسال میکند، نه اینکه خود اسکیمای شیء را ارسال کند. مصرفکننده (consumer) سپس آن اسکیمای کامل را از Schema Registry دریافت کرده و آن را deserializes میکند.
پس از انجام تغییرات، فایل را ذخیره کرده و آن را ببندید. سپس مصرفکننده را اجرا کنید:
./run-consumer.sh
خروجی آخر مشابه این خواهد بود:
[main] INFO com.dokafka.ConsumerDemo – measuredValue: 100.0, measurerName: sammy
این نشاندهنده این است که مصرفکننده Kafka پیام Avro را بهدرستی deserialized کرده است، همانطور که از پیام log مشخص است.
در این مرحله، شما کلاسهای تولیدکننده و مصرفکننده خود را بهروزرسانی کردید تا از اشیاء Avro استفاده کنند. در مرحله بعد، شما یاد خواهید گرفت که چگونه اسکیمای خود را بهروزرسانی کرده و سازگاری آن را با Schema Registry پیگیری کنید.
مرحله ۴ – تکامل اسکیمای داده و سازگاری
در این مرحله، شما یاد خواهید گرفت که چگونه اسکیمای موجود را بهروزرسانی کنید و این تغییرات چگونه بر سازگاری با نسخهها و مشتریان موجود تأثیر میگذارند.
علاوه بر ذخیره و نسخهبندی اسکیمای دادهها از طریق زمان، Schema Registry برای فعالسازی تکامل اسکیمای داده بسیار مهم است. اسکیمای داده میتواند در طول عمر یک پروژه تغییر کند، در حالی که دادههای تولیدشده قبلاً نمیتوانند تغییر کنند.
Schema Registry مسئول مدیریت سازگاری بین نسخههای مختلف اسکیمای داده است و به مصرفکننده اجازه میدهد تا دادهها را مطابق با نسخه داخلی اسکیمای خود تجزیه کند. این قابلیت به تولیدکنندگان و مصرفکنندگان اجازه میدهد که در ارتباط با نسخه دقیق اسکیمای خود همگام نباشند، زیرا آنها میتوانند در کدهای مختلف قرار داشته باشند.
استراتژیهای اصلی سازگاری که Schema Registry ارائه میدهد عبارتند از:
• BACKWARD: اطمینان میدهد که مصرفکنندگان با استفاده از اسکیمای جدید میتوانند دادههایی را که بر اساس نسخه قبلی ایجاد شدهاند، بخوانند.
• FORWARD: به این معنی که مصرفکنندگان با استفاده از اسکیمای جدید میتوانند دادههایی را که بر اساس اسکیمای جدید ایجاد شدهاند، بخوانند (بدون تضمین برای نسخههای قبلی).
• FULL: ترکیبی از دو استراتژی قبلی است.
• NONE: به این معنی که بررسیهای سازگاری غیرفعال هستند.
سه استراتژی اول دارای همتایان ترانزیتی (مثل BACKWARD_TRANSITIVE) هستند که ملزم میکند اسکیمای جدید با تمام نسخههای قبلی اسکیمای داده سازگار باشد، نه فقط نسخه قبلی.
استراتژی پیشفرض BACKWARD است.
بهروزرسانی اسکیمای TempMeasurement
در این مرحله، شما اسکیمای TempMeasurement را بهروزرسانی میکنید تا یک فیلد جدید به نام measurementDate برای ذخیره تاریخ اندازهگیری اضافه شود.
۱. فایل اسکیمای TempMeasurement را برای ویرایش باز کنید:
nano src/main/java/com/dokafka/TempMeasurement.avsc
۲. آن را به شکل زیر تغییر دهید:
{
“namespace”: “com.dokafka”,
“name”: “TempMeasurement”,
“type”: “record”,
“fields”: [
{
“name”: “measuredValue”,
“type”: “double”
},
{
“name”: “measurerName”,
“type”: “string”
},
{
“name”: “measurementDate”,
“type”: “string”
}
]
}
۳. شما یک فیلد جدید به نام measurementDate تعریف کردهاید که تاریخ اندازهگیری را بهصورت متنی ذخیره خواهد کرد. فایل را ذخیره و بسته کنید.
۴. برای ایجاد نسخه جدید از اسکیمای بهروزرسانیشده در Schema Registry، دستور زیر را اجرا کنید:
curl -X POST -H “Content-Type: application/vnd.schemaregistry.v1+json” \
-d “$(jq -R -s ‘{“schema”: .}’ < src/main/java/com/dokafka/TempMeasurement.avsc)” \
۵. در این مرحله، خطای زیر به شما نمایش داده خواهد شد:
{
“error_code”: 409,
“message”: “Schema being registered is incompatible with an earlier schema for subject \”TempMeasurement\”, details: [{errorType:’READER_FIELD_MISSING_DEFAULT_VALUE’, description:’The field ‘measurementDate’ at path ‘/fields/2’ in the new schema has no default value and is missing in the old schema’, additionalInfo:’measurementDate’}, {oldSchemaVersion: 1}, {oldSchema: ‘{\”type\”:\”record\”,\”name\”:\”TempMeasurement\”,\”namespace\”:\”com.dokafka\”,\”fields\”:[{\”name\”:\”measuredValue\”,\”type\”:\”double\”},{\”name\”:\”measurerName\”,\”type\”:\”string\”}]}’}, {validateFields: ‘false’, compatibility: ‘BACKWARD’}]”
}
این خطا نشان میدهد که اسکیمای جدید با نسخه قبلی سازگار نیست زیرا فیلد measurementDate جدید است و مقداری پیشفرض ندارد. این رفتار باعث میشود که مصرفکنندگان با اسکیمای جدید نتوانند دادههایی را که با اسکیمای قبلی ایجاد شدهاند، بخوانند.
۶. برای رفع این مشکل، فایل را دوباره باز کنید و خطی برای مقدار پیشفرض به فیلد measurementDate اضافه کنید:
{
“namespace”: “com.dokafka”,
“name”: “TempMeasurement”,
“type”: “record”,
“fields”: [
{
“name”: “measuredValue”,
“type”: “double”
},
{
“name”: “measurerName”,
“type”: “string”
},
{
“name”: “measurementDate”,
“type”: “string”,
“default”: “”
}
]
}
۷. پس از ذخیره و بستن فایل، دوباره دستور ارسال اسکیمای جدید را اجرا کنید. خروجی بهصورت زیر خواهد بود:
{“id”:2}
Schema Registry نسخه دوم اسکیمای داده را پذیرفته و شناسه ۲ را به آن اختصاص داده است.
۸. برای دریافت نسخه اول، دستور زیر را اجرا کنید:
۹. خروجی بهصورت زیر خواهد بود:
{
“subject”: “TempMeasurement”,
“version”: 1,
“id”: 1,
“schema”: “{\”type\”:\”record\”,\”name\”:\”TempMeasurement\”,\”namespace\”:\”com.dokafka\”,\”fields\”:[{\”name\”:\”measuredValue\”,\”type\”:\”double\”},{\”name\”:\”measurerName\”,\”type\”:\”string\”}]}”
}
۱۰. برای مشاهده همه نسخهها، دستور زیر را اجرا کنید:
خروجی بهصورت زیر خواهد بود:
[1, 2]
تغییر استراتژی سازگاری
برای تغییر استراتژی سازگاری، دستور زیر را اجرا کنید:
curl -X PUT -H “Content-Type: application/vnd.schemaregistry.v1+json” \
–data ‘{“compatibility”: “BACKWARD_TRANSITIVE”}’ \
خروجی بهصورت زیر خواهد بود:
{“compatibility”:”BACKWARD_TRANSITIVE”}
پاکسازی منابع Docker Compose
برای از بین بردن منابع Docker Compose، دستور زیر را اجرا کنید:
docker-compose -f schema-registry-compose.yaml down
در این مرحله، شما اسکیمای TempMeasurement را مطابق با استراتژی سازگاری BACKWARD بهروزرسانی کرده و آن را در Schema Registry منتشر کردید.
نتیجهگیری
در این مقاله، شما کلاسهای ProducerDemo و ConsumerDemo خود را گسترش دادید تا اشیای TempMeasurement را که با استفاده از Apache Avro سریالیزه شدهاند، تولید و مصرف کنید. همچنین یاد گرفتید که چگونه از Schema Registry برای ذخیرهسازی و تکامل اسکیمای داده استفاده کنید و مشتریان Kafka خود را به آن متصل کنید.
برای امتیاز به این نوشته کلیک کنید!
[کل: 0 میانگین: 0]
نظرات کاربران