Skip to content

Commit

Permalink
Updated the ivarator mechanisms to clean up directories when they are
Browse files Browse the repository at this point in the history
done.  This has resulted in collapsing the directory structure.  However,
since they are now deleted when done, we should be able to avoid creating
too many subdirectories.

Co-authored-by: Whitney O'Meara <[email protected]>
  • Loading branch information
ivakegg and jwomeara committed Apr 21, 2023
1 parent c7466b9 commit b5ee9eb
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 157 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1560,8 +1560,7 @@ private GenericObjectPool.Config createIvaratorSourcePoolConfig(int maxIvaratorS
protected String getHdfsCacheSubDirPrefix() {
// if we have a document-specific range, or a list of specific doc ids (a bundled document-specific range per se), then
// we could have multiple iterators running against this shard for this query at the same time.
// In this case we need to differentiate between the ivarator directories being created. However this is
// a situation we do not want to be in, so we will also display a large warning to be seen by the accumulo monitor.
// In this case we need to differentiate between the ivarator directories being created.
String hdfsPrefix = null;
if (isDocumentSpecificRange(this.range)) {
hdfsPrefix = range.getStartKey().getColumnFamily().toString().replace('\0', '_');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package datawave.query.iterator.builder;

import datawave.query.iterator.ivarator.IvaratorCacheDir;
import datawave.core.iterators.querylock.QueryLock;
import datawave.query.composite.CompositeMetadata;
import datawave.query.iterator.ivarator.IvaratorCacheDir;
import datawave.query.iterator.profile.QuerySpanCollector;
import datawave.query.util.sortedset.FileSortedSet;
import org.apache.accumulo.core.data.Key;
Expand All @@ -12,6 +12,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
Expand Down Expand Up @@ -44,10 +45,22 @@ protected void validateIvaratorControlDir(IvaratorCacheDir ivaratorCacheDir) {
String ivaratorCacheDirURI = ivaratorCacheDir.getPathURI();
FileSystem hdfsFileSystem = ivaratorCacheDir.getFs();

final URI hdfsCacheURI;
try {
hdfsCacheURI = new URI(ivaratorCacheDirURI);
hdfsFileSystem.mkdirs(new Path(hdfsCacheURI));
final Path hdfsCachePath = new Path(new URI(ivaratorCacheDirURI));
// get the parent directory
final Path parentCachePath = hdfsCachePath.getParent();
final URI parentURI = parentCachePath.toUri();
if (!hdfsFileSystem.exists(parentCachePath)) {
// being able to make the parent directory is proof enough
hdfsFileSystem.mkdirs(parentCachePath);
} else if (hdfsFileSystem.getFileStatus(parentCachePath).isFile()) {
throw new IOException(parentCachePath.toString() + " exists but is a file. Expecting directory");
} else if (parentURI.getScheme().equals("file")) {
File parent = new File(parentURI.getPath());
if (!parent.canWrite() || !parent.canRead()) {
throw new IllegalStateException("Invalid permissions to directory " + parentCachePath);
}
}
} catch (MalformedURLException e) {
throw new IllegalStateException("Unable to load hadoop configuration", e);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class IvaratorCacheDirConfig {
// the minimum percent of available storage required to use this filesystem
final protected double minAvailableStoragePercent;

// Do we require the ivarators to clean up after completion (true by default)
final protected boolean requiresCleanup;

static {
objectMapper.configure(SerializationFeature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED, true);
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
Expand All @@ -47,23 +50,32 @@ public IvaratorCacheDirConfig(String basePathURI) {
this(basePathURI, DEFAULT_PRIORITY);
}

public IvaratorCacheDirConfig(String basePathURI, boolean requiresCleanup) {
this(basePathURI, DEFAULT_PRIORITY, requiresCleanup);
}

public IvaratorCacheDirConfig(String basePathURI, int priority, boolean requiresCleanup) {
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT, requiresCleanup);
}

public IvaratorCacheDirConfig(String basePathURI, int priority) {
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT);
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT, true);
}

public IvaratorCacheDirConfig(String basePathURI, int priority, long minAvailableStorageMiB) {
this(basePathURI, priority, minAvailableStorageMiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT);
this(basePathURI, priority, minAvailableStorageMiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT, true);
}

public IvaratorCacheDirConfig(String basePathURI, int priority, double minAvailableStoragePercent) {
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, minAvailableStoragePercent);
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, minAvailableStoragePercent, true);
}

private IvaratorCacheDirConfig(String basePathURI, int priority, long minAvailableStorageMiB, double minAvailableStoragePercent) {
private IvaratorCacheDirConfig(String basePathURI, int priority, long minAvailableStorageMiB, double minAvailableStoragePercent, boolean requiresCleanup) {
this.basePathURI = basePathURI;
this.priority = priority;
this.minAvailableStorageMiB = minAvailableStorageMiB;
this.minAvailableStoragePercent = minAvailableStoragePercent;
this.requiresCleanup = requiresCleanup;
}

@JsonIgnore
Expand Down Expand Up @@ -104,6 +116,10 @@ public double getMinAvailableStoragePercent() {
return minAvailableStoragePercent;
}

public boolean isRequiresCleanup() {
return requiresCleanup;
}

public static String toJson(IvaratorCacheDirConfig ivaratorCacheDirConfig) throws JsonProcessingException {
return toJson(Collections.singletonList(ivaratorCacheDirConfig));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,30 +1049,38 @@ public Object visit(ASTNumberLiteral node, Object o) {
}

/**
* Build a list of potential hdfs directories based on each ivarator cache dir configs.
Build a list of potential directories based on each ivarator cache dir config. This will take the
* ivarator base directories and create a subdirectory consisting of
* <ul>
* <li>the queryid</li>
* <li>scan id</li>
<li>ivaratorCacheSubDirPrefix (based on the start key)</li>
* <li>"term" + the term index</li>
* </ul>
*
* @return A path
* @return A list of ivarator cache directories specific to this ivarator instance
* @throws IOException
* for issues with read/write
*/
private List<IvaratorCacheDir> getIvaratorCacheDirs() throws IOException {
List<IvaratorCacheDir> pathAndFs = new ArrayList<>();

// first lets increment the count for a unique subdirectory
String subdirectory = ivaratorCacheSubDirPrefix + "term" + Integer.toString(++ivaratorCount);
StringBuilder subDirectoryBuilder = new StringBuilder();
subDirectoryBuilder.append(queryId);
if (scanId == null) {
log.warn("Running query iterator for " + queryId + " without a scan id. This could cause ivarator directory conflicts.");
} else {
subDirectoryBuilder.append('_').append(scanId);
}
// and let's increment the count for a unique subdirectory
subDirectoryBuilder.append('_').append(ivaratorCacheSubDirPrefix).append("term").append(++ivaratorCount);
String subDirectory = subDirectoryBuilder.toString();

if (ivaratorCacheDirConfigs != null && !ivaratorCacheDirConfigs.isEmpty()) {
for (IvaratorCacheDirConfig config : ivaratorCacheDirConfigs) {

// first, make sure the cache configuration is valid
// make sure the cache configuration is valid
if (config.isValid()) {
Path path = new Path(config.getBasePathURI(), queryId);
if (scanId == null) {
log.warn("Running query iterator for " + queryId + " without a scan id. This could cause ivarator directory conflicts.");
} else {
path = new Path(path, scanId);
}
path = new Path(path, subdirectory);
Path path = new Path(config.getBasePathURI(), subDirectory);
URI uri = path.toUri();
pathAndFs.add(new IvaratorCacheDir(config, hdfsFileSystem.getFileSystem(uri), uri.toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void testMaxValueRangeMutiHdfsLocations() throws Exception {
// expected
}

ivaratorConfig(3, false);
ivaratorConfig(3, false, true);
runTest(query, query);
parsePlan(VALUE_THRESHOLD_JEXL_NODE, 1);
}
Expand Down
Loading

0 comments on commit b5ee9eb

Please sign in to comment.