
Updated the ivarator mechanisms to cleanup directories #1849

Open
wants to merge 30 commits into base: integration from feature/ivaratorCleanup

30 commits
3ffa4cd
Updated the ivarator mechanisms to cleanup directories when they are
ivakegg Feb 11, 2023
7dd687c
review comments and plugged hole where directories may not be cleaned up
ivakegg Apr 29, 2023
86c6444
formatting
ivakegg Sep 22, 2023
3323bac
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Sep 29, 2023
0c9c5a0
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Oct 3, 2023
41b333e
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Oct 4, 2023
ad42e86
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Oct 25, 2023
b599a1b
Update tests.yml (#2095)
alerman Nov 3, 2023
70bab4d
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Nov 20, 2023
50e424b
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Nov 22, 2023
9bbc9ce
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Nov 22, 2023
9bacf9d
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Nov 24, 2023
bbd5c12
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Nov 24, 2023
721cb16
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Nov 24, 2023
e39a172
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Nov 24, 2023
3882f44
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Nov 29, 2023
e2a15e6
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Dec 16, 2023
1351366
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Dec 20, 2023
587222c
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Jan 16, 2024
9685d2a
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Jan 26, 2024
d3b4ef2
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Feb 9, 2024
15202f8
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Feb 14, 2024
80ddb11
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Feb 16, 2024
e2fbfcf
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Apr 5, 2024
93a11ee
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Apr 12, 2024
b58622f
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Apr 22, 2024
dcb45e7
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg May 3, 2024
eef4e80
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg May 10, 2024
29ed534
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg May 10, 2024
fd8698c
Merge branch 'integration' into feature/ivaratorCleanup
ivakegg Sep 18, 2024
@@ -1488,8 +1488,7 @@ private GenericObjectPool.Config createIvaratorSourcePoolConfig(int maxIvaratorS
protected String getHdfsCacheSubDirPrefix() {
// if we have a document specific range, or a list of specific doc ids (bundled document specific range per-se), then
// we could have multiple iterators running against this shard for this query at the same time.
// In this case we need to differentiate between the ivarator directories being created. However this is
// a situation we do not want to be in, so we will also display a large warning to be seen by the accumulo monitor.
// In this case we need to differentiate between the ivarator directories being created.
String hdfsPrefix = null;
if (isDocumentSpecificRange(this.range)) {
hdfsPrefix = range.getStartKey().getColumnFamily().toString().replace('\0', '_');
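The prefix derivation above can be illustrated with a small standalone helper (the class and method names here are hypothetical, not the project's API): for a document-specific range, the start key's column family carries datatype and uid separated by a null byte, which must be replaced before the value can serve as a directory name component.

```java
public class HdfsPrefixSketch {
    // Replace the null byte separating datatype and uid in a column family
    // (e.g. "datatype\0uid") with '_' so it is safe as a directory name.
    public static String toPrefix(String columnFamily) {
        return columnFamily.replace('\0', '_');
    }
}
```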
@@ -1,5 +1,6 @@
package datawave.query.iterator.builder;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
@@ -43,18 +44,30 @@ public abstract class IvaratorBuilder extends IndexIteratorBuilder {

protected void validateIvaratorControlDir(IvaratorCacheDir ivaratorCacheDir) {
String ivaratorCacheDirURI = ivaratorCacheDir.getPathURI();
FileSystem hdfsFileSystem = ivaratorCacheDir.getFs();
FileSystem fileSystem = ivaratorCacheDir.getFs();

final URI hdfsCacheURI;
try {
hdfsCacheURI = new URI(ivaratorCacheDirURI);
hdfsFileSystem.mkdirs(new Path(hdfsCacheURI));
final Path cachePath = new Path(new URI(ivaratorCacheDirURI));
// get the parent directory
final Path parentCachePath = cachePath.getParent();
final URI parentURI = parentCachePath.toUri();
if (!fileSystem.exists(parentCachePath)) {
// being able to make the parent directory is proof enough
fileSystem.mkdirs(parentCachePath);
} else if (fileSystem.getFileStatus(parentCachePath).isFile()) {
throw new IOException(parentCachePath + " exists but is a file. Expecting directory");
} else if (parentURI.getScheme().equals("file")) {
File parent = new File(parentURI.getPath());
if (!parent.canWrite() || !parent.canRead()) {
throw new IllegalStateException("Invalid permissions to directory " + parentCachePath);
}
}
} catch (MalformedURLException e) {
throw new IllegalStateException("Unable to load hadoop configuration", e);
throw new IllegalStateException("Invalid ivarator configuration: " + ivaratorCacheDirURI, e);
} catch (IOException e) {
throw new IllegalStateException("Unable to create hadoop file system", e);
throw new IllegalStateException("Unable to create file system", e);
} catch (URISyntaxException e) {
throw new IllegalStateException("Invalid hdfs cache dir URI: " + ivaratorCacheDirURI, e);
throw new IllegalStateException("Invalid cache dir URI: " + ivaratorCacheDirURI, e);
}
}

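The parent-directory checks in validateIvaratorControlDir can be sketched with plain java.io.File for the local-filesystem ("file" scheme) branch — the real code goes through Hadoop's FileSystem API, and this class name is hypothetical: create the parent if it is missing (being able to make it is proof enough), reject a plain file at that path, and verify read/write permissions.

```java
import java.io.File;

public class CacheDirValidator {
    // Validate the parent of an ivarator cache directory on the local filesystem:
    // create it if absent, fail if it is a file, fail if it is not read/writable.
    public static void validateParent(File parent) {
        if (!parent.exists()) {
            if (!parent.mkdirs()) {
                throw new IllegalStateException("Unable to create " + parent);
            }
        } else if (parent.isFile()) {
            throw new IllegalStateException(parent + " exists but is a file. Expecting directory");
        } else if (!parent.canWrite() || !parent.canRead()) {
            throw new IllegalStateException("Invalid permissions to directory " + parent);
        }
    }
}
```

Like the diff, this surfaces every failure as an unchecked exception so callers in the iterator stack do not need checked-exception plumbing.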
@@ -36,6 +36,9 @@ public class IvaratorCacheDirConfig implements Serializable {
// the minimum percent of available storage required to use this filesystem
protected double minAvailableStoragePercent;

// Do we require the ivarators to clean up after completion (true by default)
final protected boolean requiresCleanup;

static {
objectMapper.configure(SerializationFeature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED, true);
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
@@ -49,23 +52,32 @@ public IvaratorCacheDirConfig(String basePathURI) {
this(basePathURI, DEFAULT_PRIORITY);
}

public IvaratorCacheDirConfig(String basePathURI, boolean requiresCleanup) {
this(basePathURI, DEFAULT_PRIORITY, requiresCleanup);
}

public IvaratorCacheDirConfig(String basePathURI, int priority, boolean requiresCleanup) {
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT, requiresCleanup);
}

public IvaratorCacheDirConfig(String basePathURI, int priority) {
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT);
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT, true);
}

public IvaratorCacheDirConfig(String basePathURI, int priority, long minAvailableStorageMiB) {
this(basePathURI, priority, minAvailableStorageMiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT);
this(basePathURI, priority, minAvailableStorageMiB, DEFAULT_MIN_AVAILABLE_STORAGE_PERCENT, true);
}

public IvaratorCacheDirConfig(String basePathURI, int priority, double minAvailableStoragePercent) {
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, minAvailableStoragePercent);
this(basePathURI, priority, DEFAULT_MIN_AVAILABLE_STORAGE_MiB, minAvailableStoragePercent, true);
}

private IvaratorCacheDirConfig(String basePathURI, int priority, long minAvailableStorageMiB, double minAvailableStoragePercent) {
private IvaratorCacheDirConfig(String basePathURI, int priority, long minAvailableStorageMiB, double minAvailableStoragePercent, boolean requiresCleanup) {
this.basePathURI = basePathURI;
this.priority = priority;
this.minAvailableStorageMiB = minAvailableStorageMiB;
this.minAvailableStoragePercent = minAvailableStoragePercent;
this.requiresCleanup = requiresCleanup;
}

@JsonIgnore
@@ -122,6 +134,10 @@ public void setMinAvailableStoragePercent(double minAvailableStoragePercent) {
this.minAvailableStoragePercent = minAvailableStoragePercent;
}

public boolean isRequiresCleanup() {
return requiresCleanup;
}

public static String toJson(IvaratorCacheDirConfig ivaratorCacheDirConfig) throws JsonProcessingException {
return toJson(Collections.singletonList(ivaratorCacheDirConfig));
}
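The constructor changes above follow the telescoping-constructor pattern: each pre-existing overload now delegates to the full constructor with requiresCleanup hard-coded to true, so existing callers keep the old behavior while new callers can opt out. A minimal sketch of that pattern (hypothetical class with reduced fields, not the project's actual config class):

```java
public class CacheDirConfigSketch {
    final int priority;
    final boolean requiresCleanup;

    // Legacy overload: delegates with the default, preserving old behavior.
    public CacheDirConfigSketch(int priority) {
        this(priority, true); // default: clean up after completion
    }

    // Full constructor: new callers can disable cleanup explicitly.
    public CacheDirConfigSketch(int priority, boolean requiresCleanup) {
        this.priority = priority;
        this.requiresCleanup = requiresCleanup;
    }

    public boolean isRequiresCleanup() {
        return requiresCleanup;
    }
}
```

Routing every overload through one full constructor keeps the defaulting logic in a single place, which is why the diff threads the new boolean through the private constructor rather than duplicating field assignments.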
@@ -1035,30 +1035,38 @@ public Object visit(ASTNumberLiteral node, Object o) {
}

/**
* Build a list of potential hdfs directories based on each ivarator cache dir configs.
Build a list of potential directories based on each ivarator cache dir config. This will take the ivarator base directories and create a subdirectory
* consisting of
* <ul>
* <li>the queryid</li>
* <li>scan id</li>
<li>ivaratorCacheSubDirPrefix (based on the start key)</li>
* <li>"term" + the term index</li>
* </ul>
*
* @return A path
@return A list of ivarator cache directories specific to this ivarator instance
* @throws IOException
* for issues with read/write
*/
private List<IvaratorCacheDir> getIvaratorCacheDirs() throws IOException {
List<IvaratorCacheDir> pathAndFs = new ArrayList<>();

// first lets increment the count for a unique subdirectory
String subdirectory = ivaratorCacheSubDirPrefix + "term" + Integer.toString(++ivaratorCount);
StringBuilder subDirectoryBuilder = new StringBuilder();
subDirectoryBuilder.append(queryId);
if (scanId == null) {
log.warn("Running query iterator for " + queryId + " without a scan id. This could cause ivarator directory conflicts.");
} else {
subDirectoryBuilder.append('_').append(scanId);
}
// and lets increment the count for a unique subdirectory
subDirectoryBuilder.append('_').append(ivaratorCacheSubDirPrefix).append("term").append(++ivaratorCount);
String subDirectory = subDirectoryBuilder.toString();

if (ivaratorCacheDirConfigs != null && !ivaratorCacheDirConfigs.isEmpty()) {
for (IvaratorCacheDirConfig config : ivaratorCacheDirConfigs) {

// first, make sure the cache configuration is valid
// make sure the cache configuration is valid
if (config.isValid()) {
Path path = new Path(config.getBasePathURI(), queryId);
if (scanId == null) {
log.warn("Running query iterator for " + queryId + " without a scan id. This could cause ivarator directory conflicts.");
} else {
path = new Path(path, scanId);
}
path = new Path(path, subdirectory);
Path path = new Path(config.getBasePathURI(), subDirectory);
URI uri = path.toUri();
pathAndFs.add(new IvaratorCacheDir(config, hdfsFileSystem.getFileSystem(uri), uri.toString()));
}
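The subdirectory naming assembled by getIvaratorCacheDirs can be sketched as a pure function (hypothetical helper class; the real code also builds Path/URI objects and resolves a FileSystem per cache dir config):

```java
public class IvaratorSubDirSketch {
    // Builds "<queryId>[_<scanId>]_<subDirPrefix>term<termCount>", mirroring the
    // StringBuilder logic in the diff; a null scanId simply omits that component,
    // which is exactly the case the code warns can cause directory conflicts.
    public static String subDirectory(String queryId, String scanId, String subDirPrefix, int termCount) {
        StringBuilder sb = new StringBuilder(queryId);
        if (scanId != null) {
            sb.append('_').append(scanId);
        }
        sb.append('_').append(subDirPrefix).append("term").append(termCount);
        return sb.toString();
    }
}
```

Folding queryId and scanId into the subdirectory name (instead of nesting them as separate Path components, as the removed code did) yields a single flat directory per ivarator instance, which simplifies the cleanup this PR introduces.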
@@ -268,7 +268,7 @@ public void testMaxValueRangeMutiHdfsLocations() throws Exception {
// expected
}

ivaratorConfig(3, false);
ivaratorConfig(3, false, true);
runTest(query, query);
parsePlan(VALUE_THRESHOLD_JEXL_NODE, 1);
}
Expand Down