
[BUG] S3 Deltastreamer: Block has already been inflated #6428

Open
dyang108 opened this issue Aug 17, 2022 · 25 comments
Assignees: nsivabalan
Labels: aws-support, hudistreamer (issues related to Hudi streamer, formerly Deltastreamer), metadata (metadata table), priority:major (degraded perf; unable to move forward; potential bugs)

Comments

dyang108 commented Aug 17, 2022

Describe the problem you faced

Deltastreamer with write output to S3 exits unexpectedly when running in continuous mode.

To Reproduce

Steps to reproduce the behavior:
I ran the following:

/etc/spark/bin/spark-submit --conf -Dconfig.file=/service.conf,spark.executor.extraJavaOptions=-Dlog4j.debug=true --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --jars /etc/spark/work-dir/* /etc/spark/work-dir/hudi-utilities-bundle_2.11-0.12.0.jar --props /mnt/mesos/sandbox/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider --source-class org.apache.hudi.utilities.sources.AvroKafkaSource --target-base-path "s3a://strava.scratch/tmp/derick/hudi" --target-table "aligned_activities" --op "UPSERT" --source-ordering-field "ts" --table-type "COPY_ON_WRITE" --source-limit 100 --continuous

The /etc/spark/work-dir/ directory looks like this:
aws-java-sdk-bundle-1.12.283.jar hadoop-aws-2.6.5.jar hudi-utilities-bundle_2.11-0.12.0.jar scala-library-2.11.12.jar spark-streaming-kafka-0-10_2.11-2.4.8.jar

Expected behavior

I don't expect there to be issues on compaction here.

Environment Description

  • Hudi version : 0.12.0 (also tried 0.11.1)

  • Spark version : 2.4.8

  • Hive version :

  • Hadoop version : 2.6.5

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : Yes, docker on Mesos

I'm reading from an Avro Kafka topic.

Additional context

Reading Avro records from Kafka with the following properties:

hoodie.datasource.write.recordkey.field=activityId
auto.offset.reset=latest

Stacktrace

22/08/17 23:07:26 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220817230714888
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:190)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:187)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:557)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220817230714888
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:155)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:588)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:335)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:687)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://strava.scratch/tmp/derick/hudi/__HIVE_DEFAULT_PARTITION__ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:137)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:305)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:296)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:517)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:103)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:144)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:69)
    at org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:89)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:63)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:230)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:508)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:470)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:416)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getOrCreateReaders$11(HoodieBackedTableMetadata.java:402)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:402)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$1(HoodieBackedTableMetadata.java:211)
    at java.util.HashMap.forEach(HashMap.java:1290)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:209)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:141)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:312)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:135)
    ... 37 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:67)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:267)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:278)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:83)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:473)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
    ... 55 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.flatMap(HoodieSparkEngineContext.java:137)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions(HoodieIndexUtils.java:87)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:144)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
    at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:49)
    at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:32)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
    ... 11 more
Caused by: org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://strava.scratch/tmp/derick/hudi/__HIVE_DEFAULT_PARTITION__ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:137)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:305)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:296)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:517)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:103)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:144)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:69)
    at org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:89)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:63)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:230)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:508)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:470)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:416)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getOrCreateReaders$11(HoodieBackedTableMetadata.java:402)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:402)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$1(HoodieBackedTableMetadata.java:211)
    at java.util.HashMap.forEach(HashMap.java:1290)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:209)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:141)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:312)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:135)
    ... 37 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:67)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:267)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:278)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:83)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:473)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
    ... 55 more
22/08/17 23:07:26 INFO DeltaSync: Shutting down embedded timeline server
22/08/17 23:07:26 INFO EmbeddedTimelineService: Closing Timeline server
22/08/17 23:07:26 INFO TimelineService: Closing Timeline Service
22/08/17 23:07:26 INFO Javalin: Stopping Javalin ...
22/08/17 23:07:26 INFO SparkUI: Stopped Spark web UI at http://d16171598c10:8090
22/08/17 23:07:26 ERROR Javalin: Javalin failed to stop gracefully
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at org.apache.hudi.org.eclipse.jetty.server.AbstractConnector.doStop(AbstractConnector.java:333)
    at org.apache.hudi.org.eclipse.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
    at org.apache.hudi.org.eclipse.jetty.server.ServerConnector.doStop(ServerConnector.java:248)
    at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
    at org.apache.hudi.org.eclipse.jetty.server.Server.doStop(Server.java:450)
    at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
    at io.javalin.Javalin.stop(Javalin.java:195)
    at org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:325)
    at org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:141)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.close(DeltaSync.java:905)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.close(HoodieDeltaStreamer.java:831)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.onDeltaSyncShutdown(HoodieDeltaStreamer.java:223)
    at org.apache.hudi.async.HoodieAsyncService.lambda$shutdownCallback$0(HoodieAsyncService.java:171)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
22/08/17 23:07:26 INFO Javalin: Javalin has stopped
22/08/17 23:07:26 INFO TimelineService: Closed Timeline Service
22/08/17 23:07:26 INFO EmbeddedTimelineService: Closed Timeline server
22/08/17 23:07:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/08/17 23:07:26 INFO MemoryStore: MemoryStore cleared
22/08/17 23:07:26 INFO BlockManager: BlockManager stopped
22/08/17 23:07:26 INFO BlockManagerMaster: BlockManagerMaster stopped
22/08/17 23:07:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/08/17 23:07:26 INFO SparkContext: Successfully stopped SparkContext
@xushiyan added the priority:critical and hudistreamer labels Aug 23, 2022
@rmahindra123 (Contributor)

@yihua can you take a look? Seems like a metadata-related issue.

@dyang108 (Author)

Update: I got it working on an older version of Hudi, 0.10.1, so this seems like a regression.

@nsivabalan added the writer-core and reader-core labels Aug 27, 2022
@xushiyan added the metadata label and removed the writer-core and reader-core labels Sep 18, 2022
@nsivabalan (Contributor)

@dyang108: is this happening infrequently, or is your pipeline just stuck? We have unit tests and integration tests for the metadata table, and we haven't hit this issue yet; trying to gauge what's different in your env or setup.
It looks like we are just trying to read records from the metadata table, nothing fancy.

@dyang108 (Author)

This happened consistently with the command above every time I ran on Hudi 0.12.0 (also tried 0.11.1).

The pipeline failed and exited when I saw this issue.

kasured commented Sep 24, 2022

We have the same stacktrace when running on Hudi 0.11.0, Spark 3.2.1, EMR 6.7. We have the metadata table enabled, and our Spark streaming query fails each time. This is a COW table.

kasured commented Sep 24, 2022

@nsivabalan What might be a general workaround in that situation to unblock the processing? Of course it depends on the root cause, but will deleting and recreating the metadata table from hudi-cli help? One other option might be to disable metadata on the current table and proceed.

@nsivabalan (Contributor)

Got it. Did you mean you are using EMR's Spark or OSS Spark? I understand it's an EMR cluster.

@yihua assigned nsivabalan and unassigned yihua Sep 26, 2022
@yihua (Contributor) commented Sep 26, 2022

@kasured to unblock the processing, could you try disabling and deleting the metadata table by setting hoodie.metadata.enable=false in Hudi configs? This automatically deletes the metadata table after a few commits.
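
For reference, a minimal sketch of how that flag can be supplied to Deltastreamer: either in the properties file passed via --props, or as a command-line override. Only the property name itself comes from this thread; the rest is illustrative usage.

    hoodie.metadata.enable=false

or

    --hoodie-conf hoodie.metadata.enable=false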

kasured commented Sep 27, 2022

@yihua Yes, that helped. However, I assume the same can be done with hudi-cli as I wrote before: metadata delete and metadata create.

@nsivabalan Yes, we are using the Amazon bundle for Spark 3.2.1 which is provided by EMR 6.7.

@nsivabalan (Contributor)

Yes, you are right. You can disable it via hudi-cli as well.
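
A sketch of that hudi-cli flow (the table path is a hypothetical placeholder):

    connect --path s3a://my-bucket/path/to/table
    metadata delete
    metadata create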

@nsivabalan (Contributor)

Since we could not reproduce with OSS Spark, can you reach out to AWS support?
CC @umehrot2 @rahil-c: have you folks seen this issue before? Seems like a simple read from the metadata table is failing with EMR Spark.

@dyang108 (Author)

I saw this issue with Spark on Mesos (on EC2), not EMR Spark.

@gunjanchaudhary87

Hi,

Is there any resolution for this issue yet, or any idea of which release it will be fixed in? I am also facing the same issue.
My test case is very simple: reload the same file twice

  1. With hoodie.datasource.write.operation = BulkInsert
  2. With hoodie.datasource.write.operation = Upsert

When metadata is enabled, the Bulk Insert works fine, but the Upsert aborts with "Caused by: java.lang.IllegalStateException: Block has already been inflated".
When metadata is disabled (hoodie.metadata.enable = false), the Upsert works fine.
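
For reference, those two passes as a minimal Spark (Java) sketch; the table name, record key, precombine field, and paths are hypothetical placeholders, not from this report:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class ReloadSameFileTwice {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("hudi-inflate-repro").getOrCreate();
        // Hypothetical input, loaded twice with different write operations.
        Dataset<Row> df = spark.read().parquet("s3a://my-bucket/input/batch.parquet");

        // Pass 1: bulk insert with the metadata table enabled (reported to work fine).
        write(df, "bulk_insert", SaveMode.Overwrite);

        // Pass 2: upsert of the same data (reported to abort with
        // "Block has already been inflated" while the metadata table is enabled).
        write(df, "upsert", SaveMode.Append);
      }

      private static void write(Dataset<Row> df, String operation, SaveMode mode) {
        df.write().format("hudi")
            .option("hoodie.table.name", "my_table")
            .option("hoodie.datasource.write.recordkey.field", "id")
            .option("hoodie.datasource.write.precombine.field", "ts")
            .option("hoodie.datasource.write.operation", operation)
            .option("hoodie.metadata.enable", "true")
            .mode(mode)
            .save("s3a://my-bucket/tables/my_table");
      }
    }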

My test cases mostly depend on metadata, so I need it to be enabled. Please let me know if there is any other workaround.

Thank you !

@danny0405 (Contributor)

cc @nsivabalan to look into this issue, thanks.

@danny0405 changed the title from [SUPPORT] S3 Deltastreamer: Block has already been inflated to [BUG] S3 Deltastreamer: Block has already been inflated Jan 26, 2023
jschuck9 commented Feb 7, 2023

Is a fix for this issue planned to be included in 0.13.0 or a 0.12.x patch release?

@wyb199701

I got the same issue.
When I excluded hudi-common from hudi-aws, it worked successfully.
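
For anyone trying the same workaround, a Maven-style exclusion sketch (the version is a placeholder; the thread does not confirm this works in general):

    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-aws</artifactId>
      <version>0.12.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hudi</groupId>
          <artifactId>hudi-common</artifactId>
        </exclusion>
      </exclusions>
    </dependency>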

@xccui (Member) commented Apr 20, 2023

Hit this when using Flink 1.16 and Hudi bdb50dd on EKS. The metadata table was enabled.

2023-04-19 23:55:23
org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://path-to-data/ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:152)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:69)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$16(AbstractTableFileSystemView.java:428)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:419)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestMergedFileSlicesBeforeOrOn(AbstractTableFileSystemView.java:854)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:104)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestMergedFileSlicesBeforeOrOn(PriorityBasedFileSystemView.java:195)
    at org.apache.hudi.sink.partitioner.profile.DeltaWriteProfile.smallFilesProfile(DeltaWriteProfile.java:62)
    at org.apache.hudi.sink.partitioner.profile.WriteProfile.getSmallFiles(WriteProfile.java:191)
    at org.apache.hudi.sink.partitioner.BucketAssigner.getSmallFileAssign(BucketAssigner.java:179)
    at org.apache.hudi.sink.partitioner.BucketAssigner.addInsert(BucketAssigner.java:137)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.getNewRecordLocation(BucketAssignFunction.java:215)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.processRecord(BucketAssignFunction.java:200)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.processElement(BucketAssignFunction.java:162)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
    at org.apache.hudi.metadata.HoodieMetadataLogRecordReader$Builder.build(HoodieMetadataLogRecordReader.java:218)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:539)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:440)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:425)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$3(HoodieBackedTableMetadata.java:239)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:237)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:152)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:339)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:150)
    ... 28 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:76)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:276)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:287)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.readRecordsFromBlockPayload(HoodieDataBlock.java:166)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecordIterator(HoodieDataBlock.java:128)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.getRecordsIterator(AbstractHoodieLogRecordReader.java:807)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:630)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)

@xccui (Member) commented Apr 20, 2023

I don't know the detailed logic here, but apparently recursively invoking inflate() when hitting an IOException will cause the state check to fail.

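(The original comment included a screenshot of HoodieLogBlock.inflate(). A simplified Java paraphrase of the pattern being described, not a verbatim copy of the Hudi source, with field names abbreviated:)

    protected void inflate() throws HoodieIOException {
      // Throws IllegalStateException("Block has already been inflated")
      // if content has already been assigned.
      ValidationUtils.checkState(!content.isPresent(), "Block has already been inflated");
      try {
        content = Option.of(new byte[(int) blockSize]);  // content is assigned before the read...
        inputStream.readFully(content.get(), 0, content.get().length);
      } catch (IOException e) {
        // ...so when the read fails (e.g. an S3 timeout), this retry immediately
        // trips the checkState above, and the original IOException is swallowed.
        inflate();
      }
    }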

@codope added the priority:major label and removed the priority:critical and release-0.14.0 labels May 24, 2023
zinking commented Jun 16, 2023

@kasured to unblock the processing, could you try disabling and deleting the metadata table by setting hoodie.metadata.enable=false in Hudi configs? This automatically deletes the metadata table after a few commits.

Right, this seems obviously flawed: it is hiding the actual IOException and instead throwing an irrelevant "block has already been inflated" error.
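
For illustration, one way to avoid masking the cause is to reset the partial state and rethrow instead of recursing. This is a hypothetical sketch only, not necessarily what the eventual fix does:

    protected void inflate() throws HoodieIOException {
      ValidationUtils.checkState(!content.isPresent(), "Block has already been inflated");
      try {
        content = Option.of(new byte[(int) blockSize]);
        inputStream.readFully(content.get(), 0, content.get().length);
      } catch (IOException e) {
        // Clear the partially assigned content and surface the real failure,
        // instead of recursing into the now-failing state check.
        content = Option.empty();
        throw new HoodieIOException("Failed to inflate log block content", e);
      }
    }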

@danny0405 (Contributor)

@zinking Can you file a fix for it?

@envomp (Contributor) commented Oct 9, 2023

Hey folks,

This issue (https://gist.github.com/envomp/268bdd35a3b3399db59583c0e159c229#file-cover-logs)
seems to be a cover-up for the real underlying issue, which in our case was this: https://gist.github.com/envomp/268bdd35a3b3399db59583c0e159c229#file-actual-logs

That in turn was caused by TIMELINE_SERVER_BASED marker types being unusable when using Spark structured streaming. The workaround was to disable the metadata table.
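
As a sketch, the two mitigations discussed here expressed as Hudi configs (whether switching the marker type alone helps is an assumption; this thread only confirms disabling the metadata table):

    hoodie.metadata.enable=false
    hoodie.write.markers.type=DIRECT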

@ad1happy2go (Collaborator)

@envomp Are you setting fs.s3a.connection.maximum to a higher value? That might fix the connection timeout issue.

@envomp (Contributor) commented Oct 9, 2023

Hey @ad1happy2go

We have the following s3a configurations:

spark.hadoop.fs.s3a.path.style.access: true
spark.hadoop.fs.s3a.threads.max: 64
spark.hadoop.fs.s3a.connection.maximum: 1024
spark.hadoop.fs.s3a.maxRetries: 64

Also tried setting fs.s3a.connection.maximum to 8096 but the issue persisted.

EDIT:

For a table with smaller volume, this is how disabling the metadata table affected the app duration time (screenshot omitted).

@chestnutqiang (Contributor) commented Feb 6, 2024

Same problem on version 0.14.1, on HDFS.

@yihua (Contributor) commented Sep 22, 2024

The "Block has already been inflated" issue is due to a bug that is fixed by #7434.

Projects: Status: 🚧 Needs Repro
Development: No branches or pull requests