
[HUDI-6539] New LSM tree style archived timeline #9209

Merged: 1 commit into apache:master on Aug 29, 2023

Conversation

danny0405 (Contributor):

Change Logs

A new LSM style archived timeline.

Impact

none

Risk level (write none, low, medium or high below)

none

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change:

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instructions to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

vinothchandar self-assigned this on Jul 18, 2023
danny0405 force-pushed the HUDI-6539 branch 3 times, most recently from e88d7dc to 8784e5f on July 20, 2023
danny0405 force-pushed the HUDI-6539 branch 4 times, most recently from 6926ee7 to a71bc4b on July 21, 2023
danny0405 changed the title from [WIP][HUDI-6539] New LSM tree style archived timeline to [HUDI-6539] New LSM tree style archived timeline on Jul 24, 2023
danny0405 force-pushed the HUDI-6539 branch 5 times, most recently from c281ded to a7f8558 on July 25, 2023
vinothchandar (Member) left a comment:

Can you please split the code earlier into an "org.apache.hudi.storage.lsm" package, where we keep the parquet LSM code away from its use in the ArchivedTimeline? I think it'll also help a lot with reviewing the code.

/**
* A combination of instants covering action states: requested, inflight, completed.
*/
public class ActiveInstant implements Serializable, Comparable<ActiveInstant> {
vinothchandar (Member):

rename to "ActiveAction" since its really the instants that make up the action to completed state?

danny0405 (Contributor Author) on Jul 31, 2023:

It's just a triple of an instant with 3 different states; maybe we can come up with a better name for it.

vinothchandar (Member):

Let's call this class ActiveAction, which is a triplet of instants. That's what we call this today.

danny0405 (Contributor Author):

+1 for ActiveAction.

}

/**
* A COMPACTION action eventually becomes COMMIT when completed.
vinothchandar (Member):

Something to think about: whether we keep it "COMPACTION" with the new changes.

danny0405 (Contributor Author):

Personally I'm +1 for keeping the compaction and log_compaction actions just as they are; this avoids many ambiguities. But I think it should be a separate topic: we need a discussion on whether to reuse the action for all kinds of table services: compaction, log_compaction, clustering, etc.

vinothchandar (Member):

+1 we can decouple this.

*
* <p><h2>The LSM Tree Compaction</h2>
* Use the universal compaction strategy, that is: when N (by default 10) parquet files exist in the current layer, they are merged and flushed as a larger file in the next layer.
* We have no limit on the layer number. Assuming there are 10 instants for each file in L0, there could be 100 instants per file in L1,
vinothchandar (Member):

Might be good to think about a bound here and control how the LSM merge behaves based on that.
I suggest not having files larger than 1GB, to ensure the merge process can run on lower-end VMs/machines.

danny0405 (Contributor Author):

Yeah, size-based compaction makes sense to me. We can always pick the oldest files from a layer but also keep the total size of the source files under 1GB.

vinothchandar (Member) on Aug 1, 2023:

Size makes sense. Do we need the size info in the manifest as well, then?

vinothchandar (Member) on Aug 1, 2023:

Should we use a schemaful format to store the manifests: JSON/Parquet/Avro? Can we get all the information we need to plan LSM compaction/cleaning into the manifest, so that we are not listing anything?

danny0405 (Contributor Author):

Finally, we encode the manifest content as a JSON string, which is friendlier to extend in the future if there are new requirements. Also, the file size is encoded along with the file name.
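For illustration only, a manifest payload under this scheme might look roughly like the following; the field names here are assumptions for this sketch, not taken from the PR, while the file naming convention (minInstant_maxInstant_layer.parquet) matches the newFileName helper quoted later in this thread:

{
  "files": [
    {"fileName": "00000001_00000010_0.parquet", "fileLen": 1048576},
    {"fileName": "00000011_00000020_0.parquet", "fileLen": 986452}
  ]
}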

*
* <p>In order to provide snapshot isolation for archived timeline writes/reads, we add two kinds of metadata files for the LSM tree version management:
* <ol>
* <li>Manifest file: Each new file in layer 0 or each compaction generates a new manifest file; the manifest file records the valid file handles of the latest snapshot;</li>
vinothchandar (Member):

I.e., the list of all files in the entire LSM, correct?

danny0405 (Contributor Author) on Jul 31, 2023:

Correct; to be more accurate, it keeps the list of all the files in the latest snapshot. The LSM tree itself has multiple versions.

* <p>In order to provide snapshot isolation for archived timeline writes/reads, we add two kinds of metadata files for the LSM tree version management:
* <ol>
* <li>Manifest file: Each new file in layer 0 or each compaction generates a new manifest file; the manifest file records the valid file handles of the latest snapshot;</li>
* <li>Version file: A version file is generated right after a complete manifest file is formed.</li>
vinothchandar (Member):

What does this contain? A pointer to the latest manifest file?

danny0405 (Contributor Author):

It behaves like a MARKER file for the manifest file: a version file indicates that the snapshot version is now complete for the reader view. The reader lists all the valid version files to fetch the valid versions of the current timeline.

vinothchandar (Member):

To understand this better:

  • For a distributed file system like HDFS: does this prevent the reader from reading a partially written MANIFEST file?
  • For cloud storage: PUTs are atomic, so the reader will not see a partially written MANIFEST file?

danny0405 (Contributor Author):

Yes, but is there any possibility the reader sees an empty MANIFEST file? Anyway, we now always write a temp file first, then rename it to the final file.

danny0405 (Contributor Author):

Update:

  1. We now have only one version hint file, whose content is the latest version number.
  2. For HDFS, we do a rename from a temp file to the final file, for both the manifest and the version file.
  3. For S3, we do a direct write, because the object storage write operation is itself atomic. (A sketch follows below.)
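A rough sketch of that write protocol; the class and method names here are hypothetical, and only the standard Hadoop FileSystem API is used:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MetaFileWriteSketch {
  // Writes a manifest/version file so that readers never observe a partial file.
  public static void writeMetaFile(FileSystem fs, Path finalPath, byte[] content, boolean useTempFile) throws IOException {
    if (useTempFile) {
      // HDFS-style file systems: write a temp file first, then rename.
      // Rename is atomic on HDFS, so readers see either the old file or the new one.
      Path tmpPath = new Path(finalPath.getParent(), finalPath.getName() + ".tmp");
      try (FSDataOutputStream out = fs.create(tmpPath, true)) {
        out.write(content);
      }
      if (!fs.rename(tmpPath, finalPath)) {
        throw new IOException("Failed to rename " + tmpPath + " to " + finalPath);
      }
    } else {
      // Object stores such as S3: a single PUT is atomic, so write directly.
      try (FSDataOutputStream out = fs.create(finalPath, true)) {
        out.write(content);
      }
    }
  }
}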

* </ul>
*
* <p><h2>The Legacy Files Cleaning and Read Retention</h2>
* Only triggers file cleaning after a valid compaction.
vinothchandar (Member):

I think we can use OCC here for concurrency control between the LSM merge and the writer? Even just taking a lock and letting the write or LSM merge fail if there was something concurrent would be OK?

danny0405 (Contributor Author):

For multi-writer, we already have an explicit lock guard for the archiver. The LSM merge is now an inline action right after the write; we might need to support async compaction in the future? I have no idea yet.

vinothchandar (Member):

Yeah, for now we assume this is within a lock and done inline.

vinothchandar (Member):

File a JIRA to track?


* Keeps only 3 valid snapshot versions for the reader, which means a file is kept for at least 3 archival trigger intervals. With the default configuration that is a 30-instant time span,
* which is far longer than the archived timeline loading time.
*
* <p><h3>Instants TTL</h3></p>
vinothchandar (Member):

I'd prefer lazily loading it vs. ignoring it completely, i.e. the LSM performs well if the read is within 1 week in the past, but it should always be correct.

danny0405 (Contributor Author):

+1 too. By default we can eagerly load 3 ~ 7 days of instants into memory for fast lookup of the completion time; if an instant is out of this range, do a lazy load for it. The performance should not be too bad because we have data skipping on the files.
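A rough sketch of that eager-window-plus-lazy-fallback lookup; the class and method names here are hypothetical:

import java.util.HashMap;
import java.util.Map;
import org.apache.hudi.common.util.Option;

public class CompletionTimeLookupSketch {
  // Completion times for instants inside the eagerly loaded window (e.g. the last 3 ~ 7 days).
  private final Map<String, String> eagerCache = new HashMap<>();

  public Option<String> getCompletionTime(String instantTime) {
    String completionTime = eagerCache.get(instantTime);
    if (completionTime != null) {
      return Option.of(completionTime);
    }
    // Outside the eager window: lazily load only the LSM files whose
    // [minInstant, maxInstant] name range covers this instant (data skipping).
    return lazyLoadFromArchivedTimeline(instantTime);
  }

  private Option<String> lazyLoadFromArchivedTimeline(String instantTime) {
    // Placeholder for the lazy read path against the archived timeline.
    return Option.empty();
  }
}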

* <p><h2>The Legacy Files Cleaning and Read Retention</h2>
* Only triggers file cleaning after a valid compaction.
*
* <p><h3>Clean Strategy</h3></p>
vinothchandar (Member):

Not following. Are you talking about cleaning of the Hudi data table?

danny0405 (Contributor Author):

No, the cleaning of the LSM tree legacy files itself.

vinothchandar (Member):

Let's include the info we need, e.g. the timestamp of when a version was created, to help retain x hours of versions.

danny0405 (Contributor Author) on Aug 1, 2023:

Either is okay. Version-number-based cleaning works better when the timeline is committed more frequently; because we do not need time-travel queries on the timeline, the cleaning can be more aggressive.
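A rough sketch of version-number-based cleaning (names hypothetical): retain the files referenced by the latest N snapshot versions, as described earlier in the thread, and treat everything else as legacy files.

import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LsmCleanSketch {
  private static final int VERSIONS_TO_RETAIN = 3;

  // allVersions: version numbers of all snapshots;
  // manifestFiles: reads the file list from a given version's manifest.
  public static Set<String> filesToDelete(List<Integer> allVersions,
      Function<Integer, Set<String>> manifestFiles, Set<String> allFiles) {
    Set<String> referenced = allVersions.stream()
        .sorted(Comparator.reverseOrder())
        .limit(VERSIONS_TO_RETAIN)
        .map(manifestFiles)
        .flatMap(Set::stream)
        .collect(Collectors.toSet());
    // Any file not referenced by a retained snapshot can be cleaned.
    Set<String> toDelete = new HashSet<>(allFiles);
    toDelete.removeAll(referenced);
    return toDelete;
  }
}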

danny0405 (Contributor Author), replying to the package-split suggestion above:

Moved the LSM write path code into ArchivedTimelineWriter.

danny0405 force-pushed the HUDI-6539 branch 3 times, most recently from 4ade37c to 57c1b84 on August 3, 2023
vinothchandar (Member) left a comment:

Looks very promising; I like the overall direction. Left tons of naming and layering comments and pointed out some gaps.

Could you please resolve the naming and code-structure ones, and comment on how we can make this more full-fledged, especially the compaction implementation.
Please resolve the comments that are addressed or that we have aligned on, so we can track the pending items easily.

* @param path the file to be deleted.
* @return IOException caught, if any. Null otherwise.
*/
private static IOException tryDelete(FileSystem fs, Path path) {
vinothchandar (Member):

please use Option

danny0405 (Contributor Author):

Removed.

* Avro schema for different archived instant read cases.
*/
public abstract class ArchivedInstantReadSchemas {
public static final Schema SLIM_SCHEMA = new Schema.Parser().parse("{\n"
vinothchandar (Member):

rename: TIMELINE_LSM_SLIM_READ_SCHEMA

import java.util
import scala.collection.JavaConverters._

object ArchivedTimelineReadBenchmark extends HoodieBenchmarkBase {
vinothchandar (Member):

Is this a JMH benchmark?

danny0405 (Contributor Author):

yes

private final int maxInstantsToKeep;
private final int minInstantsToKeep;
private final HoodieTable<T, I, K, O> table;
private final HoodieTableMetaClient metaClient;
private final TransactionManager txnManager;

private final ArchivedTimelineWriter timelineWriter;
vinothchandar (Member):

archivedTimelineWriter?
Rename?

} else {
LOG.info("No Instants to archive");
}

if (shouldMergeSmallArchiveFiles()) {
vinothchandar (Member):

Can you confirm we are removing this functionality fully within this pull request?

danny0405 (Contributor Author):

Kind of: it is replaced by the new compaction, but we still keep one config option, namely the number of files batched for each compaction input source.

}
}

private Map<String, Boolean> deleteFilesParallelize(
vinothchandar (Member):

move to FSUtils?

* <p>
* </p>
* This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
* Represents the Archived Timeline for the Hoodie table.
vinothchandar (Member):

Please keep this class free from the LSM design. Single Responsibility Principle.

* We have no limit on the layer number. Assuming there are 10 instants for each file in L0, there could be 100 instants per file in L1,
* so 3000 instants could be represented as 3 parquet files in L2; it is pretty fast if we use concurrent reads.
*
* <p>The benchmark shows that reading 1000 instants costs about 10 ms.
vinothchandar (Member):

move all this out to LSMTimeline class.

danny0405 (Contributor Author):

done

private List<String> getCandidateFiles(List<HoodieArchivedManifest.FileEntry> files, int filesBatch) throws IOException {
List<String> candidates = new ArrayList<>();
long totalFileLen = 0L;
long maxFileSizeInBytes = 1024 * 1024 * 1000;
vinothchandar (Member):

pull into constant
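A minimal version of the suggestion (the constant name is assumed), keeping the value from the snippet above:

private static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;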

public void compactAndClean(HoodieEngineContext context) throws IOException {
// 1. List all the latest snapshot files
HoodieArchivedManifest latestManifest = HoodieArchivedTimeline.latestSnapshotManifest(metaClient);
int layer = 0;
vinothchandar (Member):

so we don't compact beyond layer 0 now?

danny0405 (Contributor Author):

We have no limit on the number of layers.

vinothchandar (Member) left a comment:

Still a few places to clean up, but LGTM overall. Please revert the one change that seems unrelated (or let me know if it's related).

},
{
"name":"plan",
"type":["null", "bytes"],
vinothchandar (Member):

I am thinking of the scenario where we want users to write SQL to query the timeline. If we use bytes, we probably need to provide UDFs for converting the bytes to a plan? Follow-up JIRA? (I think this is still better than a nested schema, which can be expensive to write.)

danny0405 (Contributor Author):

Yeah, bytes is more efficient. Filed a follow-up JIRA: https://issues.apache.org/jira/browse/HUDI-6747
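Not part of this PR, but a rough sketch of the kind of UDF the follow-up could add; the UDF name and payload choice are assumptions, using Hudi's existing TimelineMetadataUtils helper for Avro-serialized metadata:

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class PlanUdfSketch {
  public static void register(SparkSession spark) {
    // Renders the Avro-serialized plan bytes as a JSON-ish string for SQL users.
    spark.udf().register("compaction_plan_json", (UDF1<byte[], String>) bytes ->
        bytes == null ? null : TimelineMetadataUtils.deserializeAvroMetadata(bytes, HoodieCompactionPlan.class).toString(),
        DataTypes.StringType);
  }
}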

private Option<String> getMetadataKey(String action) {
switch (action) {
case HoodieTimeline.CLEAN_ACTION:
return Option.of("hoodieCleanMetadata");
vinothchandar (Member):

any existing constants we can use for this? else ignore.

import java.util.stream.Collectors;

/**
* An archived timeline writer which organizes the files as an LSM tree.
vinothchandar (Member):

remove "archived"

* limitations under the License.
*/

package org.apache.hudi.client.utils;
vinothchandar (Member):

Can we make an org.apache.hudi.client.timeline package and move all these classes there?

danny0405 (Contributor Author):

done

LOG.info("Writing schema " + wrapperSchema.toString());
for (ActiveAction activeAction : activeActions) {
try {
if (preWriteCallback != null) {
vinothchandar (Member):

can we try to use Option instead of null as sentinels?
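For what it's worth, the merged API does take an Option callback (the timelineWriter.write call later in this thread passes Option.of(...)). A minimal sketch of the null-free style, assuming the field becomes Hudi's Option<Consumer<ActiveAction>>:

// private final Option<Consumer<ActiveAction>> preWriteCallback;
preWriteCallback.ifPresent(callback -> callback.accept(activeAction));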

* Returns a new file name.
*/
private static String newFileName(String minInstant, String maxInstant, int layer) {
return minInstant + "_" + maxInstant + "_" + layer + HoodieFileFormat.PARQUET.getFileExtension();
vinothchandar (Member):

String.format?
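A minimal sketch of the suggestion, producing the same file name as the concatenation above:

return String.format("%s_%s_%d%s", minInstant, maxInstant, layer, HoodieFileFormat.PARQUET.getFileExtension());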

if (!completedCommitBeforeOldestPendingInstant.isPresent()
|| HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
if (!completedCommitBeforeOldestPendingInstant.isPresent()) {
vinothchandar (Member):

Why do we need this change? It may break something; please revert if it is unnecessary for this change.

danny0405 (Contributor Author):

Because the check

HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
          LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())

is always false. I can revert the change.

"type":"record",
"name":"HoodieLSMTimelineInstant",
"namespace":"org.apache.hudi.avro.model",
"fields":[
vinothchandar (Member):

Let's add a version field to each record, so we can evolve it as we go if needed.

/**
* Parse the maximum instant time from the file name.
*/
public static String getMaxInstantTime(String fileName) {
vinothchandar (Member):

do these methods have UTs?

danny0405 (Contributor Author):

Tests added.

/**
* Parse the minimum instant time from the file name.
*/
public static String getMinInstantTime(String fileName) {
vinothchandar (Member):

Instead of individual parsing methods, can we introduce a POJO here, with getters? I.e.

an LSMFile class with min, max, level as fields?

danny0405 (Contributor Author):

Didn't see much gain here; we can extend it in the near future if we have more complex parsing of the file names.
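For reference, a sketch of the POJO idea, based on the naming scheme from the newFileName helper above (minInstant + "_" + maxInstant + "_" + layer + ".parquet"); the class name is the reviewer's suggestion and the parsing details are assumptions:

public class LSMFile {
  private final String minInstant;
  private final String maxInstant;
  private final int level;

  private LSMFile(String minInstant, String maxInstant, int level) {
    this.minInstant = minInstant;
    this.maxInstant = maxInstant;
    this.level = level;
  }

  // Parses a file name of the form "minInstant_maxInstant_level.parquet".
  public static LSMFile fromFileName(String fileName) {
    String basename = fileName.substring(0, fileName.lastIndexOf('.'));
    String[] parts = basename.split("_");
    return new LSMFile(parts[0], parts[1], Integer.parseInt(parts[2]));
  }

  public String getMinInstant() {
    return minInstant;
  }

  public String getMaxInstant() {
    return maxInstant;
  }

  public int getLevel() {
    return level;
  }
}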

danny0405 force-pushed the HUDI-6539 branch 4 times, most recently from 613a47b to 1739fa3 on August 25, 2023
* Maintain only one version pointer file, add a file size limit to the compaction strategy
* Write the manifest as JSON, move the timeline write path to a separate class for convenient review
hudi-bot:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

danny0405 merged commit d924f18 into apache:master on Aug 29, 2023. 27 checks passed.
leosanqing pushed a commit to leosanqing/hudi that referenced this pull request on Sep 13, 2023
* Replace the log-based archived timeline with the new parquet-based timeline
* The timeline is organized as an LSM tree; it has multiple versions for read/write snapshot isolation
* Maintain only one version pointer file, add a file size limit to the compaction strategy
* Write the manifest as JSON
waywtdcc (Contributor):

Hello, does the master branch now support LSM format merge? @danny0405

danny0405 (Contributor Author), replying:

No, only the archived timeline uses the LSM layout for instants access.

throw new HoodieException(e);
}
};
this.timelineWriter.write(instantsToArchive, Option.of(action -> deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive, context);
Contributor:

Do we need to consider how to handle a deletion failure exception here?

danny0405 (Contributor Author) on Nov 15, 2023:

We should, but currently the leftovers from a failed deletion do not affect correctness: the completion time is still loaded correctly if an instant is present on both the active and archived timelines.

We need to think through the design though; it is arduous to make the whole multi-step handling atomic.

Contributor:

No more doubts.

Projects
Status: ✅ Done
5 participants