HADOOP-19554. LocalDirAllocator still doesn't always recover from directory deletion #7651

Open: wants to merge 1 commit into trunk
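
This change moves the buffer-directory recreation in getLocalPathForWrite() out of the size-unknown ("roulette") branch, so missing directories are recreated before any capacity is measured, and it appends the configured directory names to the "no space available" and "could not find any valid local directory" error messages. Below is a minimal sketch, not part of the PR, of the scenario the fix targets; the configuration key test.local.dir and the /tmp paths are illustrative assumptions.

// Sketch of the failure scenario this PR addresses; not part of the change.
// "test.local.dir" and the /tmp paths are illustrative assumptions.
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;

public class DirectoryDeletionScenario {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Configure a nested buffer directory, as the new test does.
    conf.set("test.local.dir", "/tmp/alloc-demo/subdir1/subdir2");
    LocalDirAllocator allocator = new LocalDirAllocator("test.local.dir");

    // First allocation with a known size creates the directory tree.
    Path first = allocator.getLocalPathForWrite("file1", 512, conf);

    // Delete an ancestor of the buffer directory between allocations.
    FileUtil.fullyDelete(new File("/tmp/alloc-demo/subdir1"));

    // A later allocation (here with an unknown size) should transparently
    // recreate the missing directories rather than fail with
    // "No space available in any of the local directories".
    Path second = allocator.getLocalPathForWrite("file2", -1, conf);
    System.out.println(first + " then " + second);
  }
}

The new testDirectoryRecoveryKnownSize test below exercises the same sequence against a nested buffer directory.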
LocalDirAllocator.java
@@ -65,7 +65,10 @@
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
public class LocalDirAllocator {


static final String E_NO_SPACE_AVAILABLE =
"No space available in any of the local directories";

//A Map from the config item names like "mapred.local.dir"
//to the instance of the AllocatorPerContext. This
//is a static object to make sure there exists exactly one instance per JVM
@@ -393,6 +396,8 @@ int getCurrentDirectoryIndex() {
*/
public Path getLocalPathForWrite(String pathStr, long size,
Configuration conf, boolean checkWrite) throws IOException {
LOG.debug("searchng for directory for file at {}, size = {}; checkWrite={}",
pathStr, size, checkWrite);
Context ctx = confChanged(conf);
int numDirs = ctx.localDirs.length;
int numDirsSearched = 0;
@@ -406,27 +411,40 @@ public Path getLocalPathForWrite(String pathStr, long size,
pathStr = pathStr.substring(1);
}
Path returnPath = null;

if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
//proportional to available size
long[] availableOnDisk = new long[ctx.dirDF.length];
long totalAvailable = 0;

//build the "roulette wheel"
for(int i =0; i < ctx.dirDF.length; ++i) {
final DF target = ctx.dirDF[i];
// attempt to recreate the dir so that getAvailable() is valid
// if it fails, getAvailable() will return 0, so the dir will
// be declared unavailable.
// return value is logged at debug to keep spotbugs quiet.
final boolean b = new File(target.getDirPath()).mkdirs();
LOG.debug("mkdirs of {}={}", target, b);
availableOnDisk[i] = target.getAvailable();
totalAvailable += availableOnDisk[i];

final int dirCount = ctx.dirDF.length;
long[] availableOnDisk = new long[dirCount];
long totalAvailable = 0;

StringBuilder pathNames = new StringBuilder();

//build the "roulette wheel"
for (int i =0; i < dirCount; ++i) {
final DF target = ctx.dirDF[i];
// attempt to recreate the dir so that getAvailable() is valid
// if it fails, getAvailable() will return 0, so the dir will
// be declared unavailable.
// the mkdirs() return value is not checked here.
final String name = target.getDirPath();
pathNames.append(" ").append(name);
final File dirPath = new File(name);
if (!dirPath.exists()) {
LOG.debug("creating buffer dir {}}", name);
dirPath.mkdirs();
}
availableOnDisk[i] = target.getAvailable();
totalAvailable += availableOnDisk[i];
}

LOG.debug("Directory count is {}; total available capacity is {}",
dirCount, totalAvailable);

if (totalAvailable == 0){
throw new DiskErrorException("No space available in any of the local directories.");
if (size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
//proportional to available size
LOG.debug("Size not specified, so picking at random");

if (totalAvailable == 0) {
throw new DiskErrorException(E_NO_SPACE_AVAILABLE + pathNames);
}

// Keep rolling the wheel till we get a valid path
@@ -439,14 +457,19 @@ public Path getLocalPathForWrite(String pathStr, long size,
dir++;
}
ctx.dirNumLastAccessed.set(dir);
returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
final Path localDir = ctx.localDirs[dir];
returnPath = createPath(localDir, pathStr, checkWrite);
if (returnPath == null) {
totalAvailable -= availableOnDisk[dir];
availableOnDisk[dir] = 0; // skip this disk
numDirsSearched++;
LOG.debug("No capacity in {}", localDir);
} else {
LOG.debug("Allocated file {} in {}", returnPath, localDir);
}
}
} else {
LOG.debug("Size is {}; searching", size);
// Start linear search with random increment if possible
int randomInc = 1;
if (numDirs > 2) {
@@ -459,17 +482,20 @@ public Path getLocalPathForWrite(String pathStr, long size,
maxCapacity = capacity;
}
if (capacity > size) {
final Path localDir = ctx.localDirs[dirNum];
try {
returnPath = createPath(ctx.localDirs[dirNum], pathStr,
checkWrite);
returnPath = createPath(localDir, pathStr, checkWrite);
} catch (IOException e) {
errorText = e.getMessage();
diskException = e;
LOG.debug("DiskException caught for dir {}", ctx.localDirs[dirNum], e);
LOG.debug("DiskException caught for dir {}", localDir, e);
}
if (returnPath != null) {
ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
LOG.debug("Allocated file {} in {}", returnPath, localDir);
break;
} else {
LOG.debug("No capacity in {}", localDir);
}
}
dirNum++;
@@ -484,7 +510,9 @@ public Path getLocalPathForWrite(String pathStr, long size,
//no path found
String newErrorText = "Could not find any valid local directory for " +
pathStr + " with requested size " + size +
" as the max capacity in any directory is " + maxCapacity;
" as the max capacity in any directory"
+ " (" + pathNames + " )"
+ " is " + maxCapacity;
if (errorText != null) {
newErrorText = newErrorText + " due to " + errorText;
}
TestLocalDirAllocator.java
@@ -26,14 +26,16 @@
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Shell;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.apache.hadoop.fs.LocalDirAllocator.E_NO_SPACE_AVAILABLE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -564,13 +566,8 @@ public void testGetLocalPathForWriteForInvalidPaths(String paramRoot, String par
throws Exception {
initTestLocalDirAllocator(paramRoot, paramPrefix);
conf.set(CONTEXT, " ");
try {
dirAllocator.getLocalPathForWrite("/test", conf);
fail("not throwing the exception");
} catch (IOException e) {
assertEquals("No space available in any of the local directories.",
e.getMessage(), "Incorrect exception message");
}
intercept(IOException.class, E_NO_SPACE_AVAILABLE, () ->
dirAllocator.getLocalPathForWrite("/test", conf));
}

/**
@@ -587,10 +584,13 @@ public void testGetLocalPathForWriteForLessSpace(String paramRoot, String paramP
String dir0 = buildBufferDir(root, 0);
String dir1 = buildBufferDir(root, 1);
conf.set(CONTEXT, dir0 + "," + dir1);
LambdaTestUtils.intercept(DiskErrorException.class,
final DiskErrorException ex = intercept(DiskErrorException.class,
String.format("Could not find any valid local directory for %s with requested size %s",
"p1/x", Long.MAX_VALUE - 1), "Expect a DiskErrorException.",
() -> dirAllocator.getLocalPathForWrite("p1/x", Long.MAX_VALUE - 1, conf));
Assertions.assertThat(ex.getMessage())
.contains(new File(dir0).getName())
.contains(new File(dir1).getName());
}

/**
@@ -614,5 +614,31 @@ public void testDirectoryRecovery(String paramRoot, String paramPrefix) throws T
// and expect to get a new file back
dirAllocator.getLocalPathForWrite("file2", -1, conf);
}


/**
* Test for HADOOP-19554. LocalDirAllocator still doesn't always recover
* from directory tree deletion.
*/
@Timeout(value = 30)
@MethodSource("params")
@ParameterizedTest
public void testDirectoryRecoveryKnownSize(String paramRoot, String paramPrefix) throws Throwable {
initTestLocalDirAllocator(paramRoot, paramPrefix);
String dir0 = buildBufferDir(root, 0);
String subdir = dir0 + "/subdir1/subdir2";

conf.set(CONTEXT, subdir);
// get local path and an ancestor
final Path pathForWrite = dirAllocator.getLocalPathForWrite("file", 512, conf);
final Path ancestor = pathForWrite.getParent().getParent();

// delete that ancestor
localFs.delete(ancestor, true);
// and expect to get a new file back
dirAllocator.getLocalPathForWrite("file2", -1, conf);
}


}
