[flink] Fix batch query on empty datalake-enabled table to return 0 rows instead of failing #3208
Thanks for the review!
@fresh-borzoni @luoyuxia PTAL
luoyuxia left a comment
@matrixsparse Thanks for the PR. Left one comment. PTAL
  if (splits == null) {
-     throw new UnsupportedOperationException(
-             "Currently, Batch mode can only be supported if one lake snapshot exists for the table.");
+     return Collections.emptyList();
Why return an empty list? An empty list will cause Flink to return an empty result even though there are actually still records in Fluss.
One option is to still generate Fluss-only splits when the lake snapshot doesn't exist,
just as Spark does in FlussLakeAppendBatch#doPlan.
Thanks for the feedback. This has been updated: rather than returning an empty list, the logic now falls back to Fluss-only split generation, consistent with Spark's planFallbackPartitions(). Partition validation is also added for partitioned tables. @luoyuxia PTAL, thanks!
f04fd1a to 7339c52
fresh-borzoni left a comment
@matrixsparse Ty for the PR, LGTM 👍
() -> {
    List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
    // No lake snapshot exists, fall back to Fluss-only splits
    if (splits == null) {
nit: it may be a good idea to add an info log for easier tracing; stream mode at least has one when it goes through the lake path.
Something like:
LOG.info("No lake snapshot found for table {}, falling back to Fluss-only splits.", tablePath);
Good suggestion! Added the info log for tracing. Thanks @fresh-borzoni!
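For illustration, the fallback discussed in this thread can be sketched roughly as below. This is a minimal, self-contained sketch and not the actual FlinkSourceEnumerator code: the class BatchFallbackSketch, the method planBatchSplits, the String stand-ins for splits, and the use of java.util.logging in place of the project's own logger are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the enumerator logic; none of these names are Fluss APIs.
class BatchFallbackSketch {
    private static final Logger LOG = Logger.getLogger(BatchFallbackSketch.class.getName());

    /**
     * Returns the hybrid lake + Fluss splits when a lake snapshot exists
     * (supplier result is non-null), otherwise the Fluss-only splits.
     */
    static List<String> planBatchSplits(
            String tablePath,
            Supplier<List<String>> hybridLakeFlussSplits,
            Supplier<List<String>> flussOnlySplits) {
        List<String> splits = hybridLakeFlussSplits.get();
        if (splits == null) {
            // No lake snapshot: log for tracing, then read what is in Fluss
            // instead of throwing or silently returning nothing.
            LOG.log(Level.INFO,
                    "No lake snapshot found for table {0}, falling back to Fluss-only splits.",
                    tablePath);
            return flussOnlySplits.get();
        }
        return splits;
    }

    public static void main(String[] args) {
        // Simulate a table without a lake snapshot: the hybrid supplier yields null.
        List<String> splits = planBatchSplits(
                "db.orders",
                () -> null,
                () -> Arrays.asList("bucket-0", "bucket-1"));
        System.out.println(splits);
    }
}
```

Passing the two planning steps as suppliers keeps the Fluss-only planning lazy, so it only runs when the hybrid path reports that no lake snapshot exists.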
Summary
A batch query on an empty datalake-enabled table throws UnsupportedOperationException. This PR makes it return an empty result
(0 rows) instead, consistent with Spark connector behavior.
Root Cause
In FlinkSourceEnumerator.startInBatchMode(), generateHybridLakeFlussSplits() returns null when no lake snapshot exists. The stream mode code in the same
commit (7937996) correctly falls back to initNonPartitionedSplits(), but the batch mode path was left to throw an exception.
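The asymmetry described here can be illustrated with a simplified sketch. It assumes String stand-ins for splits and hypothetical method names (startInStreamMode, startInBatchModeBeforeFix); it mirrors only the null handling described in this section, not the real enumerator code.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: simplified stand-ins, not the actual FlinkSourceEnumerator code.
class NullSplitHandlingSketch {

    // Stream mode, as described: a null hybrid result falls back to Fluss splits.
    static List<String> startInStreamMode(List<String> hybridSplits, List<String> flussSplits) {
        return hybridSplits != null ? hybridSplits : flussSplits;
    }

    // Batch mode before the fix, as described: a null hybrid result is rejected.
    static List<String> startInBatchModeBeforeFix(List<String> hybridSplits) {
        if (hybridSplits == null) {
            throw new UnsupportedOperationException(
                    "Currently, Batch mode can only be supported if one lake snapshot exists for the table.");
        }
        return hybridSplits;
    }

    public static void main(String[] args) {
        // Same input (no lake snapshot, i.e. null), two different outcomes.
        System.out.println(startInStreamMode(null, Arrays.asList("bucket-0")));
        try {
            startInBatchModeBeforeFix(null);
        } catch (UnsupportedOperationException e) {
            System.out.println("batch mode failed: " + e.getMessage());
        }
    }
}
```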
Change
FlinkSourceEnumerator.java: Return Collections.emptyList() instead of throwing
UnsupportedOperationException when splits is null.
Verification
mvnw test -pl fluss-flink/fluss-flink-common -Dtest="FlinkSourceEnumerator*"
Fixes #3207