[spark] Fix empty projection causing Invalid metadata length for COUNT(*)/COUNT(1) #3227
Kaixuan-Duan wants to merge 1 commit into apache:main
Conversation
SchemaGetter schemaGetter,
ArrowCompressionInfo compressionInfo,
int[] selectedFieldPositions) {
// Empty projection (selectedFieldPositions.length == 0) is currently not supported on the
It would be good to also verify this behavior and the fix in the Flink connector if needed.
Looked into the Flink connector: it doesn't implement SupportsAggregatePushDown, and Flink's planner doesn't push an empty projection for COUNT(*) either, so this bug is not reproducible there today. The new server-side guard is connector-agnostic, so it still protects Flink/Kafka/direct API users if any caller ever pushes an empty projection in the future.
@Kaixuan-Duan Hi, this seems to be the same as #2725. cc @beryllw
  }
}

test("Spark Read: COUNT(*) without filter") {
What happens when there is a filter?
COUNT(*) with filter will not traverse an empty projection path. The original behavior is correct, and this PR does not affect it.
@luoyuxia I'm sorry, that was my fault. I didn't notice that there was already a PR addressing issue #2724.
Purpose
Linked issue: close #2724
When Spark pushes down an empty column projection for COUNT(*)/COUNT(1) queries, the Fluss server fails with IllegalStateException("Invalid metadata length") in FileLogProjection.project(), causing the client to retry indefinitely and the query to hang.
This PR fixes the issue from two sides:
- Server side: reject empty projection early with a clear InvalidColumnProjectionException instead of crashing with an internal error.
- Spark connector side: fall back to projecting the first column when Spark pushes down an empty projection, so the row count is preserved without fetching unnecessary data.
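The two-sided fix can be illustrated with a minimal, self-contained Java sketch. It is not the code from this PR: the class EmptyProjectionSketch, the methods checkProjection and effectiveProjection, and the nested exception class are hypothetical stand-ins, and the real connector change is described as falling back to Array(0) on the Spark side.

```java
import java.util.Arrays;

public class EmptyProjectionSketch {

    /** Hypothetical stand-in for the InvalidColumnProjectionException named in the PR. */
    static class InvalidColumnProjectionException extends RuntimeException {
        InvalidColumnProjectionException(String message) {
            super(message);
        }
    }

    /** Server-side idea: reject an empty projection up front with a clear error. */
    static void checkProjection(int[] selectedFieldPositions) {
        if (selectedFieldPositions.length == 0) {
            throw new InvalidColumnProjectionException(
                    "Empty column projection is not supported; select at least one column.");
        }
    }

    /** Connector-side idea: project the first column when the engine asks for none. */
    static int[] effectiveProjection(int[] requested) {
        return requested.length == 0 ? new int[] {0} : requested;
    }

    public static void main(String[] args) {
        int[] pushedDown = new int[0];                      // empty projection, as for COUNT(*)
        int[] projection = effectiveProjection(pushedDown); // falls back to column 0
        checkProjection(projection);                        // passes: projection is non-empty
        System.out.println(Arrays.toString(projection));    // prints [0]
    }
}
```

With both pieces in place, a caller that applies the fallback never trips the guard, while a caller that skips it gets an immediate, descriptive exception rather than an internal error.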
Brief change log
- FileLogProjection#setCurrentProjection: add a guard that throws InvalidColumnProjectionException when selectedFieldPositions is empty.
- FileLogProjectionTest: add testEmptyProjectionRejectsWithClearError to verify the server-side guard.
- FlussBatch#projection/FlussMicroBatchStream#projection: when readSchema yields an empty projection, fall back to Array(0) (first column).
- SparkLogTableReadTest: add COUNT(*) and COUNT(1) end-to-end tests for log tables.
- SparkPrimaryKeyTableReadTest: add COUNT(*) end-to-end test for primary key tables.
Tests
./mvnw -pl fluss-common -DskipTests=false -Dtest=FileLogProjectionTest#testEmptyProjectionRejectsWithClearError test
./mvnw -pl fluss-spark/fluss-spark-ut -am install -DskipTests
./mvnw -pl fluss-spark/fluss-spark-ut -Dsuites='org.apache.fluss.spark.SparkLogTableReadTest' test
./mvnw -pl fluss-spark/fluss-spark-ut -Dsuites='org.apache.fluss.spark.SparkPrimaryKeyTableReadTest' test
API and Format
Documentation