Search

ksqlDB 101

Introduction

ksqlDB는 컨플루언트에서 카프카를 저장소로 하는 앱을 만들기 위해 제공하는 서비스다.
전에도 말한거처럼 ksqlDB는 카프카 스트림즈의 일반화 그리고 클라우드 서비스화 된 버전으로 느껴진다. 따라서 스트림즈에서 입력 소스 스트림에 대해 처리하는, filter, join 그리고 (window) aggregation의 동작을 할 수 있다.
다만, 그걸 익숙한 sql 문법으로 제공한다고 한다(내 생각에 카프카는 탈 sql 또는 nosql? 같다고도 느꼈지만). 그러나 표준 sql이라곤 할 수 없어 어차피 특수한 문법이나 키워드는 익혀야 한다.
강의 초반 컨플루언트 클라우드에서 ksqlDB 클러스터 구성에 대한 것은 생략한다. 이번에도 실습 비용을 위해 강의에 따라오는 무료 쿠폰, KSQLDB101을 사용하자. ksqlDB 클러스터는 CSU(confluent streaming units)마다 약 0.3$/h의 꽤 비싼 비용이 발생한다. 강의가 짧으니 빠르게 실습하고 내리자

Interacting

핸즈온 강의에 더 많은 내용이 있어서 이걸 임베드한다.

웹 콘솔

컨플루언트 클라우드 웹 콘솔에서 ksqlDB를 먼저 사용해 볼 것이다. 이 비디오에선 클러스터 대신 ksqlDB 애플리케이션이라는 표현을 쓰는데 현재는 바뀐 것 같고 비디오 강의가 최신화되지 않은걸로 보인다. 클러스터 생성, 실행(up) 후에 들어가 에디터(flow) 탭에서 다음 쿼리를 실행하자:
CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR) WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements'); INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver'); INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds'); INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley'); INSERT INTO MOVEMENTS VALUES ('Allison', 'Boulder');
SQL
복사
길게 설명하지 않아도 다른 rdb의 sql이 익숙하다면 이해할 수 있을 것이다. 다만 여기서 만드는 것이 테이블과 레코드가 아니라 스트림과 이벤트라는 점이 다르다.
에디터 탭 우측 바에도 생성된 스트림이 보인다. 쿼리로 만든 MOVEMENTS 외에도 ksqlDB 클러스터 메타 스트림에 해당하는 KSQL_PROCESSING_LOG 도 보인다. 에디터 우측의 나머지 탭들도 구경해보자. connect 강의에서 살펴본 stream lineage에 이어 플로우(flow) 탭을 보면 스트림을 시각화하여 보여준다. 컨플루언트의 이런 ui 제공은 좋은거 같다.

CLI

