1. The Problem – Why Build Another SQL Layer?
Apache Kafka is everywhere. It is the de-facto backbone for event-driven architectures, real-time pipelines, and microservice communication. But for all its power, Kafka's native interface is still produce bytes, consume bytes. If you want to filter, aggregate, join, or window a stream of events, you're writing a Kafka Streams or Flink application – a non-trivial amount of boilerplate, build configuration, and operational overhead for what is, conceptually, a one-liner in SQL.
Confluent's ksqlDB solved this beautifully: you write SELECT ... FROM stream WHERE ... EMIT CHANGES, and it materializes a continuously running Kafka Streams topology behind the scenes. But ksqlDB is tightly coupled to the Confluent ecosystem, carries a licensing model that doesn't fit every team, and is a heavyweight component to operate.
We asked ourselves: What if we built a lightweight, open-source, ksqlDB-compatible SQL engine that runs against vanilla Apache Kafka and Amazon MSK – as a single JAR?
That question led to KafkaSQL.
A single java -jar kafkasql.jar should start a server that accepts SQL, builds Kafka Streams topologies on the fly, and streams results back over REST, WebSocket, or a CLI – with zero Confluent dependencies.
Tech Stack at a Glance
- Language & runtime: Java 17
- SQL parsing: ANTLR4
- Stream processing: Apache Kafka Streams with RocksDB state stores
- Server: Spring Boot 3.x with WebFlux (REST, SSE, WebSocket)
- CLI: JLine 3
- Web UI: React + Vite + TypeScript with the Monaco editor
- Build & packaging: Maven multi-module, Docker, Testcontainers for integration tests
- Target clusters: vanilla Apache Kafka and Amazon MSK
2. High-Level Architecture
Before writing a line of code, we mapped out how a SQL statement flows from user to Kafka topic. The architecture separates concerns into five Maven modules – parser, engine, server, CLI, and UI – each buildable and testable independently.
graph TB
subgraph clients ["Client Interfaces"]
CLI["CLI Tool\n(JLine 3)"]
REST["REST Client"]
WebUI["Web UI\n(React + Monaco)"]
end
subgraph server ["KafkaSQL Server – Spring Boot"]
Parser["SQL Parser\n(ANTLR4)"]
Planner["Query Planner\n(Logical β Physical)"]
TopologyBuilder["Topology Builder\n(Physical β Kafka Streams)"]
MetaStore["Metadata Store\n(compacted topic)"]
StreamsRT["Kafka Streams\nRuntime"]
RocksDB["RocksDB\nState Store"]
end
subgraph kafka ["Kafka Cluster"]
UserTopics["User Topics\n(orders, clicks, payments, β¦)"]
InternalTopics["Internal Topics\n(_kafkasql_commands,\n_kafkasql_query_*)"]
end
CLI -->|"HTTP"| Parser
REST -->|"HTTP"| Parser
WebUI -->|"WebSocket"| Parser
Parser --> Planner
Planner --> TopologyBuilder
TopologyBuilder --> StreamsRT
MetaStore <-.->|"read/write"| InternalTopics
StreamsRT -->|"consume/produce"| UserTopics
StreamsRT -->|"changelog"| InternalTopics
StreamsRT --- RocksDB
style clients fill:#1c2129,stroke:#30363d,color:#e6edf3
style server fill:#161b22,stroke:#58a6ff,color:#e6edf3
style kafka fill:#1c2129,stroke:#d29922,color:#e6edf3
style CLI fill:#0d1117,stroke:#58a6ff,color:#79c0ff
style REST fill:#0d1117,stroke:#58a6ff,color:#79c0ff
style WebUI fill:#0d1117,stroke:#58a6ff,color:#79c0ff
style Parser fill:#0d1117,stroke:#bc8cff,color:#bc8cff
style Planner fill:#0d1117,stroke:#bc8cff,color:#bc8cff
style TopologyBuilder fill:#0d1117,stroke:#bc8cff,color:#bc8cff
style MetaStore fill:#0d1117,stroke:#3fb950,color:#3fb950
style StreamsRT fill:#0d1117,stroke:#d29922,color:#d29922
style RocksDB fill:#0d1117,stroke:#e6c07b,color:#e6c07b
style UserTopics fill:#0d1117,stroke:#d29922,color:#d29922
style InternalTopics fill:#0d1117,stroke:#d29922,color:#d29922
Query Execution Flow
Every SQL statement follows a well-defined pipeline:
- Parse – The ANTLR4 lexer and parser transform raw SQL text into a typed Abstract Syntax Tree.
- Plan – The query planner validates the AST against the metadata store, resolves column types, and builds a logical plan tree (ProjectNode, FilterNode, AggregateNode, etc.).
- Translate – The physical planner converts the logical plan into a KafkaStreamsPhysicalPlan, a 1:1 mapping to Kafka Streams DSL operations.
- Execute – A new KafkaStreams instance is started with the generated Topology. For push queries, results are streamed back to the client via SSE or WebSocket.
Each persistent query (CREATE TABLE AS SELECT ...) runs in its own KafkaStreams instance with an isolated state directory. This means queries are independently restartable, scalable, and killable – at the cost of per-query resource overhead.
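A minimal sketch of that per-query isolation, assuming a simple registry class (QueryRegistry and the configuration values here are illustrative, not the actual KafkaSQL classes):
JAVA
// Illustrative sketch: one KafkaStreams instance per persistent query,
// each with its own application.id and state directory so it can be
// restarted or terminated without touching other queries.
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class QueryRegistry {
    private final ConcurrentMap<String, KafkaStreams> running = new ConcurrentHashMap<>();

    public void start(String queryId, Topology topology, String bootstrapServers, String stateRoot) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkasql-query-" + queryId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateRoot + "/" + queryId); // isolated RocksDB state
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        running.put(queryId, streams);
    }

    public void terminate(String queryId) {
        KafkaStreams streams = running.remove(queryId);
        if (streams != null) {
            streams.close(); // only this query's threads and state are affected
        }
    }
}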
3. Phase 1 – Writing the SQL Parser with ANTLR4
The first module we built, kafkasql-parser, is the foundation everything else depends on. Parsing SQL correctly is deceptively hard; parsing streaming SQL (with EMIT CHANGES, window clauses, and topic bindings) has no widely adopted grammar to borrow from.
The Grammar
We wrote a custom ANTLR4 grammar file, KafkaSQL.g4, covering the streaming SQL dialect we wanted to support:
ANTLR4
// KafkaSQL.g4 – Streaming SQL Grammar (simplified excerpt)
grammar KafkaSQL;
statement
    : createStreamStatement
    | createTableAsSelectStatement
    | selectStatement
    | dropStatement
    | showStatement
    | describeStatement
    ;

createStreamStatement
    : CREATE STREAM identifier
      '(' columnDefinition (',' columnDefinition)* ')'
      WITH '(' streamProperties ')'
    ;

selectStatement
    : SELECT selectElements
      FROM identifier
      (WHERE expression)?
      (GROUP BY groupByElements)?
      (WINDOW windowSpec)?
      (EMIT CHANGES)?
    ;

windowSpec
    : TUMBLING '(' SIZE duration ')'
    | HOPPING '(' SIZE duration ',' ADVANCE duration ')'
    | SESSION '(' duration ')'
    ;
ANTLR4 generates a lexer, a parser, and a visitor/listener base class from this .g4 file at build time (via the Maven antlr4-maven-plugin). Our job is to write a visitor that walks the parse tree and produces typed AST nodes.
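A condensed, illustrative visitor is sketched below. KafkaSQLBaseVisitor and KafkaSQLParser follow ANTLR4's default naming for a grammar named KafkaSQL, the build* helpers are placeholders, and the AST records are the ones defined in the next section.
JAVA
// Condensed, illustrative visitor: parse-tree contexts in, typed AST records out.
public class AstBuilder extends KafkaSQLBaseVisitor<Statement> {

    @Override
    public Statement visitSelectStatement(KafkaSQLParser.SelectStatementContext ctx) {
        return new SelectStatement(
            buildSelectItems(ctx.selectElements()),
            ctx.identifier().getText(),                                          // FROM <stream>
            Optional.ofNullable(ctx.expression()).map(this::buildExpression),    // WHERE
            Optional.ofNullable(ctx.groupByElements()).map(this::buildGroupBy),  // GROUP BY
            Optional.ofNullable(ctx.windowSpec()).map(this::buildWindow),        // WINDOW
            ctx.EMIT() != null                                                   // EMIT CHANGES => push query
        );
    }

    // buildSelectItems / buildExpression / buildGroupBy / buildWindow omitted for brevity
}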
The AST
We created a sealed class hierarchy for AST nodes. Each node type maps directly to a SQL construct:
JAVA
public sealed interface Statement permits
    CreateStreamStatement,
    CreateTableAsSelectStatement,
    SelectStatement,
    DropStatement,
    ShowStatement,
    DescribeStatement { }

public record CreateStreamStatement(
    String name,
    List<ColumnDefinition> columns,
    Map<String, String> properties // KAFKA_TOPIC, VALUE_FORMAT, etc.
) implements Statement { }

public record SelectStatement(
    List<SelectItem> selectItems,
    String fromStream,
    Optional<Expression> whereClause,
    Optional<GroupByClause> groupBy,
    Optional<WindowClause> window,
    boolean emitChanges
) implements Statement { }
Using Java 17 sealed interfaces and records gave us exhaustive pattern matching with zero boilerplate – the code fails to compile if a new statement type isn't handled downstream.
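Downstream dispatch can then be one exhaustive switch. Note that pattern matching for switch is a preview feature on Java 17 and final as of Java 21; the ExecutionResult type and execute* methods below are hypothetical.
JAVA
// Illustrative dispatch over the sealed Statement hierarchy.
// If a new Statement subtype is added and not listed here, compilation fails.
ExecutionResult result = switch (statement) {
    case CreateStreamStatement s        -> executeCreateStream(s);
    case CreateTableAsSelectStatement s -> executeCtas(s);
    case SelectStatement s              -> executePushQuery(s);
    case DropStatement s                -> executeDrop(s);
    case ShowStatement s                -> executeShow(s);
    case DescribeStatement s            -> executeDescribe(s);
};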
ANTLR4's error recovery is aggressive by default: ambiguous or malformed SQL silently produces a partial parse tree instead of failing. We had to implement a custom BailErrorStrategy and a descriptive ANTLRErrorListener to surface clear error messages like "line 3:14 – expected column type, got 'FOOBAR'".
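A simplified stand-in for that wiring is sketched below: the real module subclasses BailErrorStrategy, while here a listener that throws achieves a similar fail-fast effect; the generated lexer and parser class names are assumed.
JAVA
// Simplified sketch: replace ANTLR's default recovery with fail-fast reporting.
// KafkaSQLLexer and KafkaSQLParser are the classes generated from KafkaSQL.g4.
import org.antlr.v4.runtime.BaseErrorListener;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.RecognitionException;
import org.antlr.v4.runtime.Recognizer;

public final class StrictSqlParser {

    public static KafkaSQLParser forSql(String sql) {
        KafkaSQLLexer lexer = new KafkaSQLLexer(CharStreams.fromString(sql));
        KafkaSQLParser parser = new KafkaSQLParser(new CommonTokenStream(lexer));

        BaseErrorListener failFast = new BaseErrorListener() {
            @Override
            public void syntaxError(Recognizer<?, ?> recognizer, Object offendingSymbol,
                                    int line, int charPositionInLine,
                                    String msg, RecognitionException e) {
                // Surface "line 3:14 – ..." style messages instead of a partial parse tree
                throw new IllegalArgumentException(
                    "line " + line + ":" + charPositionInLine + " – " + msg);
            }
        };
        lexer.removeErrorListeners();
        lexer.addErrorListener(failFast);
        parser.removeErrorListeners();
        parser.addErrorListener(failFast);
        return parser;
    }
}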
4. Phase 2 – The Core Engine: From AST to Kafka Streams Topology
This is the heart of KafkaSQL – the kafkasql-engine module. It takes a parsed AST and produces a running Kafka Streams topology. Three major subsystems make this work.
4.1 The Metadata Store
Streams and tables registered with CREATE STREAM or CREATE TABLE need to survive server restarts. We store metadata on a compacted Kafka topic (_kafkasql_commands). On startup, the engine replays this topic to rebuild the in-memory registry. This means KafkaSQL is stateless at the application layer – Kafka is the database for metadata.
JAVA
public class MetadataStore {
    private final ConcurrentMap<String, StreamMetadata> streams
        = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, TableMetadata> tables
        = new ConcurrentHashMap<>();
    private final KafkaProducer<String, byte[]> commandProducer;

    public MetadataStore(KafkaProducer<String, byte[]> commandProducer) {
        this.commandProducer = commandProducer;
    }

    // Replays _kafkasql_commands from the beginning on startup:
    // poll, deserialize each command, repopulate the in-memory maps
    public void restore(Consumer<String, byte[]> commandConsumer) {
        // seekToBeginning + poll loop omitted for brevity
    }

    // Writes new metadata to the command topic, then caches it locally
    public void register(StreamMetadata meta) {
        commandProducer.send(new ProducerRecord<>(
            "_kafkasql_commands", meta.name(), serialize(meta)));
        streams.put(meta.name(), meta);
    }
}
4.2 The Query Planner
The planner operates in two stages:
- Logical Plan – A tree of relational operations. SELECT a, b FROM orders WHERE amount > 100 GROUP BY region becomes:
AggregateNode (GROUP BY region, COUNT(*))
↓
ProjectNode (a, b)
↓
FilterNode (amount > 100)
↓
SourceNode (orders)
- Physical Plan – Translates each logical node into a Kafka Streams DSL operation. The mapping is direct and deterministic:
| SQL Construct | Kafka Streams DSL |
|---|---|
| SELECT a, b FROM stream | stream.mapValues(r -> project(r, [a, b])) |
| WHERE condition | stream.filter((k, v) -> evaluate(condition, v)) |
| GROUP BY key | stream.groupByKey() or stream.groupBy(...) |
| COUNT(*) | .count() → KTable |
| SUM(col) | .aggregate(init, aggregator) → KTable |
| AVG(col) | .aggregate(...) with sum + count tracker |
| WINDOW TUMBLING (5 MIN) | .windowedBy(TimeWindows.ofSizeWithNoGrace(5m)) |
| WINDOW HOPPING | .windowedBy(TimeWindows.ofSizeAndGrace(...).advanceBy(...)) |
| STREAM-TABLE JOIN | kStream.join(kTable, valueJoiner) |
| STREAM-STREAM JOIN | kStream.join(other, joiner, JoinWindows.of(...)) |
| EMIT CHANGES | KStream.foreach() → push to WebSocket/SSE |
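The AVG(col) row deserves a note: Kafka Streams has no built-in average, so the plan keeps a running sum and count per key and derives the average when results are emitted. A sketch of what that aggregation could look like; AvgAccumulator, the amount column, and the accumulator serde are illustrative helpers, not the engine's actual types.
JAVA
// Illustrative AVG(amount) translation: a sum + count accumulator per key.
public class AvgTranslation {

    public static class AvgAccumulator {
        public double sum;
        public long count;

        public AvgAccumulator add(double v) {
            sum += v;
            count++;
            return this;
        }

        public double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    KTable<String, AvgAccumulator> buildAvg(KGroupedStream<String, JsonNode> grouped,
                                            Serde<AvgAccumulator> avgAccumulatorSerde) {
        // The projection step later calls average() on each emitted accumulator.
        return grouped.aggregate(
            AvgAccumulator::new,                                        // initializer: sum=0, count=0
            (key, value, acc) -> acc.add(value.get("amount").asDouble()),
            Materialized.with(Serdes.String(), avgAccumulatorSerde)     // custom serde for the accumulator
        );
    }
}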
4.3 The Topology Builder
The topology builder is where the physical plan becomes real. It walks the plan tree and assembles an org.apache.kafka.streams.Topology object:
JAVA
public class TopologyBuilder {

    public Topology build(PhysicalPlan plan, StreamsBuilder builder) {
        KStream<String, JsonNode> source = builder.stream(
            plan.sourceTopic(),
            Consumed.with(Serdes.String(), jsonSerde)
        );
        KStream<String, JsonNode> pipeline = source;

        // Apply WHERE filter
        if (plan.hasFilter()) {
            pipeline = pipeline.filter((key, value) ->
                expressionEvaluator.evaluate(plan.filterExpression(), value)
            );
        }

        // Apply SELECT projection
        pipeline = pipeline.mapValues(value ->
            expressionEvaluator.project(plan.selectItems(), value)
        );

        // Apply GROUP BY + aggregation
        if (plan.hasGroupBy()) {
            KGroupedStream<String, JsonNode> grouped =
                pipeline.groupBy((key, value) ->
                    value.get(plan.groupByKey()).asText()
                );
            KTable<String, Long> aggregated = grouped.count(
                Materialized.as(plan.queryId() + "-store")
            );
            // Counts are Long values, so the output topic gets an explicit Long serde
            aggregated.toStream().to(
                plan.outputTopic(),
                Produced.with(Serdes.String(), Serdes.Long())
            );
        } else {
            pipeline.to(plan.outputTopic(), Produced.with(Serdes.String(), jsonSerde));
        }
        return builder.build();
    }
}
4.4 The Expression Evaluator
WHERE predicates and SELECT expressions operate on individual records at runtime. The expression evaluator walks the expression AST and evaluates it against a JsonNode record. Supporting types like VARCHAR, INT, BIGINT, DOUBLE, BOOLEAN, and TIMESTAMP required a small but careful type system with coercion rules.
We chose to interpret expressions at runtime rather than compile them to bytecode. For v1, the per-record overhead of tree-walking evaluation is negligible compared to Kafka I/O latency. Bytecode generation (via ASM or Janino) is a planned optimization for compute-heavy expressions.
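A stripped-down version of the idea, with only comparisons and boolean connectives and the numeric coercion rule shown; the Expression, Comparison, And, and Or types below are simplified stand-ins for the parser's AST:
JAVA
// Stripped-down tree-walking evaluator over JsonNode records.
// Expression, Comparison, And, and Or are simplified stand-ins for the AST types.
public class SimpleExpressionEvaluator {

    public boolean evaluate(Expression expr, JsonNode record) {
        if (expr instanceof And a) {
            return evaluate(a.left(), record) && evaluate(a.right(), record);
        }
        if (expr instanceof Or o) {
            return evaluate(o.left(), record) || evaluate(o.right(), record);
        }
        if (expr instanceof Comparison c) {
            return compare(c, record);
        }
        throw new UnsupportedOperationException(expr.getClass().getSimpleName());
    }

    private boolean compare(Comparison c, JsonNode record) {
        JsonNode field = record.get(c.columnName());
        if (field == null || field.isNull()) {
            return false;                    // missing values never match, mirroring SQL NULL semantics
        }
        double left = field.asDouble();      // coerce INT/BIGINT/DOUBLE to double for comparison
        double right = c.literalValue();
        return switch (c.operator()) {
            case GT  -> left > right;
            case GTE -> left >= right;
            case LT  -> left < right;
            case LTE -> left <= right;
            case EQ  -> left == right;
        };
    }
}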
5. Phase 3 – The Server: REST API and Push Queries
The kafkasql-server module is a Spring Boot 3.x application that ties everything together. It exposes the SQL engine over HTTP and manages the lifecycle of running Kafka Streams topologies.
REST API
| Method | Endpoint | Purpose |
|---|---|---|
| POST | /ksql | Execute DDL or start a persistent query |
| POST | /query | Run a push query; stream results via SSE |
| GET | /status | Server health and cluster info |
| GET | /queries | List all running queries |
| DELETE | /queries/{id} | Terminate a persistent query |
| GET | /streams | List registered streams |
| GET | /tables | List registered tables |
Push Queries with Spring WebFlux
Push queries (SELECT ... EMIT CHANGES) are the most interesting part of the server. Unlike a traditional request/response, a push query keeps the connection open and streams every matching record to the client in real time.
We used Spring WebFlux with Server-Sent Events (SSE) for HTTP clients and WebSocket for the Web UI:
JAVA
@PostMapping(value = "/query", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> pushQuery(@RequestBody SqlRequest request) {
    Statement stmt = parser.parse(request.sql());
    PhysicalPlan plan = planner.plan(stmt);

    Sinks.Many<JsonNode> sink = Sinks.many().multicast().onBackpressureBuffer();

    // The topology pushes records into the sink
    Topology topology = topologyBuilder.build(plan, sink);
    KafkaStreams streams = new KafkaStreams(topology, streamsConfig);
    streams.start();

    return sink.asFlux()
        .map(record -> ServerSentEvent.builder(record.toString()).build())
        .doOnCancel(() -> streams.close());
}
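On the consuming side, any reactive HTTP client can subscribe to the event stream. A hypothetical example using Spring's WebClient against a local server (SqlRequest is assumed to be the same simple request record the controller accepts):
JAVA
// Hypothetical client-side subscription to the push-query SSE stream.
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class PushQueryClient {
    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");

        Flux<String> rows = client.post()
            .uri("/query")
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .bodyValue(new SqlRequest("SELECT user_id, COUNT(*) FROM pageviews GROUP BY user_id EMIT CHANGES"))
            .retrieve()
            .bodyToFlux(String.class);               // each element is one emitted result row (JSON)

        rows.toStream().forEach(System.out::println); // block and print updates as they arrive
    }
}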
Why Not Spring Cloud Stream?
Spring Cloud Stream provides a declarative, annotation-driven model for Kafka Streams (@Input, @Output, function bindings). But KafkaSQL needs to construct topologies dynamically at runtime based on user SQL – something the declarative model fundamentally cannot support. We use Spring Boot for its REST layer, DI container, configuration management, and Actuator health checks, while keeping Kafka Streams topology construction entirely in our own code.
Configuration
The server is configured via a standard Spring Boot application.yml:
YAML
kafkasql:
  kafka:
    bootstrap-servers: localhost:9092
    security-protocol: PLAINTEXT
  state:
    dir: /tmp/kafkasql-state
  internal:
    command-topic: _kafkasql_commands
    replication-factor: 1
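On the Java side, these keys can be bound to a type-safe properties class. A sketch, assuming constructor binding on records; the class name and registration via @ConfigurationProperties scanning are illustrative:
JAVA
// Hypothetical type-safe binding for the kafkasql.* keys above.
// Requires @ConfigurationPropertiesScan (or @EnableConfigurationProperties) on the application class.
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "kafkasql")
public record KafkaSqlProperties(
        Kafka kafka,
        State state,
        Internal internal
) {
    public record Kafka(String bootstrapServers, String securityProtocol) { }
    public record State(String dir) { }
    public record Internal(String commandTopic, int replicationFactor) { }
}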
6. Phase 4 – The CLI: An Interactive Terminal Experience
A SQL engine without a CLI is an API without a face. We built kafkasql-cli using JLine 3, the same library that powers the JShell REPL and many other Java CLI tools.
SHELL
$ java -jar kafkasql-cli.jar --server http://localhost:8080
_ __ __ _ ____ ___ _
| |/ /__ _ / _| | ____ _ / ___| / _ \| |
| ' // _` | |_| |/ / _` |\___ \| | | | |
| . \ (_| | _| < (_| | ___) | |_| | |___
|_|\_\__,_|_| |_|\_\__,_||____/ \__\_\_____|
Connected to KafkaSQL server v0.1.0
kafkasql> CREATE STREAM pageviews (
> user_id VARCHAR,
> page VARCHAR,
> ts BIGINT
> ) WITH (
> KAFKA_TOPIC='pageviews',
> VALUE_FORMAT='JSON'
> );
Stream PAGEVIEWS created.
kafkasql> SELECT user_id, COUNT(*)
> FROM pageviews
> GROUP BY user_id
> EMIT CHANGES;
+-----------+--------+
| USER_ID | COUNT |
+-----------+--------+
| alice | 3 |
| bob | 7 |
| charlie | 1 |
| alice | 4 | <-- streaming update
...
The CLI supports tab completion for keywords and stream/table names (fetched from the server at startup), persistent command history, and a script mode (--execute "SQL") for automation and CI pipelines.
For push queries, the CLI keeps a long-lived HTTP connection open (using SSE) and formats incoming records as a continuously updating table – clearing and redrawing rows as aggregations change.
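The core REPL loop is little more than JLine plumbing. A minimal sketch, with completion words hard-coded here whereas the real CLI fetches stream and table names from the server:
JAVA
// Minimal JLine 3 REPL sketch: prompt, keyword completion, persistent history.
import java.nio.file.Paths;
import org.jline.reader.EndOfFileException;
import org.jline.reader.LineReader;
import org.jline.reader.LineReaderBuilder;
import org.jline.reader.UserInterruptException;
import org.jline.reader.impl.completer.StringsCompleter;
import org.jline.terminal.Terminal;
import org.jline.terminal.TerminalBuilder;

public class KafkaSqlRepl {
    public static void main(String[] args) throws Exception {
        Terminal terminal = TerminalBuilder.builder().build();
        LineReader reader = LineReaderBuilder.builder()
            .terminal(terminal)
            .completer(new StringsCompleter("SELECT", "CREATE", "STREAM", "TABLE",
                                            "SHOW", "DESCRIBE", "DROP", "EMIT", "CHANGES"))
            .variable(LineReader.HISTORY_FILE,
                      Paths.get(System.getProperty("user.home"), ".kafkasql_history"))
            .build();

        while (true) {
            String line;
            try {
                line = reader.readLine("kafkasql> ");
            } catch (EndOfFileException | UserInterruptException e) {
                break;                               // Ctrl-D / Ctrl-C ends the session
            }
            if (line.equalsIgnoreCase("exit")) break;
            // send `line` to the server's /ksql or /query endpoint and render the response
        }
    }
}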
7. Phase 5 – The Web UI: SQL Editor in the Browser
Not everyone lives in the terminal. The kafkasql-ui module is a React + Vite + TypeScript single-page application that provides a rich graphical interface for the engine.
Key Features
- Monaco SQL Editor – Full syntax highlighting for the KafkaSQL dialect, with autocomplete for stream/table names and column names pulled from the metadata API.
- Streaming Results Table – Push query results appear in a virtualized table that updates in real time via WebSocket. Rows highlight briefly on change.
- Metadata Browser – A sidebar tree showing all registered streams and tables, their columns and types, and the underlying Kafka topic.
- Query Dashboard – A list of all running persistent queries with status, uptime, messages processed, and a kill button.
- Connection Config – Point the UI at any KafkaSQL server instance.
The UI is built as a static bundle and can be served directly from the Spring Boot app (embedded in the JAR under /static) or deployed standalone to any CDN or static host.
We debated between CodeMirror and Monaco for the SQL editor. Monaco won because it provides multi-cursor editing, integrated error squiggles (which we wire to parser errors), and a familiar VS Code feel. The trade-off is bundle size (~2 MB gzipped), but for a developer tool, this is acceptable.
8. Phase 6 – Docker, Local Dev, and MSK Compatibility
Local Development with Docker Compose
Getting a local Kafka cluster running shouldn't require a PhD. Our docker-compose.yml spins up everything in one command:
YAML
# docker-compose.yml
services:
  kafka:
    image: apache/kafka:3.6.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      CLUSTER_ID: "kafkasql-local-cluster-001"
    ports:
      - "9092:9092"

  kafkasql:
    build: .
    ports:
      - "8080:8080"
    environment:
      KAFKASQL_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    depends_on:
      - kafka
We use Kafka in KRaft mode (no ZooKeeper) – a single-node setup that starts in seconds. The KafkaSQL server connects as a standard Kafka client.
Multi-Stage Dockerfile
DOCKER
FROM eclipse-temurin:17-jdk-alpine AS build
WORKDIR /app
COPY . .
RUN ./mvnw -B package -DskipTests
FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
COPY --from=build /app/kafkasql-server/target/kafkasql-server.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
Amazon MSK Compatibility
A major design goal was seamless MSK support. Because KafkaSQL uses only the standard Kafka client APIs (no Confluent-specific extensions), connecting to MSK requires only configuration changes – zero code changes:
- IAM Authentication – Add the aws-msk-iam-auth dependency and configure sasl.jaas.config with IAMLoginModule.
- TLS – Set security.protocol=SASL_SSL.
- Profile Switching – A kafkasql.kafka.provider config key toggles between vanilla and msk profiles, auto-configuring auth settings.
YAML
# application-msk.yml
kafkasql:
  kafka:
    bootstrap-servers: b-1.mycluster.xxx.kafka.us-east-1.amazonaws.com:9098
    security-protocol: SASL_SSL
    sasl-mechanism: AWS_MSK_IAM
    sasl-jaas-config: >
      software.amazon.msk.auth.iam.IAMLoginModule required;
All Kafka Streams operations – state stores, changelogs, repartition topics – work identically on MSK and vanilla Kafka. The only difference is authentication. This was a deliberate architectural choice: no MSK-specific code paths exist anywhere in the engine.
9. Phase 7 – Testing Kafka Streams Topologies
Testing a streaming SQL engine is uniquely challenging. You can't just assert on a return value – you need to verify that a continuously running topology produces the correct output records over time.
Unit Tests with TopologyTestDriver
Kafka provides kafka-streams-test-utils, which includes TopologyTestDriver – a lightweight, in-memory driver that executes a topology without a real Kafka cluster:
JAVA
@Test
void filterQuery_producesOnlyMatchingRecords() {
    String sql = "SELECT user_id, amount FROM orders WHERE amount > 100";
    Topology topology = engine.compileToTopology(sql);

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
        // createInputTopic/createOutputTopic take serializers/deserializers, not serdes
        TestInputTopic<String, String> input = driver.createInputTopic(
            "orders", stringSerde.serializer(), stringSerde.serializer()
        );
        TestOutputTopic<String, String> output = driver.createOutputTopic(
            "_kafkasql_query_001", stringSerde.deserializer(), stringSerde.deserializer()
        );

        input.pipeInput("k1", "{\"user_id\":\"alice\",\"amount\":150}");
        input.pipeInput("k2", "{\"user_id\":\"bob\",\"amount\":50}");
        input.pipeInput("k3", "{\"user_id\":\"charlie\",\"amount\":200}");

        List<KeyValue<String, String>> results = output.readKeyValuesToList();
        assertThat(results).hasSize(2);
        assertThat(results.get(0).value).contains("alice");
        assertThat(results.get(1).value).contains("charlie");
    }
}
Integration Tests with Testcontainers
For end-to-end tests – verifying that the REST API, parser, planner, and Kafka Streams all work together against a real broker – we use Testcontainers:
JAVA
@Testcontainers
@SpringBootTest(webEnvironment = RANDOM_PORT)
class KafkaSqlIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    );

    @Test
    void createStreamAndQuery_endToEnd() {
        // POST /ksql -> CREATE STREAM
        webClient.post().uri("/ksql")
            .bodyValue(new SqlRequest("CREATE STREAM ..."))
            .exchange()
            .expectStatus().isOk();

        // Produce test records to Kafka
        produceRecords("orders", testRecords);

        // POST /query -> push query (WebTestClient exposes the SSE body as a Flux)
        Flux<String> results = webClient.post().uri("/query")
            .bodyValue(new SqlRequest("SELECT ... EMIT CHANGES"))
            .exchange()
            .returnResult(String.class)
            .getResponseBody();

        StepVerifier.create(results.take(3))
            .expectNextCount(3)
            .verifyComplete();
    }
}
Kafka Streams' TopologyTestDriver processes records synchronously and instantly – there's no concept of wall-clock time or commit intervals. This makes tests fast but can mask timing-related bugs. We run a smaller set of tests against real Testcontainers Kafka to catch these.
10. Lessons Learned and Sharp Edges
Building KafkaSQL surfaced several hard-won lessons:
ANTLR4 Grammar Complexity
SQL grammars are inherently ambiguous. Operator precedence, optional clauses, and context-sensitive keywords (STREAM is both a keyword and a valid identifier in some positions) require careful rule ordering and semantic predicates. Our advice: start with the smallest viable grammar and extend incrementally with a test for every new production rule.
Kafka Streams Lifecycle Management
Each persistent query runs its own KafkaStreams instance. This gives isolation but means managing dozens of independent thread pools, state directories, and state transitions (CREATED → RUNNING → REBALANCING → RUNNING → PENDING_SHUTDOWN → NOT_RUNNING). We implemented a KafkaStreams.StateListener per instance and a central query registry that tracks health and restarts failed topologies.
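The per-instance hook itself is a standard Kafka Streams listener; a simplified version (the registry and its markFailed method are illustrative names):
JAVA
// Simplified health hook attached before each query's KafkaStreams instance is started.
// queryRegistry and markFailed are hypothetical names for the central registry described above.
void attachHealthHooks(String queryId, KafkaStreams streams, QueryRegistry queryRegistry) {
    streams.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.ERROR) {
            queryRegistry.markFailed(queryId);   // registry decides whether to rebuild and restart
        }
    });
    // Replace a crashed stream thread instead of letting the whole query die
    streams.setUncaughtExceptionHandler(exception ->
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
}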
RocksDB State Store Sizing
Aggregation queries (GROUP BY) materialize state in RocksDB. A high-cardinality group-by key (e.g., user IDs) can grow the state store to gigabytes. We added configurable retention, state store size monitoring via JMX, and documented guidelines for when to use windowed aggregations (which bound state size naturally) vs. unbounded global aggregations.
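In DSL terms the difference is a single windowedBy call: a windowed count only retains state for the window retention period, whereas an unbounded count keeps one RocksDB entry per distinct key forever. A sketch, reusing the same kind of grouped stream the topology builder produces; the store name and window size are illustrative:
JAVA
// Unbounded aggregation: one RocksDB entry per distinct key, kept forever.
KTable<String, Long> totalPerUser = grouped.count();

// Windowed aggregation: state is bounded by the window size and retention period.
KTable<Windowed<String>, Long> perUserPer5Min = grouped
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("pageviews-5min-store"));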
Expression Evaluation Performance
Our tree-walking expression evaluator handles thousands of records per second, which is more than sufficient for most use cases. But for high-throughput streams with complex WHERE predicates, it becomes a bottleneck. The planned next step is generating bytecode for hot expressions using Janino or ASM.
11. What's Next
Kubernetes Operator
Deploy KafkaSQL as a Kubernetes-native operator with CRDs for queries, auto-scaling based on consumer lag, and rolling upgrades. Spring Boot Actuator already provides the health endpoint with the liveness and readiness probe groups (/actuator/health/liveness and /actuator/health/readiness) that K8s expects.
Avro & Schema Registry Support
Beyond JSON, support Avro-encoded topics with automatic schema resolution from a Schema Registry. Column types would be inferred from the Avro schema, eliminating the need for manual CREATE STREAM column definitions.
Expression Bytecode Compilation
Compile hot-path WHERE predicates and SELECT expressions to JVM bytecode at query start time, eliminating per-record tree-walking overhead for high-throughput streams.
Multi-Node Clustering
Run multiple KafkaSQL server instances behind a load balancer, with query assignment and metadata synchronization via the command topic. Kafka Streams already handles partition assignment across instances – we need to coordinate query ownership at the SQL layer.
GraalVM Native Image
Compile the CLI (and potentially the server) to a native binary with GraalVM for instant startup and minimal memory footprint – making KafkaSQL suitable for edge and resource-constrained environments.