[HUDI-6539] New LSM tree style archived timeline #9209
Conversation
can you please split the code early into an "org.apache.hudi.storage.lsm" package, where we keep the parquet LSM code away from its use in the ArchivedTimeline? I think it'll help a lot with reviewing the code.
/**
 * A combination of instants covering action states: requested, inflight, completed.
 */
public class ActiveInstant implements Serializable, Comparable<ActiveInstant> {
rename to "ActiveAction", since it's really the instants that make up the action up to its completed state?
It's just a triple of an instant in 3 different states; maybe we can come up with a better name for it.
Let's call this class ActiveAction, which is a triplet of instants. That's how we refer to this today.
+1 for ActiveAction
}
/**
 * A COMPACTION action eventually becomes COMMIT when completed.
Something to think about is whether we keep it "COMPACTION" with the new changes.
Personally I'm +1 for keeping the compaction and log_compaction actions just as they are; this avoids many ambiguities. But I think it should be a separate topic: we need to discuss whether to reuse the action for all kinds of table services: compaction, log_compaction, clustering, etc.
+1 we can decouple this.
 *
 * <p><h2>The LSM Tree Compaction</h2>
 * Use the universal compaction strategy, that is: when N (by default 10) parquet files exist in the current layer, they are merged and flushed as a larger file in the next layer.
 * We have no limit for the layer number; assuming there are 10 instants for each file in L0, there could be 100 instants per file in L1,
Might be good to think about a bound here and control how the LSM merge behaves based on that.
I suggest not having files larger than 1GB, to ensure the merge process can run on lower-end VMs/machines.
Yeah, size-based compaction makes sense to me; we can always pick the oldest files from a layer, but also keep the total size of the source files under 1GB.
size makes sense, do we need the size info in the manifest as well then?
Should we use a schemaful file format to store manifests (JSON/Parquet/Avro)? Can we get all the information we need to plan LSM compaction/cleaning into the manifest, so that we are not listing anything?
In the end, we encode the manifest content as a JSON string, which is friendlier to extend in the future if there are new requirements. Also, the file size is encoded along with the file name.
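For illustration only, a manifest encoded this way (field names here are hypothetical; the actual on-disk layout may differ) could look like:

```json
{
  "version": 5,
  "files": [
    { "fileName": "00000001_00000010_0.parquet", "fileLen": 1048576 },
    { "fileName": "00000011_00000020_0.parquet", "fileLen": 2097152 }
  ]
}
```

Carrying the file length next to each file name lets the compaction planner bound the total input size without listing or stat-ing the storage.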
 *
 * <p>In order to make snapshot isolation of the archived timeline write/read, we add two kinds of metadata files for the LSM tree version management:
 * <ol>
 * <li>Manifest file: Each new file in layer 0 or each compaction would generate a new manifest file, the manifest file records the valid file handles of the latest snapshot;</li>
i.e. the list of all files in the entire LSM, correct?
Correct, to be more accurate, it keeps the list of all the files in the latest snapshot. The LSM tree itself has multiple versions.
 * <li>Version file: A version file is generated right after a complete manifest file is formed.</li>
what does this contain? Pointer to the latest manifest file?
It behaves like a MARKER file for the manifest file: a version file handle indicates that the snapshot version is now complete for the reader view; the reader lists all the valid version files to fetch the valid versions of the current timeline.
To understand this better:
for a distributed file system like HDFS: this keeps the reader from reading a partially written MANIFEST file?
for cloud storage: PUTs are atomic, so the reader will not see any partially written MANIFEST file?
Yes, but is there any possibility the reader sees an empty MANIFEST file? Anyway, we now always write a temp file first, then rename it to the final file.
Update:
- we now have only 1 version hint file, whose content is the latest version number.
- for HDFS, we do a rename from a temp file to the final file, for both the manifest and the version file.
- for S3, we do a direct write, because the object storage itself has atomic write operations.
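The publish protocol in the update above can be sketched with plain java.nio (a minimal illustration of the temp-file-plus-rename pattern, not Hudi's actual code; on S3 the rename step would be replaced by a single direct PUT):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: atomic publish of a small metadata file (manifest or version file).
// On a file system with atomic rename (HDFS, local POSIX), writing to a temp
// path first means readers can never observe a partially written or empty file.
public class AtomicPublish {
  public static void publish(Path finalPath, byte[] content) throws IOException {
    Path tmp = finalPath.resolveSibling(finalPath.getFileName() + ".tmp");
    Files.write(tmp, content); // a crash here leaves only the temp file behind
    Files.move(tmp, finalPath, StandardCopyOption.ATOMIC_MOVE); // readers see all or nothing
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("lsm");
    Path version = dir.resolve("_version_");
    publish(version, "7".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(Files.readAllBytes(version), StandardCharsets.UTF_8)); // prints "7"
  }
}
```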
 * </ul>
 *
 * <p><h2>The Legacy Files Cleaning and Read Retention</h2>
 * Only triggers file cleaning after a valid compaction.
I think we can use OCC here for concurrency control between the LSM merge and the writer? Even just taking a lock and letting the write or LSM merge fail if there was something concurrent would be ok?
For multi-writer, we already have an explicit lock guard for the archiver. The LSM merge is now an inline action right after the write; we might need to support async compaction in the future? I'm not sure yet.
yeah for now we assume this is within a lock and done inline.
File a JIRA to track?
 * Keeps only 3 valid snapshot versions for the reader; that means a file is kept for at least 3 archival trigger intervals. For the default configuration, that is a 30-instant time span,
 * which is far longer than the archived timeline loading time.
 *
 * <p><h3>Instants TTL</h3></p>
I'd prefer lazily loading it vs ignoring it completely. i.e. the LSM performs well if the read is within 1 week in the past, but it should always be correct.
+1 too. By default we can eagerly load 3~7 days of instants into memory for fast lookup of completion time; if an instant falls outside this range, do a lazy load for it. The performance should not be too bad, because we have data skipping on the files.
 * <p><h2>The Legacy Files Cleaning and Read Retention</h2>
 * Only triggers file cleaning after a valid compaction.
 *
 * <p><h3>Clean Strategy</h3></p>
Not following. are you talking about cleaning of the Hudi data table?
No, the cleaning of the LSM tree legacy files itself.
Let's include the info we need, e.g. the timestamp of when a version was created, to help retain x hours of versions.
Either is okay. Version-number-based cleaning works better when the timeline is committed more frequently; because we do not need time-travel queries on the timeline, the cleaning can be more aggressive.
Moved the LSM write path code into
Looks very promising, and I like the overall direction. Left tons of naming and layering comments and pointed out some gaps.
Could you please resolve the naming and code-structure ones, and comment on how we can make this more full-fledged, especially the compaction implementation.
Please resolve the comments that are addressed or that we have aligned on, so we can track the pending items easily.
 * @param path the file to be deleted.
 * @return IOException caught, if any. Null otherwise.
 */
private static IOException tryDelete(FileSystem fs, Path path) {
please use Option
Removed.
 * Avro schema for different archived instant read cases.
 */
public abstract class ArchivedInstantReadSchemas {
  public static final Schema SLIM_SCHEMA = new Schema.Parser().parse("{\n"
rename: TIMELINE_LSM_SLIM_READ_SCHEMA
import java.util
import scala.collection.JavaConverters._

object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase {
is this a jmh benchmark?
yes
private final int maxInstantsToKeep;
private final int minInstantsToKeep;
private final HoodieTable<T, I, K, O> table;
private final HoodieTableMetaClient metaClient;
private final TransactionManager txnManager;

private final ArchivedTimelineWriter timelineWriter;
archivedTimelineWriter?
Rename?
} else {
  LOG.info("No Instants to archive");
}

if (shouldMergeSmallArchiveFiles()) {
Can you confirm we are removing this functionality fully within this pull request?
Kind of. It is replaced by the new compaction, but we still keep one config option: the number of batched files for each compaction input source.
  }
}

private Map<String, Boolean> deleteFilesParallelize(
move to FSUtils?
 * <p>
 * </p>
 * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
 * Represents the Archived Timeline for the Hoodie table.
please keep this class free from the LSM design. Single Responsibility Principle.
 * We have no limit for the layer number; assuming there are 10 instants for each file in L0, there could be 100 instants per file in L1,
 * so 3000 instants could be represented as 3 parquet files in L2; it is pretty fast if we use concurrent reads.
 *
 * <p>The benchmark shows reading 1000 instants costs about 10 ms.
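As a sanity check of the arithmetic in the quoted javadoc, a tiny self-contained sketch (assuming the default 10 instants per L0 file and a 10-file merge fan-out, as described above):

```java
// Per-file instant capacity grows by the merge fan-out at each LSM layer:
// L0 = 10 instants/file, L1 = 100, L2 = 1000, so 3 files in L2 cover 3000 instants.
public class LayerMath {
  public static long instantsPerFile(int layer) {
    long instants = 10; // an L0 file holds 10 instants by default
    for (int i = 0; i < layer; i++) {
      instants *= 10; // each merge of 10 files multiplies the per-file count by 10
    }
    return instants;
  }

  public static void main(String[] args) {
    System.out.println(instantsPerFile(2) * 3); // prints 3000
  }
}
```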
move all this out to LSMTimeline class.
done
private List<String> getCandidateFiles(List<HoodieArchivedManifest.FileEntry> files, int filesBatch) throws IOException {
  List<String> candidates = new ArrayList<>();
  long totalFileLen = 0L;
  long maxFileSizeInBytes = 1024 * 1024 * 1000;
pull into constant
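A minimal sketch of the size-bounded candidate selection being discussed (the FileEntry class and constant name here are illustrative, not Hudi's actual API): pick files in order until either the batch count or the ~1GB total-input cap is reached.

```java
import java.util.ArrayList;
import java.util.List;

public class CandidateSelection {
  // Cap on total input size per compaction, pulled into a constant as suggested.
  static final long MAX_FILE_SIZE_IN_BYTES = 1024L * 1024 * 1000;

  public static class FileEntry {
    final String name;
    final long length;
    public FileEntry(String name, long length) { this.name = name; this.length = length; }
  }

  public static List<String> getCandidateFiles(List<FileEntry> files, int filesBatch) {
    List<String> candidates = new ArrayList<>();
    long totalFileLen = 0L;
    for (FileEntry entry : files) {
      // Stop when either the batch-count bound or the total-size bound would be exceeded.
      if (candidates.size() >= filesBatch || totalFileLen + entry.length > MAX_FILE_SIZE_IN_BYTES) {
        break;
      }
      totalFileLen += entry.length;
      candidates.add(entry.name);
    }
    return candidates;
  }
}
```

Taking files in manifest order keeps the "oldest files first" behavior mentioned earlier in the thread, while the size cap keeps the merge runnable on lower-end machines.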
public void compactAndClean(HoodieEngineContext context) throws IOException {
  // 1. List all the latest snapshot files
  HoodieArchivedManifest latestManifest = HoodieArchivedTimeline.latestSnapshotManifest(metaClient);
  int layer = 0;
so we don't compact beyond layer 0 now?
We have no limit for layers.
Still a few places to clean up, but LGTM overall. Please revert the one change that seems unrelated (or let me know if it's related).
},
{
  "name":"plan",
  "type":["null", "bytes"],
I am thinking of the scenario where we want users to write SQL to query the timeline. If we do bytes, we probably need to provide UDFs for converting from bytes to a plan? Follow-up JIRA? (I think this is still better than a nested schema, which can be expensive to write.)
Yeah, bytes is more efficient. Filed a follow-up JIRA: https://issues.apache.org/jira/browse/HUDI-6747
private Option<String> getMetadataKey(String action) {
  switch (action) {
    case HoodieTimeline.CLEAN_ACTION:
      return Option.of("hoodieCleanMetadata");
any existing constants we can use for this? else ignore.
import java.util.stream.Collectors;

/**
 * An archived timeline writer which organizes the files as an LSM tree.
remove "archived"
 * limitations under the License.
 */

package org.apache.hudi.client.utils;
can we make a org.apache.hudi.client.timeline package and move all these classes there.
done
LOG.info("Writing schema " + wrapperSchema.toString());
for (ActiveAction activeAction : activeActions) {
  try {
    if (preWriteCallback != null) {
can we try to use Option instead of null as sentinels?
 * Returns a new file name.
 */
private static String newFileName(String minInstant, String maxInstant, int layer) {
  return minInstant + "_" + maxInstant + "_" + layer + HoodieFileFormat.PARQUET.getFileExtension();
String.format?
if (!completedCommitBeforeOldestPendingInstant.isPresent()
    || HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
        LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
  if (!completedCommitBeforeOldestPendingInstant.isPresent()) {
why do we need this change? this may break something. please revert if unnecessary for this change.
Because the check
HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())
is always false. I can revert the change.
"type":"record",
"name":"HoodieLSMTimelineInstant",
"namespace":"org.apache.hudi.avro.model",
"fields":[
Let's add a version field to each record, so we can evolve as we go if needed.
/**
 * Parse the maximum instant time from the file name.
 */
public static String getMaxInstantTime(String fileName) {
do these methods have UTs?
Tests added.
/**
 * Parse the minimum instant time from the file name.
 */
public static String getMinInstantTime(String fileName) {
Instead of individual parsing methods, can we introduce a POJO here with getters? i.e. an LSMFile class with min, max, level as fields?
Didn't see much gain here; we can extend it in the near future if we need more complex parsing of the file names.
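For reference, a sketch of the suggested POJO (the LSMFileName class is hypothetical, and ".parquet" stands in for HoodieFileFormat.PARQUET.getFileExtension()): it pairs the String.format-style writer with a single parse method replacing the individual getMinInstantTime/getMaxInstantTime helpers.

```java
// File names follow the scheme discussed above: <minInstant>_<maxInstant>_<layer>.parquet
public class LSMFileName {
  private final String minInstant;
  private final String maxInstant;
  private final int layer;

  public LSMFileName(String minInstant, String maxInstant, int layer) {
    this.minInstant = minInstant;
    this.maxInstant = maxInstant;
    this.layer = layer;
  }

  // String.format variant of newFileName, as suggested in the review.
  public String toFileName() {
    return String.format("%s_%s_%d.parquet", minInstant, maxInstant, layer);
  }

  // One parse call instead of separate min/max/layer parsing methods.
  public static LSMFileName parse(String fileName) {
    String base = fileName.substring(0, fileName.lastIndexOf('.'));
    String[] parts = base.split("_");
    return new LSMFileName(parts[0], parts[1], Integer.parseInt(parts[2]));
  }

  public String getMinInstant() { return minInstant; }
  public String getMaxInstant() { return maxInstant; }
  public int getLayer() { return layer; }
}
```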
* Maintain only one version pointer file, add a file size limit to the compaction strategy * write the manifest as JSON, move the timeline write path to a separate class for convenient review
* replace the log based archived timeline with a new parquet based timeline * the timeline is organized as an LSM tree, it has multiple versions for read/write snapshot isolation * maintain only one version pointer file, add a file size limit to the compaction strategy * write the manifest as JSON
Hello, does the master branch now support lsm format merge? @danny0405
No, only the archived timeline uses the LSM layout for instants access.
    throw new HoodieException(e);
  }
};
this.timelineWriter.write(instantsToArchive, Option.of(action -> deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive, context);
Do we need to consider how to handle deletion-failure exceptions?
We should, but currently the leftovers from a failed deletion do not affect correctness; the completion time would still be loaded correctly if an instant is located in both the active and archived timelines.
We need to think through the design though; it is arduous to keep the whole sequence of operations atomic.
No more doubts.
Change Logs
A new LSM style archived timeline.
Impact
none
Risk level (write none, low medium or high below)
none
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change
ticket number here and follow the instruction to make changes to the website.
Contributor's checklist