ksqlDB CLI 클라이언트를 사용하여 쿼리해본다. 먼저 컨플루언트 클라우드에 만든 ksqlDB 클러스터에 접근하기 위한 인증 과정이 필요하다. 이는 컨플루언트 CLI로 한다. 그 명령들은 위 ksqlDB 클러스터 콘솔 가장 우측 탭 Confluent CLI에 있다:
# 1. Log in to the Confluent CLI, and set the context to your environment and cluster. $ confluent login $ confluent environment use env-xxxxxx $ confluent kafka cluster use lkc-yyyyy # 2. Create an API key and secret for your ksqlDB cluster. $ confluent api-key create --resource lksqlc-zzzzzz
Shell
복사
여기서 컨플루언트 클라우드 리소스의 위계가 env > (kafka) cluster(lkc-yyyyy) > ksqlDB cluster(lksqlc-zzzzzz) 있는 것을 알 수 있다. 마지막 명령 실행 후 나오는 api 키와 시크릿을 기록해 둔다. 이건 카프카 클러스터가 아닌 ksqlDB 클러스터에 대한 것이다:
$ KSQLDB_API_KEY=... $ KSQLDB_API_SECRET=...
Shell
복사
그리고 ksqlDB CLI를 이용해 접속한다. ksqlDB CLI는 설치나 실행법이 어디에도 아주 정확히 나와 있지 않다:
docker와 tarball을 이용해서 둘 다 써봤는데, docker 이미지 태그는 앞에 confluent/ksqldb-cli 라고 레포지토리를 써주어야 한다:
$ docker run -it confluentinc/ksqldb-cli:0.29.0 ksql \ -u $KSQLDB_API_KEY \ -p $KSQLDB_API_SECRET \ https://<ksqldb-cluster-uid>.<region>.<csp>.confluent.cloud:443
Shell
복사
엔드포인트의 경우 위에서 본 Confluent CLI 탭에 완성되어 있다. 컨플루언트가 뒷단에서 사용하는 클라우드 서비스의 엔드포인트를 알려준다.
tarball 설치의 경우 패키지 설치가 아니라 경로나 권한을 설정해주어야 하고 자바 런타임이 있어야 실행 가능하다(바이너리가 아닌가?). 오픈 소스 카프카도 그러진 않는데… 컨플루언트가 이런 부분은 많이 아쉽다
sql> show streams; Stream Name | Kafka Topic | Key Format | Value Format | Windowed ---------------------------------------------------------------------------------------- KSQL_PROCESSING_LOG | xxxxxx-yyyyyy-processing-log | KAFKA | JSON | false MOVEMENTS | movements | KAFKA | JSON | false ----------------------------------------------------------------------------------------
Shell
복사
ksql> select * from movements emit changes; +------------------------------------------+------------------------------------------+ |PERSON |LOCATION | +------------------------------------------+------------------------------------------+ Press CTRL-C to interrupt
Shell
복사
우선 스트림을 테이블에 대응하면 rdb에서 sql 사용과 비슷하게 이해할 수 있다. 콘솔과 마찬가지로 스트림을 확인할 수 있다.
select 문 뒤의 emit changes는 스트림에 들어오는 이벤트마다 결과에 반영해 출력한다. 이 쿼리를 실행하는 동안 새 이벤트가 발생한다면 결과에 출력된다.
ksql> CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS > SELECT PERSON, > LATEST_BY_OFFSET(LOCATION) AS LATEST_LOCATION, > COUNT(*) AS LOCATION_CHANGES, > COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS > FROM MOVEMENTS >GROUP BY PERSON >EMIT CHANGES; Message ------------------------------------------- Created query with ID CTAS_PERSON_STATS_3 ------------------------------------------- ksql> show tables; Table Name | Kafka Topic | Key Format | Value Format | Windowed --------------------------------------------------------------------------------- PERSON_STATS | xxxxxx-yyyyyyPERSON_STATS | KAFKA | AVRO | false ---------------------------------------------------------------------------------
Shell
복사
강의에 있는 쿼리로 테이블을 생성한다. 여기서 테이블은 스트림에 대한 구체화 뷰로써 카프카 스트림즈의 KTable과 비슷하게 이해하면 되겠다.
(이어지는 모듈에서도) REST API 관련 설명은 생략한다.
몇몇 언어의 라이브러리로써도 지원한다. 자바는 컨플루언트에서 공식적으로 파이썬(너무 업데이트가 없다)과 는 커뮤니티 지원이다. 뭔가 신뢰는 안간다.

Creating, Exporting, and Importing Data

ksqlDB에 데이터를 생성하는 방법은 앞서 살펴 본 몇몇 클라이언트를 통해 CREATE STREAM/TABLE 쿼리로 스트림이나 테이블을 만들고, INSERT 쿼리로 이벤트 데이터를 채울 수 있다.

카프카 토픽과 연동

ksqlDB의 스트림과 테이블은 카프카 토픽이 뒷단에 있게 된다(backed by the topic):
따라서 둘 다 생성 시 WITH 절에 KAFKA_TOPIC은 필수 프로퍼티다. 만약 프로퍼티의 토픽이 없다면 ksqlDB에서 자동으로 만든다(앞서 만드는 순서를 생각하면 ksqlDB 클러스터가 카프카 클러스터에 종속적인 리소스이다).

다른 시스템에서 ksqlDB로 데이터 연동

카프카에선 커넥트 클러스터의 커넥터를 만들어 다른 시스템에 데이터를 보내거나 거기서 데이터를 스트림이나 테이블로 가져올 수 있다. CREATE CONNECTOR 문이 있고 WITH 절 안의 프로퍼티대로 플러그인 커넥터를 만들 수 있다:
다음은 mongodb에서 cdc를 가져오는 소스 커넥터와 스트림 이벤트를 엘라스틱서치 인덱스에 보내는 싱크 커넥터를 만드는 쿼리 예시이다:
CREATE SOURCE CONNECTOR SOURCE_MONGODB_UNIFI_01 WITH ( 'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector', 'mongodb.hosts' = 'rs0/mongodb:27017', 'mongodb.name' = 'unifi', 'collection.whitelist' = 'ace.device, ace.user' ); CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH ( 'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', 'topics' = 'ORDERS_ENRICHED', 'Connection.url' = 'http://elasticsearch:9200', 'type.name' = '_doc' );
SQL
복사

Filtering

sql의 WHERE절을 알고 있다면 정말 쉽다. 정확히 그것에 대한 이야기이다. 그림처럼 주소가 여러 주에 속하는 이벤트를 뉴욕(NY)에 대해서만 걸러 스트림을 만드는 예제를 할 것이다.
먼저 원본 orders 스트림을 만든다:
CREATE stream orders (id INTEGER KEY, item VARCHAR, address STRUCT < city VARCHAR, state VARCHAR >) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='json', partitions=1); INSERT INTO orders(id, item, address) VALUES(140, 'Mauve Widget', STRUCT(city:='Ithaca', state:='NY')); INSERT INTO orders(id, item, address) VALUES(141, 'Teal Widget', STRUCT(city:='Dallas', state:='TX')); INSERT INTO orders(id, item, address) VALUES(142, 'Violet Widget', STRUCT(city:='Pasadena', state:='CA')); INSERT INTO orders(id, item, address) VALUES(143, 'Purple Widget', STRUCT(city:='Yonkers', state:='NY')); INSERT INTO orders(id, item, address) VALUES(144, 'Tan Widget', STRUCT(city:='Amarillo', state:='TX'));
SQL
복사
STRUCT라는 키워드가 생소했는데, 표준 sql에선 중첩 객체 구조로써 지원한다. 이 스트림으로 부터 조건에 맞는 ny_orders 스트림을 만든다:
CREATE STREAM ny_orders AS SELECT * FROM ORDERS WHERE ADDRESS->STATE='NY' EMIT CHANGES;
SQL
복사
구조체 하위 필드는 화살표 연산자(->)로 접근한다. 이 스트림을 확인하면 주소의 주(state)가 모두 뉴욕이다:
SELECT * FROM ny_orders EMIT CHANGES;
SQL
복사

Lookups and Joins

