그 중 다음 두 강의에 대한 내용이다. 강의 끝의 핸즈온도 설명한다:
•
Working with Schema Formats
•
Managing Schemas
•
Integrating Schema Registry with Client Applications
Working with Schema Formats
스키마 형식인 아브로와 프로토버프에 대해서 좀 더 설명한다. 앞선 강의의 스키마 예제(purchase)를 이어서 설명한다.
프로토콜 버퍼
// src/main/proto/purchase.proto
syntax = "proto3";
package io.confluent.developer.proto;
option java_outer_classname = "PurchaseProto";
message Purchase {
string item = 1;
double total_cost = 2;
string customer_id = 3;
}
Protobuf
복사
메시지의 필드는 <type> <name> = <uniq_id> 로 정의한다. 스칼라 타입은 다음과 같다:
•
string
•
double
•
float
•
int32(자바의 int)
•
int64(자바의 long)
•
bool
스칼라보다 복잡한 필드 타입으로써 콜렉션이 있다:
message Purchase {
string item = 1;
double total_cost = 2;
string customer_id = 3;
repeated string coupon_codes = 4;
map<string, string> item_options = 5;
}
Protobuf
복사
•
repeated : 자바에서 타입에 대한 list로 변환된다.
•
map : 키는 string , int32 , int64 값은 다른 맵을 제외한 모든 타입이 가능하다.
그리고 enumeration도 지원한다:
message Purchase {
string item = 1;
double total_cost = 2;
string customer_id = 3;
repeated string coupon_codes = 4;
map<string, string> item_options = 5;
enum PurchaseType {
RETAIL = 0;
WHOLESALE = 1;
}
PurchaseType type = 6;
}
Protobuf
복사
첫번째 값은, 기본 값이기도 한, 항상 0이어야 한다.
다른 프로토버프 메시지를 가져와(import) 메시지 필드로 사용할 수 있디:
import "purchase.proto"
import "page_view.proto"
message CustomerEvent {
Purchase purchase = 1;
PageView page_view = 2;
}
Protobuf
복사
임의의 필드 값 중 하나인 스키마도 지원한다:
import "purchase.proto"
import "page_view.proto"
message CustomerEvent {
oneof customer_action {
Purchase purchase = 1;
PageView page_view = 2;
}
}
Protobuf
복사
customer_action 필드는 Purchase 와 PageView 중 하나가 된다. 내부 동작은 <필드>Case의 이름의 enum을 만들고 각 값 이름을 대문자로 변환한 값을 사용한다. 여기선 CustomerActionCase enum 안에 PURCHASE 와 PAGE_VIEW 가 있게 된다.
각 타입마다 기본 값이 있다. 메시지 직렬화 시 필드 값이 없다면 지정된다. 또 기본 값을 명시적으로 쓰더라도 값이 실제로 직렬화 되지 않는다:
•
string : ""
•
double : 0.0
•
repeated : []
•
map : {}
•
numbers(int32 , int64): 0
•
bool : false
아브로
{
"type": "record",
"namespace": "io.confluent.developer.avro",
"name": "Purchase",
"fields": [
{
"name": "item",
"type": "string"
},
{
"name": "total_cost",
"type": "double"
},
{
"name": "customer_id",
"type": "string"
},
{
"name": "item_options",
"type": {
"type": "map",
"values": "double"
}
}
]
}
JSON
복사
타입의 중첩 타입 맵으로 정의한다. 키는 암묵적으로 문자열이다.
{
"type": "record",
"namespace": "io.confluent.developer.avro",
"name": "Purchase",
"fields": [
{
"name": "category",
"type": {
"type": "enum",
"name": "purchase_type",
"symbols": [
"RETAIL",
"WHOLESALE"
]
}
}
]
}
JSON
복사
비슷하게 enum 타입도 지원한다.
{
"type": "record",
"namespace": "io.confluent.developer.avro",
"name": "CustomerEvent",
"fields": [
{
"name": "purchase",
"type": "io.confluent.developer.avro.Purchase"
},
{
"name": "page_view",
"type": "io.confluent.developer.avro.PageView"
},
{
"name": "id",
"type": "string"
}
]
}
JSON
복사
필드 타입은 레코드가 될 수도 있다. 프로토버프의 import 와 비슷한, 레코드 참조 기능이 있다. 필드 타입에 다른 레코드의 FQN(fully qualified name)을 적어 준다. 또는 중첩 json으로 레코드를 정의하여 가능하다.
자바 객체로써 사용
// Builder purchaseBuilder = Purchase.newBuilder();
Purchase getPurchaseObject(Builder purchaseBuilder) {
purchaseBuilder.clear();
purchaseBuilder.setCustomerId("vandelay")
.setTotalCost(random.nextDouble() * random.nextInt(100))
.setItem(items.get(random.nextInt(3)));
return purchaseBuilder.build();
}
Java
복사
아브로와 프로토버프 모두 빌더 패턴으로 객체를 생성할 수 있다.
// avro only
purchase.setTotalCost(100.00)
// protobuf and avro
Purchase.newBuilder(purchase)
.setTotalCost(100.00)
.build()
Java
복사
객체의 필드 변경 시 아브로는 세터 메소드를 지원한다. 하지만 프로토버프는 빌더를 통해 갱신해야한다. 프로트버프의 빌더가 반환하는 객체는 불변이기 때문이다. 따라서 갱신한 객체는 새로 생성된 객체이다.
Managing Schemas
스키마 아이디와 버전
레지스트리에 스키마를 등록할 때 제목(subject)과 스키마가 필요하다. subject 관련해선 다음 포스팅에서 설명한다. subject는 고유 ID와 버전 번호로 등록된다.
스키마 등록
스키마 레지스트리에 스키마를 등록하는 방법을 세 가지로 설명한다:
•
CLI confluent
•
REST API(curl 사용하여)
•
Gradle 플러그인
CLI confluent
$ confluent schema-registry schema create \
--subject purchases-vaule \
--schema src/main/proto/purchase.proto \
--type PROTOBUF
Shell
복사
confluent schema-registry schema create -h
타입의 기본 값은 아브로이다.
REST API
# avro
$ jq '. | {schema: tojson}' src/main/avro/purchase.avsc | \
curl -u <API_KEY:API_SECRET> \
-X POST https://<CLUSTER_HOST>/subjects/purchases-value/versions
-H "Content-Type:application/json"
-d @-
Shell
복사
#!/bin/bash
# protoJsonFmt.sh
PROTO_JSON=$(awk '{gsub(/\n/,"\\\n"); gsub(/"/, "\\\"");print}', $1) \
&& SCHEMA="{\"schemaType\":\"PROTOBUF\",\"schema\":\"${PROTO_JSON}\"\n}" \
&& echo ${SCHEMA}
Shell
복사
$ ./protoJsonFmt.sh src/main/proto/purchase.proto | \
curl -u <API_KEY:API_SECRET> \
-X POST https://<CLUSTER_HOST>/subjects/purchases-value/versions
-H "Content-Type:application/json"
-d @-
Shell
복사
아브로의 avsc 파일은 json이지만, 프로토버프 파일은 이를 json 형식으로 바꿔주기 위한 스크립트를 사용한다.
Gradle 플러그인
// build.gradle
plugins {
...
id "com.github.imflog.kafka-schema-registry-gradle-plugin" version "1.6.0"
...
}
schemaRegistry{
...
register {
subject('avro-purchase-value', 'src/main/avro/purchase.avsc', 'AVRO')
subject('proto-purchase-value', 'src/main/proto/purchase.proto', 'PROTOBUF')
}
}
Groovy
복사
gradle이나 maven 플러그인에서 구성을 통해 등록할 수도 있다. subject, 스키마 파일 경로 그리고 스키마 형식을 써준다. ./gradlew registerSchemasTask 를 실행하여 등록한다.
(번외) 프로듀서에서 스키마 자동 등록
스키마 업데이트
변경된 스키마를 레지스트리에 업데이트 하는 것은 생성과 마찬가지로 등록해준다. 그러면 같은 id에 버전이 증가하게 된다. 단순히 업데이트가 아닌 진화를 위해선 상하위 버전 간의 호환성이 필요하다(다음 포스팅에서 설명한다).
스키마 확인과 다운로드
# CLI confluent
$ confluent schema-registry schema describe \
--subject <subject> \
--version <version> \ # defeault "latest"
# REST API
$ curl -u <API_KEY:API_SECRET> \
-s "https://<CLUSTER_HOST>/subjects/<subject>/versions/<version>" jq '.'
# gradle task
$ ./gradlew downloadSchemasTask
Shell
복사
위의 등록 방법마다 일치하게 스키마를 확인하는 또는 로컬에 다운 받는 방법이다.
Integrating Schema Registry with Client Applications
다음 카프카 클라이언트에서 스키마 레지스트리에 연결하고 구성하는 통합에 대해서 알아본다:
•
CLI confluent (vs. kafka CLI)
•
Kafka Java API 애플리케이션(Producer, Consumer)
•
(Kafka Streams 애플리케이션; 안 나온다)
•
ksqlDB
CLI confluent
$ confluent kafka topic produce <topic> \
--value-format protobuf \
--schema src/main/proto/purchase.proto \
--sr-endpoint https://<SR_URL> \
--sr-api-key <API_KEY> \
--sr-api-secret <API_SECRET> \
--cluster <cluster_id>
Shell
복사
(SR은 스키마 레지스트리이다)
CLI 프로듀서로 토픽에 메시지를 쓸 때 사용할 스키마와 스키마 레지스트리에 대한 옵션 플래그 사용법이다. 역시 --value-format 의 기본 값은 아브로이다. 연결한 스키마 레지스트리를 통해 직렬화 방법을 스키마 id와 버전 그리고 형식(프로트버프)으로 기록한다.
$ conflulent kafka topic consume <topic> \
--from-beginning \
--value-format protobuf
--sr-endpoint <SR_URL>
--sr-api-key <API_KEY> \
--sr-api-secret <API_SECRET> \
--cluster <cluster_id>
Shell
복사
CLI 컨슈머도 비슷하다. 스키마 레지스트리에만 연결하면 메시지의 정보를 통해 역직렬화 방식을 가져온다.
vs. kafk CLI
자바 카프카 클라이언트
// producer
// src/main/java/io/confluent/developer/ProducerApp.java
public class ProducerApp {
...
public void producePurchaseEvents() {
Builder purchaseBuilder = Purchase.newBuilder();
Properties properties = loadProperties();
Map<String, Object> producerConfigs = new HashMap<>();
properties.forEach((key, value) -> producerConfigs.put((String) key, value));
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
producerConfigs.put("schema.registry.url", properties.getProperty("schema.registry.url"));
producerConfigs.put("basic.auth.user.info", properties.getProperty("basic.auth.user.info"));
...
}
...
}
...
Java
복사
// consumer
// src/main/java/io/confluent/developer/ConsumerApp.java
public class ConsumerApp {
public void consumePurchaseEvents() {
Properties properties = loadProperties();
Map<String, Object> consumerConfigs = new HashMap<>();
properties.forEach((key, value) -> consumerConfigs.put((String) key, value));
...
consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
consumerConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Purchase.class);
consumerConfigs.put("schema.registry.url", properties.getProperty("schema.registry.url"));
consumerConfigs.put("basic.auth.user.info", properties.getProperty("basic.auth.user.info"));
...
}
...
}
...
Java
복사
모든 confluent.properties에 선언된 구성은 적용되지만(properties.forEach), 스키마 레지스트리 관련한 구성은 명시적으로 더 적어준다. 프로듀서와 컨슈머 공통으로 필요한 구성은 다음과 같다:
•
schema.registry.url
•
basic.auth.user.info
프로듀서에선 value.serializer(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)만 구성하면 스키마 subject 규칙에 맞게 레지스트리에서 스키마를 찾아 직렬화한다.
그리고 컨슈머에선, 여러 스키마 형식 중 프로트버프에 대해선, specific.protobuf.value.type(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE)를 구성해야 한다.
프로트버프는 예제처럼 특정 타입일 경우, Message(GeneratedMessageV3?)의 확장이거나 DynamicMessage의 확장이다.
아브로는 SpecificRecord와 GenericRecord의 확장이다.
json의 경우 특정 자바 클래스이거나 JsonNode의 인스턴스이다.
(이건 내가 자바 코드를 정확히 볼줄 몰라 애매하다. 하지만 스키마 형식마다 특정 스키마에 맞춘 타입 클래스의 확장과 제너릭 타입에 대한 설명으로 이해했다.)
위 코드를 실행해보기 전에, 과거 실습으로 생성한 토픽이나 스키마가 있다면 깔끔하게 지우고 하자:
모든 토픽 삭제
모든 SR 스키마 삭제
./gradlew clean build 로 빌드 후 먼저 gradle 플러그인을 사용해서 스키마를 등록한다:
❯ ./gradlew registerSchemasTask
BUILD SUCCESSFUL in 1s
1 actionable task: 1 executed
❯ confluent schema-registry schema list
Schema ID | Subject | Version
------------+----------------------+----------
100007 | proto-purchase-value | 1
100008 | avro-purchase-value | 1
Java
복사
프로듀서가 이벤트를 만들 토픽을 만들어 준다. 프로듀서에 써 있듯 이름은 proto-purchase:
❯ confluent kafka topic create proto-purchase
Java
복사
프로듀서를 실행한다(./gradlew runProducer). proto-purchase-value 스키마를 사용해 메시지를 2개 생성한다:
❯ ./gradlew runProducer
> Task :runProducer
Producer now configured for using SchemaRegistry
Produced record at offset 0 with timestamp 1711120593238
Produced record at offset 1 with timestamp 1711120594110
Java
복사
이어서 컨슈머도 실행하여(./gradlew runConsumer) 스키마를 다운 받아 소비하는 것을 확인한다:
❯ ./gradlew runConsumer
> Task :runConsumer
Purchase details { Customer: vandelay, Total Cost: 17.707262, Item: sun-glasses }
Purchase details { Customer: vandelay, Total Cost: 0.191150, Item: t-shirt }
Java
복사
스키마 생명주기
1.
프로듀서가 레코드 직렬화 단계에 들어가면(serialize.serializer)
2.
serializer는 로컬 캐시에 스키마가 있는지 확인하고
3.
없다면 스키마 레지스트리에서 id를 사용해 검색하고 다운 받는다.
4.
이 스키마를 사용해 직렬화하고 레코드에 스키마 id를 적어서 보낸다.
컨슈머가 이 레코드를 소비할 땐 레코드의 스키마 id를 스키마 레지스트리에서 찾는다. 그리고 똑같이 로컬 캐시에 저장하고 사용한다. 프로듀서와 컨슈머 둘 다 스키마 id가 바뀌지 않는다면 계속 로컬의 스키마를 사용한다.
ksqlDB
CREATE STREAM add_item_to_cart (
cart_id BIGINT,
item_id BIGINT
)
WITH (
KAFKA_TOPIC = 'proto-purchase',
VALUE_FORMAT = 'PROTOBUF'
);
SQL
복사
ksqlDB는 스키마 레지스트리에서 자동으로 스키마를 찾는다. VALUE_FORMAT 만 맞게 적어주면 된다.
ksqlDB 클러스터를 만들고 쿼리해보자. ksqlDB 클러스터는 시간당 과금이 있으니 짧게 사용하고 내리는게 좋다. SCHEMA101 프로모션 코드를 사용하자. 위 쿼리가 성공하면 ADD_ITEM_CART 스트림이 생성된 것을 확인할 수 있다: