Skip to content

Commit

Permalink
feat: Better exception handling for file commands
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Aug 7, 2024
1 parent 09fe235 commit b41ec3a
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Integer call() throws Exception {
return 0;
}

protected abstract C executionContext() throws Exception;
protected abstract C executionContext();

private void setupLogging() {
Level level = logLevel();
Expand Down Expand Up @@ -73,7 +73,7 @@ private static void setBoolean(String property, boolean value) {
System.setProperty(property, String.valueOf(value));
}

protected abstract void execute(C context) throws Exception;
protected abstract void execute(C context);

public LoggingArgs getLoggingArgs() {
return loggingArgs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class AbstractJobCommand<C extends JobExecutionContext> extends
private JobLauncher jobLauncher;

@Override
protected C executionContext() throws Exception {
protected C executionContext() {
C context = newExecutionContext();
context.setJobName(jobName());
context.setJobRepositoryName(jobRepositoryName);
Expand All @@ -76,32 +76,29 @@ private String jobName() {
}

@Override
protected void execute(C context) throws Exception {
protected void execute(C context) {
Job job = job(context);
JobExecution jobExecution = context.getJobLauncher().run(job, new JobParameters());
if (JobUtils.isFailed(jobExecution.getExitStatus())) {
throw jobExecutionException(jobExecution);
JobExecution jobExecution;
try {
jobExecution = context.getJobLauncher().run(job, new JobParameters());
} catch (JobExecutionException e) {
throw new RiotException("Job failed", e);
}
}

private Exception jobExecutionException(JobExecution jobExecution) {
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (JobUtils.isFailed(stepExecution.getExitStatus())) {
return wrapException(stepExecution.getFailureExceptions());
if (JobUtils.isFailed(jobExecution.getExitStatus())) {
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (JobUtils.isFailed(stepExecution.getExitStatus())) {
throw wrapException(stepExecution.getFailureExceptions());
}
}
throw wrapException(jobExecution.getFailureExceptions());
}
return wrapException(jobExecution.getFailureExceptions());
}

private Exception wrapException(List<Throwable> throwables) {
private RiotException wrapException(List<Throwable> throwables) {
if (throwables.isEmpty()) {
return new JobExecutionException("Job failed");
}
Throwable throwable = throwables.get(0);
if (throwable instanceof Exception) {
return (Exception) throwable;
return new RiotException("Job failed");
}
return new JobExecutionException("Job failed", throwable);
return new RiotException("Job failed", throwables.get(0));
}

protected Job job(C context, Step<?, ?>... steps) {
Expand All @@ -122,7 +119,7 @@ protected boolean shouldShowProgress() {
return stepArgs.getProgressArgs().getStyle() != ProgressStyle.NONE;
}

protected abstract Job job(C context) throws Exception;
protected abstract Job job(C context);

private <I, O> TaskletStep step(C context, Step<I, O> step) {
SimpleStepBuilder<I, O> builder = simpleStep(context, step);
Expand All @@ -147,6 +144,7 @@ private <I, O> TaskletStep step(C context, Step<I, O> step) {
return ftStep.build();
}

@SuppressWarnings("removal")
private <I, O> SimpleStepBuilder<I, O> simpleStep(C context, Step<I, O> step) {
String stepName = context.getJobName() + "-" + step.getName();
if (step.getReader() instanceof ItemStreamSupport) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected ItemProcessor<KeyValue<String, Object>, Map<String, Object>> mapProces
return new FunctionItemProcessor<>(mapFunction);
}

protected <T> Step<KeyValue<String, Object>, T> step(C context, ItemWriter<T> writer) throws Exception {
protected <T> Step<KeyValue<String, Object>, T> step(C context, ItemWriter<T> writer) {
RedisItemReader<String, String, Object> reader = RedisItemReader.struct();
configure(context, reader);
Step<KeyValue<String, Object>, T> step = new Step<>(STEP_NAME, reader, writer);
Expand All @@ -60,7 +60,7 @@ protected void configure(C context, RedisItemReader<String, String, Object> read
redisReaderArgs.configure(reader);
}

public static void configure(Step<?, ?> step) throws Exception {
public static void configure(Step<?, ?> step) {
Assert.isInstanceOf(RedisItemReader.class, step.getReader(),
"Step reader must be an instance of RedisItemReader");
RedisItemReader<?, ?, ?> reader = (RedisItemReader<?, ?, ?>) step.getReader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public abstract class AbstractRedisCommand<C extends RedisExecutionContext> exte
private RedisClientArgs redisClientArgs = new RedisClientArgs();

@Override
protected C executionContext() throws Exception {
protected C executionContext() {
C context = super.executionContext();
context.setRedisContext(redisContext());
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ protected RedisExecutionContext newExecutionContext() {
}

@Override
protected Job job(RedisExecutionContext context) throws Exception {
protected Job job(RedisExecutionContext context) {
return job(context, step(context, databaseWriterArgs.writer()).processor(mapProcessor()));
}

Expand Down
13 changes: 10 additions & 3 deletions plugins/riot/src/main/java/com/redis/riot/FileExport.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.riot;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
Expand All @@ -11,6 +12,7 @@
import org.springframework.batch.item.ItemWriter;
import org.springframework.core.io.WritableResource;

import com.redis.riot.core.RiotException;
import com.redis.riot.file.FileType;
import com.redis.riot.file.FileUtils;
import com.redis.riot.file.FileWriterArgs;
Expand Down Expand Up @@ -46,7 +48,7 @@ protected FileExportExecutionContext newExecutionContext() {
}

@Override
protected FileExportExecutionContext executionContext() throws Exception {
protected FileExportExecutionContext executionContext() {
FileExportExecutionContext context = super.executionContext();
FileWriterFactory factory = new FileWriterFactory();
factory.setArgs(fileWriterArgs);
Expand All @@ -56,8 +58,13 @@ protected FileExportExecutionContext executionContext() throws Exception {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected Job job(FileExportExecutionContext context) throws Exception {
WritableResource resource = fileWriterArgs.resource(file);
protected Job job(FileExportExecutionContext context) {
WritableResource resource;
try {
resource = fileWriterArgs.resource(file);
} catch (IOException e) {
throw new RiotException("Could not open file " + file, e);
}
FileType fileType = fileType(resource);
ItemWriter writer = context.getFileWriterFactory().create(resource, fileType,
() -> headerRecord(context, fileType));
Expand Down
41 changes: 16 additions & 25 deletions plugins/riot/src/main/java/com/redis/riot/FileImport.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.riot;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
Expand Down Expand Up @@ -57,7 +58,7 @@ protected FileImportExecutionContext newExecutionContext() {
}

@Override
protected FileImportExecutionContext executionContext() throws Exception {
protected FileImportExecutionContext executionContext() {
FileImportExecutionContext context = super.executionContext();
FileReaderFactory factory = new FileReaderFactory();
factory.addDeserializer(KeyValue.class, new KeyValueDeserializer());
Expand All @@ -67,10 +68,22 @@ protected FileImportExecutionContext executionContext() throws Exception {
}

@Override
protected Job job(FileImportExecutionContext context) throws IOException {
protected Job job(FileImportExecutionContext context) {
Assert.notEmpty(files, "No file specified");
List<Step<?, ?>> steps = new ArrayList<>();
for (Resource resource : resources()) {
List<Resource> resources = new ArrayList<>();
for (String file : files) {
try {
for (String expandedFile : FileUtils.expand(file)) {
resources.add(fileReaderArgs.resource(expandedFile));
}
} catch (FileNotFoundException e) {
throw new RiotException("File not found: " + file);
} catch (IOException e) {
throw new RiotException("Could not read file " + file, e);
}
}
for (Resource resource : resources) {
Step<?, ?> step = step(context, resource);
step.skip(ParseException.class);
step.skip(org.springframework.batch.item.ParseException.class);
Expand Down Expand Up @@ -134,28 +147,6 @@ private Function<Map<String, Object>, Map<String, Object>> toFieldFunction(Strin
return new MapToFieldFunction(key).andThen((Function) new RegexNamedGroupFunction(regex));
}

public List<Resource> resources() {
List<Resource> resources = new ArrayList<>();
for (String file : files) {
List<String> expandedFiles;
try {
expandedFiles = FileUtils.expand(file);
} catch (IOException e) {
throw new RiotException(String.format("Could not expand file %s", file), e);
}
for (String expandedFile : expandedFiles) {
Resource resource;
try {
resource = fileReaderArgs.resource(expandedFile);
} catch (IOException e) {
throw new RiotException(String.format("Could not create resource from file %s", expandedFile), e);
}
resources.add(resource);
}
}
return resources;
}

public List<String> getFiles() {
return files;
}
Expand Down
7 changes: 3 additions & 4 deletions plugins/riot/src/main/java/com/redis/riot/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ protected boolean isQuickCompare() {
}

@Override
protected Job job(TargetRedisExecutionContext context) throws Exception {
protected Job job(TargetRedisExecutionContext context) {
List<Step<?, ?>> steps = new ArrayList<>();
Step<KeyValue<byte[], Object>, KeyValue<byte[], Object>> replicateStep = replicateStep(context);
Step<KeyValue<byte[], Object>, KeyValue<byte[], Object>> replicateStep = step(context);
steps.add(replicateStep);
if (shouldCompare()) {
steps.add(compareStep(context));
Expand Down Expand Up @@ -117,8 +117,7 @@ protected void configureTargetWriter(TargetRedisExecutionContext context, RedisI
context.configureTargetWriter(writer);
}

private Step<KeyValue<byte[], Object>, KeyValue<byte[], Object>> replicateStep(TargetRedisExecutionContext context)
throws Exception {
private Step<KeyValue<byte[], Object>, KeyValue<byte[], Object>> step(TargetRedisExecutionContext context) {
RedisItemReader<byte[], byte[], Object> reader = reader();
configureSourceReader(context, reader);
RedisItemWriter<byte[], byte[], KeyValue<byte[], Object>> writer = writer();
Expand Down

0 comments on commit b41ec3a

Please sign in to comment.