이번에도 sql의 JOIN을 알면 쉽다. 다른 이벤트(스트림) 또는 데이터에서 참조키(외래키) 기반으로 데이터를 증강시킬 수 있다(enrich). ksqlDB에선 스트림 간 또는 테이블 간의 조인 뿐만 아니라 스트림-테이블의 조인도 가능하다.
핸즈온은 스트림-테이블 조인을 다룬다. 먼저 items 테이블을 만든다:
CREATE TABLE items (id VARCHAR PRIMARY KEY, make VARCHAR, model VARCHAR, unit_price DOUBLE) WITH (KAFKA_TOPIC='items', VALUE_FORMAT='avro', PARTITIONS=1); INSERT INTO items VALUES('item_3', 'Spalding', 'TF-150', 19.99); INSERT INTO items VALUES('item_4', 'Wilson', 'NCAA Replica', 29.99); INSERT INTO items VALUES('item_7', 'SKLZ', 'Control Training', 49.99);
SQL
복사
그리고 조인할 스트림 orders를 만든다. 전에 만든 스트림과 이름이 겹치므로 삭제 후 새로 만든다:
# cleanup DROP STREAM ny_orders; DROP STREAM orders; CREATE STREAM orders (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER) WITH (KAFKA_TOPIC='item_orders', VALUE_FORMAT='avro', PARTITIONS=1);
SQL
복사
그리고 두 테이블, 스트림을 조인하여 orders_enriched 스트림을 만든다. 조인키는 아이템의 id이고, left outer 조인으로 모든 orders 스트림 이벤트에 대한 조인 이벤트를 생성한다:
CREATE STREAM orders (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER) WITH (KAFKA_TOPIC='item_orders', VALUE_FORMAT='avro', PARTITIONS=1);
SQL
복사
이제 orders 이벤트를 생성하여 orders_enriched 스트림의 결과를 확인한다. 일반적인 rdb와 다른 점은 EMIT CHANGES로 실시간으로 생성되는 이벤트를 관찰할 수 있다는 것이다:
INSERT INTO orders VALUES (1620501334477, 65, 'item_7', 5); INSERT INTO orders VALUES (1620502553626, 67, 'item_3', 2); INSERT INTO orders VALUES (1620503110659, 68, 'item_7', 7); INSERT INTO orders VALUES (1620504934723, 70, 'item_4', 1); INSERT INTO orders VALUES (1620505321941, 74, 'item_7', 3); INSERT INTO orders VALUES (1620506437125, 72, 'item_7', 9); INSERT INTO orders VALUES (1620508354284, 73, 'item_3', 4); # results SELECT * FROM orders_enriched EMIT CHANGES;
SQL
복사

Transforming Data

앞선 filtering, join에 이은 삼대장 transforming이다. map 함수와 비슷하게 특정 이벤트 필드에 데이터를 변환해 새 스트림을 만드는 동작이다.
명시적으로 타입을 캐스팅하는 CAST 나 다른 스칼라 함수를 사용하여 필드를 변환할 수 있다. 강의에서 나온 TIMESTAMPTOSTRING 은 deprecated 되고 FORMAT_TIMESTAMP 를 사용하라고 한다. 전체 스칼라 함수도 참고하자:
또 명시적 타입 캐스팅이 아닌 묵시적 타입 coercion도 참고할만한다:
그림과 같은 예제 진행을 하기 위해 orders 스트림에 에폭시 밀리 타임스탬프 값으로 이벤트를 생성하고 사람이 읽기 쉬운 타임스탬프로 변환해보자. 이번에도 지난 실습에서 사용한 orders를 삭제 후 진행한다:
# cleanup DROP STREAM orders_enriched DELETE TOPIC; DROP STREAM orders DELETE TOPIC; CREATE STREAM orders (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER, address STRUCT < street VARCHAR, city VARCHAR, state VARCHAR>) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro', PARTITIONS=1);
SQL
복사
그리고 orders에 이벤트를 생성한다:
INSERT INTO orders VALUES (1620504934723, 70, 'item_5', 1, STRUCT(street:='210 West Veterans Drive', city:='Sacramento', state:='California Foo2')); INSERT INTO orders VALUES (16205059321941, 72, 'item_6', 9, STRUCT(street:='10043 Bella Vista Blvd', city:='Oakland', state:='California')); INSERT INTO orders VALUES (1620503083019, 77, 'item_7', 12, STRUCT(street:='10083 Garvey Ave', city:='Rosemead', state:='California'));
SQL
복사
타임스탬프 필드(ordertime)를 문자열(order_timestamp)로 변환하는 스트림 orders_no_address를 만든다. ordertime이 BIGINT이기 때문에 여기선 TIMESTAMPTOSTRING를 그대로 사용한다:
CREATE STREAM orders_no_address AS SELECT TIMESTAMPTOSTRING(ordertime, 'yyyy-MM-dd HH:mm:ss') AS order_timestamp, orderid, itemid, orderunits FROM orders EMIT CHANGES;
SQL
복사
결과를 확인하면 order_timestamp가 원하는 포맷으로 출력되어야 한다:
SELECT * FROM orders_no_address EMIT CHANGES; { "ORDER_TIMESTAMP": "2021-05-08 19:44:43", "ORDERID": 77, "ITEMID": "item_7", "ORDERUNITS": 12 } ...
SQL
복사

