@@ -1535,6 +1535,13 @@ public class ConfigOptions {
.withDescription(
"If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default.");

+    public static final ConfigOption<Integer> TABLE_DATALAKE_AUTO_COMPACTION_MIN_INPUT_FILES =
+            key("table.datalake.auto-compaction.min-input-files")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The minimum number of small files in a bucket required to trigger automatic compaction.");
+
public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT =
key("table.datalake.auto-expire-snapshot")
.booleanType()
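The new option defaults to 3, matching the value previously hardcoded as `MIN_FILES_TO_COMPACT` in `IcebergRewriteDataFiles`. A minimal sketch of overriding it, assuming Fluss's Flink-style `Configuration.set(ConfigOption, value)` API (the class and package names here are assumptions, not part of this diff):

```java
// A usage sketch only: the Configuration/ConfigOptions packages are assumed to
// follow Fluss's Flink-style config API; the option itself is the one added above.
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;

class CompactionThresholdConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Require at least 5 small files per bucket before auto-compaction kicks in.
        conf.set(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION_MIN_INPUT_FILES, 5);
        // Equivalent table property: 'table.datalake.auto-compaction.min-input-files' = '5'
    }
}
```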
@@ -113,6 +113,13 @@ public boolean isDataLakeAutoCompaction() {
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
}

+    /**
+     * Gets the minimum number of small files per bucket required to trigger automatic compaction.
+     */
+    public int getDataLakeAutoCompactionMinInputFiles() {
+        return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION_MIN_INPUT_FILES);
+    }
+
/** Whether auto expire snapshot is enabled. */
public boolean isDataLakeAutoExpireSnapshot() {
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
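A one-method sketch of how the new accessor is reached at tiering time; the call chain is the one the Iceberg committer uses later in this diff:

```java
// Sketch of the read path at tiering time; the call chain mirrors the committer
// change further down in this diff. WriterInitContext is the project's own type.
class ThresholdReadSketch {
    static int minInputFilesFor(WriterInitContext ctx) {
        return ctx.tableInfo().getTableConfig().getDataLakeAutoCompactionMinInputFiles();
    }
}
```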
@@ -52,6 +52,7 @@

import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toFilterExpression;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkState;

/**
@@ -62,14 +63,12 @@ public class IcebergRewriteDataFiles {

private static final Logger LOG = LoggerFactory.getLogger(IcebergRewriteDataFiles.class);

-    // TODO: make compaction strategy configurable
-    private static final int MIN_FILES_TO_COMPACT = 3;
-
private final Table table;
private final String partition;
private final TableBucket bucket;
private final Expression filter;
private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
+    private int minInputFiles = 3; // default matches the removed MIN_FILES_TO_COMPACT

public IcebergRewriteDataFiles(Table table, @Nullable String partition, TableBucket bucket) {
this.table = table;
@@ -83,6 +82,12 @@ public IcebergRewriteDataFiles targetSizeInBytes(long targetSize) {
return this;
}

+    public IcebergRewriteDataFiles minInputFiles(int minInputFiles) {
+        checkArgument(minInputFiles >= 1, "minInputFiles must be >= 1, but was %s", minInputFiles);
+        this.minInputFiles = minInputFiles;
+        return this;
+    }
+
private List<CombinedScanTask> planRewriteFileGroups(long snapshotId) throws IOException {
List<FileScanTask> fileScanTasks = new ArrayList<>();
try (CloseableIterable<FileScanTask> tasks =
@@ -95,11 +100,11 @@ private List<CombinedScanTask> planRewriteFileGroups(long snapshotId) throws IOException {
tasks.forEach(fileScanTasks::add);
}

-        // the files < targetSizeInBytes is less than MIN_FILES_TO_COMPACT, don't compact
+        // if fewer than minInputFiles files are smaller than targetSizeInBytes, don't compact
if (fileScanTasks.stream()
.filter(fileScanTask -> fileScanTask.length() < targetSizeInBytes)
.count()
-                < MIN_FILES_TO_COMPACT) {
+                < minInputFiles) {
// return empty file group
return Collections.emptyList();
}
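The guard counts only files smaller than `targetSizeInBytes`, so buckets whose files are already at target size are never rewritten. A self-contained sketch of that predicate, with plain `long` lengths standing in for `FileScanTask.length()` (helper names are hypothetical):

```java
import java.util.List;

// A self-contained sketch of the rewrite guard: compact a bucket only when at
// least minInputFiles of its files are smaller than targetSizeInBytes.
class CompactionThresholdSketch {
    static boolean shouldCompact(List<Long> fileLengths, long targetSizeInBytes, int minInputFiles) {
        long smallFiles =
                fileLengths.stream().filter(len -> len < targetSizeInBytes).count();
        return smallFiles >= minInputFiles;
    }

    public static void main(String[] args) {
        long target = 128L * 1024 * 1024; // the rewriter's 128MB default
        List<Long> lengths = List.of(1_000L, 2_000L, target); // two tiny files, one full-size file
        System.out.println(shouldCompact(lengths, target, 3)); // false: below the default threshold
        System.out.println(shouldCompact(lengths, target, 2)); // true: threshold lowered to 2
    }
}
```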
@@ -166,7 +166,12 @@ private void scheduleCompaction(WriterInitContext writerInitContext) {
table,
writerInitContext.partition(),
writerInitContext.tableBucket())
-                            .targetSizeInBytes(compactionTargetSize(table));
+                            .targetSizeInBytes(compactionTargetSize(table))
+                            .minInputFiles(
+                                    writerInitContext
+                                            .tableInfo()
+                                            .getTableConfig()
+                                            .getDataLakeAutoCompactionMinInputFiles());
return rewriter.execute();
} catch (Exception e) {
LOG.info("Fail to compact iceberg data files.", e);
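Taken together, the committer now wires both the size target and the configured file-count threshold into the rewriter before executing. A condensed sketch of that wiring (the helper method is hypothetical; `partition` is nullable for non-partitioned tables, as in the rewriter's constructor):

```java
import javax.annotation.Nullable;
import org.apache.iceberg.Table;

// Sketch of the committer's wiring: target size plus the configured file-count threshold.
class RewriteWiringSketch {
    static RewriteDataFileResult rewriteBucket(
            Table table, @Nullable String partition, TableBucket bucket, int minInputFiles)
            throws Exception {
        return new IcebergRewriteDataFiles(table, partition, bucket)
                .targetSizeInBytes(128L * 1024 * 1024) // the rewriter's 128MB default
                .minInputFiles(minInputFiles) // table.datalake.auto-compaction.min-input-files
                .execute(); // null when fewer than minInputFiles small files qualify
    }
}
```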
@@ -55,6 +55,7 @@
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.fluss.utils.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Unit test to verify compaction via {@link IcebergRewriteDataFiles}. */
class IcebergRewriteTest {
@@ -159,6 +160,44 @@ void testMultipleBucketRewrite() throws Exception {
assertThat(filesAfterBucket1).isEqualTo(1);
}

+    @Test
+    void testMinInputFilesIsConfigurable() throws Exception {
+        TablePath tablePath = TablePath.of("iceberg", "min_input_files_table");
+        createTable(tablePath);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+
+        // Seed only 2 tiny files, below the default threshold of 3.
+        appendTinyFilesWithRowsAndBucket(icebergTable, 2, 3, 6000, 0);
+        icebergTable.refresh();
+        assertThat(countDataFiles(icebergTable)).isEqualTo(2);
+
+        // The default minInputFiles (3) should skip the rewrite.
+        assertThat(createIcebergRewriteDataFiles(icebergTable, 0).execute()).isNull();
+
+        // Lowering minInputFiles to 2 should now trigger a rewrite.
+        RewriteDataFileResult result =
+                createIcebergRewriteDataFiles(icebergTable, 0).minInputFiles(2).execute();
+        assertThat(result).isNotNull();
+        assertThat(result.deletedDataFiles()).hasSize(2);
+        assertThat(result.addedDataFiles()).hasSize(1);
+
+        commitRewrite(icebergTable, result);
+        icebergTable.refresh();
+        assertThat(countDataFiles(icebergTable)).isEqualTo(1);
+    }
+
+    @Test
+    void testMinInputFilesRejectsInvalidValue() {
+        TablePath tablePath = TablePath.of("iceberg", "min_input_files_validation");
+        createTable(tablePath);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        IcebergRewriteDataFiles rewriter = createIcebergRewriteDataFiles(icebergTable, 0);
+
+        assertThatThrownBy(() -> rewriter.minInputFiles(0))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("minInputFiles must be >= 1");
+    }
+
private IcebergRewriteDataFiles createIcebergRewriteDataFiles(Table table, int bucket) {
table.refresh();
return new IcebergRewriteDataFiles(table, null, new TableBucket(0, bucket));