Flatten Nested Records

중첩 레코드(객체)의 flatten… 이라곤 하지만, 다른 rdb와 다르게 중첩이 가능한 필드 레코드를 다루는 ksqlDB에서 하위 필드에 접근하는 방법에 대한 설명이라고 보면 될거 같다.
앞서 본거처럼 STRUCT 필드 아래엔 다른 중첩 필드가 있을 수 있고 -> 를 통해 접근한다
STRUCT 외에 구조체 데이터 타입으로써 ARRAY와 MAP이 있다. 각각 스트림이나 테이블에서 필드를 선언할 때 하위 타입을 지정해줄 수 있다:
CREATE TABLE users (registertime BIGINT, userid VARCHAR, gender VARCHAR, regionid VARCHAR, interests ARRAY<STRING>, # ARRAY: 엔트리 타입 지정 contactinfo MAP<STRING,STRING>) # MAP: 키-값의 타입 지정 WITH (KAFKA_TOPIC = 'users', VALUE_FORMAT='JSON', KEY = 'userid');
SQL
복사
둘 다 익숙한 [<key>] 로써 하위 객체에 접근한다. 배열은 숫자 인덱스 맵은 키를 써주면 된다:
CREATE TABLE users_interest_and_contactinfo AS SELECT interests[0] AS first_interest, # 배열 접근 contactinfo['zipcode'] AS zipcode, # 맵 접근 1 contactinfo['city'] AS city, # 맵 접근 2 userid, gender, regionid FROM users EMIT CHANGES;
SQL
복사

Converting Data Formats

앞선 trasforming이 이벤트 필드의 타입에 대한 것이라면 여기서 말하는 converting은 아브로, json 같은 이벤트 자체의 데이터 형식 즉, 직렬화-역직렬화에 대한 것이다.
예제는 좀 특이하게? 아브로에서 csv, 구분자 있는(DELIMITED) 형식 변환을 예시로 든다. 이는 스트림을 만들 때 WITH 절의 프로퍼티 VALUE_FORMAT에 지정할 수 있다(키에도 가능하다 KEY_FORMAT).
모든 가능한 형식은 다음과 같다:
NONE
DELIMITED =~ csv, 콤마(,) 구분 나열 값
JSON, JSON_SR (스키마 레지스트리 유무)
AVRO
KAFKA
PROTOBUF , PROTOBUF_NOSR (스키마 레지스트리 유무)
NONE은 키에만 쓰이는 특별한 형식으로 값에 대해 역직렬화를 하지 마라는 마커이다.그 외는 형식에 따라 스키마 레지스트리 필요 여부와 스키마 추론, 단일 필드 wrapping/unwarpping 여부가 각각 다르다.

Merging Two Streams

말그대로 두 개 이상의 스트림의 이벤트를 하나의 스트림(=토픽)에 모으는 기능이다. INSERT INTO 쿼리를 같은 대상 스트림으로 서로 다른 스트림에서부터 각각 실행하면 된다:
먼저 order_uk와 order_us 스트림을 각각 만들고 이벤트를 생성한다:
# order_uk CREATE STREAM orders_uk (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER, address STRUCT< street VARCHAR, city VARCHAR, state VARCHAR>) WITH (KAFKA_TOPIC='orders_uk', VALUE_FORMAT='json', PARTITIONS=1); INSERT INTO orders_uk VALUES (1620501334477, 65, 'item_7', 5, STRUCT(street:='234 Thorpe Street', city:='York', state:='England')); INSERT INTO orders_uk VALUES (1620502553626, 67, 'item_3', 2, STRUCT(street:='2923 Alexandra Road', city:='Birmingham', state:='England')); INSERT INTO orders_uk VALUES (1620503110659, 68, 'item_7', 7, STRUCT(street:='536 Chancery Lane', city:='London', state:='England')); # order_us CREATE STREAM orders_us (ordertime BIGINT, orderid INTEGER, itemid VARCHAR, orderunits INTEGER, address STRUCT< street VARCHAR, city VARCHAR, state VARCHAR>) WITH (KAFKA_TOPIC='orders_us', VALUE_FORMAT='json', PARTITIONS=1); INSERT INTO orders_us VALUES (1620501334477, 65, 'item_7', 5, STRUCT(street:='6743 Lake Street', city:='Los Angeles', state:='California')); INSERT INTO orders_us VALUES (1620502553626, 67, 'item_3', 2, STRUCT(street:='2923 Maple Ave', city:='Mountain View', state:='California')); INSERT INTO orders_us VALUES (1620503110659, 68, 'item_7', 7, STRUCT(street:='1492 Wandering Way', city:='Berkley', state:='California'));
SQL
복사
그리고 두 스트림을 merge할 orders_combined를 만든다. 우선 orders_us에서 가져와 새로 스트림을 만들고, orders_uk는 INSERT INTO 로 합친다:
CREATE STREAM orders_combined AS SELECT 'US' AS source, ordertime, orderid, itemid, orderunits, address FROM orders_us; # merge INSERT INTO orders_combined SELECT 'UK' AS source, ordertime, orderid, itemid, orderunits, address FROM orders_uk;
SQL
복사
여기서 AS 앞의 US/UK는 source 필드의 값으로 추가하여 이벤트를 생성한다:
{ "SOURCE": "UK", # AS 에 의해 추가된 필드 "ORDERTIME": 1620503110659, "ORDERID": 68, "ITEMID": "item_7", "ORDERUNITS": 7, "ADDRESS": { "STREET": "536 Chancery Lane", "CITY": "London", "STATE": "England" } }
SQL
복사
Flow 탭에서 merge 되는 두 스트림을 확인할 수 있다:

Splitting Two Streams

아까 합친 스트림을 두 스트림으로 쪼개는 것이다. 더 간단하다. source 필드를 기준으로 필터하여(WHERE) 각각 스트림에 넣어준다. WHERE 를 쓴거니 꼭 source 필드일 필요도 없다:
CREATE STREAM us_orders AS SELECT * FROM orders_combined WHERE source = 'US'; CREATE STREAM us_orders AS SELECT * FROM orders_combined WHERE source = 'US';
SQL
복사

Streams and Tables

스트림과 테이블에 대한 설명이다. 카프카 스트림즈의 KStream과 KTable과 같은거라 보아도 무방하겠다. 스트림은 연속되는 이벤트 기록이고 테이블은 한 순간의 상태, 스트림의 로그 압축된 것이라고 할 수 있다. 영상의 예제가 같은 레코드 셋을 비교해서 더 나은거 같다.

Materialized View

카프카 스트림즈에서 윈도우에 대한 집계 함수를 사용한 것이 ksqlDB에선 GROUP BY 와 함께 쓰는 SUM, COUNT 같은 sql 집계 함수이다:
집계 함수를 쓰기 위해선 항상 GROUP BY 를 써야하며 여기에 원하는 필드(키)를 하나 이상 줄 수 있다. 또 카프카 스트림즈에서처럼 tumbling, hopping, session 등의 윈도우를 사용할 수도 있다:
집계 함수 즉, 상태 있는 집계(stateful aggregation) 쿼리의 결과는 항상 테이블, 구체화 뷰(materialized view)이다.

Push and Pull Queries

ksqlDB 쿼리는 크게 두 타입이 있다. push와 pull. 둘의 차이는 같은 select 쿼리라도 EMIT CHANGES 를 붙이면 앞서 본 것처럼 조건에 해당하는 새로운 이벤트가 계속 결과로 보인다. 쿼리 맞는 이벤트를 구독하는 것이다. 반면 pull 쿼리는 현재의 상태만 결과로 보여준다:
# push SELECT LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE PERSON = 'Allison' EMIT CHANGES; # pull SELECT LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE PERSON = 'Allison' EMIT CHANGES;
SQL
복사
문서에는 한가지 유형이 더 있다. persistent 쿼리는 서버 측에서 계속 실행되는 쿼리이다. push와 pull은 클라이언트 측 쿼리이다.

Under the Covers of ksqlDB

ksqlDB under the cover라지만 겉핥기 정도이다. sql 사용법만 봤으니 내부 구조를 짧게라도 살펴본다.
놀랍지도 않게, ksqlDB는 카프카 스트림즈를 사용하고 당연히 상태 저장소로써 RockDB를 쓴다고 한다. ksqlDB의 스트림과 테이블은 연결된 카프카 클러스터의 토픽을 뒷단으로 하여 이벤트를 생성, 소비한다.
클러스터로 구성하면 노드를 확장하여 파티션만큼 분산, 병렬처리가 가능하다. 쿼리의 실행은 카프카 스트림즈 토폴로지를 구성하여 스트림즈 앱을 배포하는 것과 비슷하다.
진짜 짧게 끝냈기 때문에 ksqlDB에 대해 더 깊게 볼 수 있는 읽기 자료를 첨부했다:

ksql-migrations

스트림이나 테이블을 생성하는 ksqlDB 쿼리를 이제 (카프카 스트림즈)애플리케이션이라고 부른다(빌드업이 좀… ). 이 애플리케이션의 변화에 대한 버저닝 방법, alter 쿼리에 대한 마이그레이션 방법을 소개한다.
ksql-migrations 라는 CLI 툴이 있다고 한다. 읭? 어떻게 설치하는지도 안알려준다. 그리고 딸랑 CLI 라니… 이걸 사용하면 처음에 만든 쿼리에 대한 변경(alter)를 적용할 수 있다는데, 서비스라고 보긴 어려울거 같고 정말 할 수 있다 정도 같다. 핸즈온도 없기 때문에 나도 예시만 적어두고 끄덕끄덕하고 끝낸다:
# v1 echo 'CREATE TABLE users ( \ name STRING, registration_date TIMESTAMP \ ) WITH (kafka_topic=’users’, value_format=’json’ \ );' > V000001__create_users.sql # v2 echo 'ALTER TABLE users ADD COLUMN \ userid INT;' > V000002__create_users.sql ksql-migrations -c config.properties info
SQL
복사

Lambda Functions

ksqlDB에 내장된 몇가지 사용자 정의 람다를 호출하는 함수가 있다(invocation functions). TRANSFORM , REDUCE , FILTER 이다:
이는 필드 타입 중 구조체 타입인 ARRAY와 MAP을 위해 만들어졌다. ARRAY는 요소, MAP은 키 값에 대해서 람다 연산 할 수 있어야 하므로 둘의 인터페이스는 다르다. 예를 들어 TRANSFORM 의 람다는 배열의 경우 요소에 대해 하나만 쓰지만:
TRANSFORM(lambda_arr, (e) => UCASE(e))
SQL
복사
맵은 키와 값에 대해 두 개를 쓴다:
TRANSFORM(lambda_map, (k, v) => UCASE(k), (k, v) => v + 5)
SQL
복사
각 람다 호출 함수에 대한 강의의 예시이다:
# transform CREATE STREAM transformed AS SELECT id, TRANSFORM(map_field,(k,v)=> UCASE(k), (k, v)=> (v * 2)) FROM stream1 EMIT CHANGES; # reduce CREATE STREAM reduced AS SELECT name, REDUCE(scores,0,(s,x)=> (s+x)) AS total FROM stream2 EMIT CHANGES; # filter CREATE STREAM filtered AS SELECT id, FILTER(numbers,x => (x%2 = 0)) AS even_numbers FROM stream3 EMIT CHANGES;
SQL
복사

Monitoring