Skip to content
Snippets Groups Projects
Commit 6be42510 authored by tpylak
Browse files

DMV-32 FileStoreRemoteMounted.delete() should be monitored on a per-file basis

SVN: 7140
parent b14aa61c
No related branches found
No related tags found
No related merge requests found
......@@ -64,24 +64,30 @@ public final class DataMover
@Private
static final String PROCESS_MARKER_PREFIX = Constants.MARKER_PREFIX + "thread_";
private static final String TEMPLATE = PROCESS_MARKER_PREFIX + "%s_processing";
private static final String PROCESSING_MARKER_TEMPLATE =
PROCESS_MARKER_PREFIX + "%s_processing";
@Private
static final String INCOMING_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "incoming");
static final String INCOMING_PROCESS_MARKER_FILENAME =
String.format(PROCESSING_MARKER_TEMPLATE, "incoming");
@Private
static final String OUTGOING_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "outgoing");
static final String OUTGOING_PROCESS_MARKER_FILENAME =
String.format(PROCESSING_MARKER_TEMPLATE, "outgoing");
@Private
static final String LOCAL_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "local");
static final String LOCAL_PROCESS_MARKER_FILENAME =
String.format(PROCESSING_MARKER_TEMPLATE, "local");
@Private
static final String RECOVERY_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "recovery");
static final String RECOVERY_PROCESS_MARKER_FILENAME =
String.format(PROCESSING_MARKER_TEMPLATE, "recovery");
/**
* This marker file indicates that we are in a <i>shutdown</i> mode, started by the program.
*/
static final String SHUTDOWN_PROCESS_MARKER_FILENAME = String.format(TEMPLATE, "shutdown");
static final String SHUTDOWN_PROCESS_MARKER_FILENAME =
String.format(PROCESSING_MARKER_TEMPLATE, "shutdown");
private static final String[] PROCESS_MARKER_FILENAMES =
{ INCOMING_PROCESS_MARKER_FILENAME, OUTGOING_PROCESS_MARKER_FILENAME,
......
......@@ -17,13 +17,26 @@
package ch.systemsx.cisd.datamover.filesystem;
import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.log4j.Logger;
import ch.rinn.restrictions.Private;
import ch.systemsx.cisd.common.Constants;
import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
import ch.systemsx.cisd.common.concurrent.ExecutionResult;
import ch.systemsx.cisd.common.concurrent.InactivityMonitor;
import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor;
import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IActivitySensor;
import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IInactivityObserver;
import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.FileUtilities;
import ch.systemsx.cisd.common.utilities.FileUtilities.SimpleActivityObserver;
import ch.systemsx.cisd.datamover.filesystem.intf.IPathRemover;
/**
......@@ -35,12 +48,18 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IPathRemover;
*/
final class RetryingPathRemover implements IPathRemover
{
/** Inactivity threshold (in milliseconds) handed to the monitor that watches a deletion. */
private static final long DELETE_ONE_FILE_TIMEOUT_MILLIS =
        Constants.MILLIS_TO_WAIT_BEFORE_TIMEOUT;

// Both loggers are registered under this class, so that the messages written here are not
// attributed to RetryingPathMover.
private static final Logger operationLog =
        LogFactory.getLogger(LogCategory.OPERATION, RetryingPathRemover.class);

private static final Logger notificationLog =
        LogFactory.getLogger(LogCategory.NOTIFY, RetryingPathRemover.class);

/**
 * Executor shared by all instances; the monitored deletions are submitted to it.
 * NOTE(review): <code>daemonize()</code> presumably makes the worker threads daemon threads -
 * confirm against <code>NamingThreadPoolExecutor</code>.
 */
private final static ExecutorService executor =
        new NamingThreadPoolExecutor("Deletion Thread").daemonize();

private final int maxRetriesOnFailure;

private final long millisToSleepOnFailure;
......@@ -66,7 +85,7 @@ final class RetryingPathRemover implements IPathRemover
boolean deletionOK = false;
while (true)
{
deletionOK = FileUtilities.deleteRecursively(path);
deletionOK = deleteAndMonitor(path);
if (deletionOK)
{
break;
......@@ -104,4 +123,83 @@ final class RetryingPathRemover implements IPathRemover
return Status.OK;
}
}
/**
 * Remembers when the deletion of one path last made progress and reports that time to an
 * inactivity monitor.
 */
@Private
static class DeleteActivityDetector implements IActivitySensor, SimpleActivityObserver
{
    private final File path;

    private volatile long lastActivityTime = System.currentTimeMillis();

    public DeleteActivityDetector(File path)
    {
        this.path = path;
    }

    /**
     * Records the current time as the moment of the last activity. Meant to be called each time
     * one file gets deleted.
     */
    public synchronized void update()
    {
        lastActivityTime = System.currentTimeMillis();
    }

    /**
     * Builds a message which says for how long, measured up to <var>now</var>, nothing has been
     * deleted under the monitored path.
     */
    public synchronized String describeInactivity(long now)
    {
        final String monitoredPath = path.getPath();
        final String inactivityPeriod =
                DurationFormatUtils.formatDurationHMS(now - lastActivityTime);
        return "No delete activity of path " + monitoredPath + " for " + inactivityPeriod;
    }

    /**
     * Returns the time of the last recorded activity; <var>thresholdMillis</var> is not taken
     * into account.
     */
    public synchronized long getTimeOfLastActivityMoreRecentThan(long thresholdMillis)
    {
        return lastActivityTime;
    }
}
/**
 * Deletes <var>path</var> recursively while watching the progress of the deletion. If there is
 * no progress during deletion, it will be stopped.
 * 
 * @return the outcome reported by
 *         {@link #executeAndMonitor(IActivitySensor, Callable, long)}
 */
private boolean deleteAndMonitor(final File path)
{
    final DeleteActivityDetector activityDetector = new DeleteActivityDetector(path);
    final Callable<Boolean> recursiveDeletion = new Callable<Boolean>()
        {
            public Boolean call() throws Exception
            {
                // the detector is notified about the progress of the deletion
                return FileUtilities.deleteRecursively(path, null, activityDetector);
            }
        };
    final Boolean deleted =
            executeAndMonitor(activityDetector, recursiveDeletion,
                    DELETE_ONE_FILE_TIMEOUT_MILLIS);
    return deleted.booleanValue();
}
/**
 * Submits <var>deleteCallable</var> to the shared executor and waits for it to finish, while an
 * inactivity monitor built on <var>sensor</var> watches it. When the monitor notifies its
 * observer, the inactivity is logged and the running operation is cancelled.
 * 
 * @param sensor source of the "time of last activity" used by the inactivity monitor
 * @param deleteCallable the operation to execute
 * @param inactivityThresholdMillis threshold passed to the inactivity monitor
 * @return the value produced by <var>deleteCallable</var>, or <code>false</code> if no result
 *         is available (in which case the execution status is logged as an error); never
 *         <code>null</code>
 */
@Private
Boolean executeAndMonitor(final IActivitySensor sensor, final Callable<Boolean> deleteCallable,
        final long inactivityThresholdMillis)
{
    final Future<Boolean> deleteFuture = executor.submit(deleteCallable);
    final IInactivityObserver inactivityObserver = new IInactivityObserver()
        {
            // called when inactivity took longer than a timeout
            public void update(long inactiveSinceMillis, String descriptionOfInactivity)
            {
                operationLog.error(descriptionOfInactivity);
                deleteFuture.cancel(true);
            }
        };
    final InactivityMonitor inactivityMonitor =
            new InactivityMonitor(sensor, inactivityObserver, inactivityThresholdMillis, true);
    final ExecutionResult<Boolean> executionResult;
    try
    {
        executionResult =
                ConcurrencyUtilities.getResult(deleteFuture, ConcurrencyUtilities.NO_TIMEOUT);
    } finally
    {
        // stop the monitor on every exit path, also when waiting for the result fails
        inactivityMonitor.stop();
    }
    final Boolean result = executionResult.tryGetResult();
    if (result == null)
    {
        operationLog.error("Operation terminated with an error status: "
                + executionResult.getStatus());
        return Boolean.FALSE;
    }
    return result;
}
}
\ No newline at end of file
......@@ -84,12 +84,8 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
public final Status delete(final StoreItem item)
{
final Status statusOrNull = localImplMonitored.delete(item);
if (statusOrNull == null)
{
return Status.createRetriableError("Could not delete '" + item + "': time out.");
}
return statusOrNull;
// we do not run delete with a timeout
return localImpl.delete(item);
}
public final BooleanStatus exists(final StoreItem item)
......@@ -103,12 +99,14 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
return statusOrNull;
}
public final StatusWithResult<Long> lastChanged(final StoreItem item, final long stopWhenFindYounger)
public final StatusWithResult<Long> lastChanged(final StoreItem item,
final long stopWhenFindYounger)
{
final StatusWithResult<Long> statusOrNull = localImplMonitored.lastChanged(item, stopWhenFindYounger);
final StatusWithResult<Long> statusOrNull =
localImplMonitored.lastChanged(item, stopWhenFindYounger);
if (statusOrNull == null)
{
return StatusWithResult.<Long>createError(String.format(
return StatusWithResult.<Long> createError(String.format(
"Could not determine \"last changed time\" of %s: time out.", item));
}
return statusOrNull;
......@@ -121,7 +119,7 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
localImplMonitored.lastChangedRelative(item, stopWhenFindYoungerRelative);
if (statusOrNull == null)
{
return StatusWithResult.<Long>createError(String.format(
return StatusWithResult.<Long> createError(String.format(
"Could not determine \"last changed time\" of %s: time out.", item));
}
return statusOrNull;
......
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.datamover.filesystem;
import java.io.File;
import java.util.concurrent.Callable;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import ch.rinn.restrictions.Friend;
import ch.systemsx.cisd.common.logging.LogInitializer;
import ch.systemsx.cisd.datamover.filesystem.RetryingPathRemover.DeleteActivityDetector;
/**
 * Tests the inactivity monitoring of {@link RetryingPathRemover}: an operation which keeps
 * reporting progress is expected to yield <code>true</code>, one which stops reporting progress
 * is expected to be cancelled and to yield <code>false</code>.
 * 
 * @author Tomasz Pylak
 */
@Friend(toClasses = RetryingPathRemover.class)
public class RetryingPathRemoverTest
{
    @BeforeClass
    public void init()
    {
        LogInitializer.init();
    }

    /** The simulated deletion never blocks, so its result is expected to be <code>true</code>. */
    @Test
    public void testDeleteAndMonitorHappyCase()
    {
        testDeleteAndMonitor(true);
    }

    /** The simulated deletion blocks at the end, so the expected result is <code>false</code>. */
    @Test
    public void testDeleteAndMonitorBlockedFail()
    {
        testDeleteAndMonitor(false);
    }

    /**
     * Runs a simulated deletion through <code>executeAndMonitor</code> and checks its result.
     * 
     * @param shouldNotBlock if <code>true</code> the simulated deletion only makes progress and
     *            the expected result is <code>true</code>; if <code>false</code> it additionally
     *            stalls for longer than the inactivity threshold and the expected result is
     *            <code>false</code>
     */
    public void testDeleteAndMonitor(final boolean shouldNotBlock)
    {
        RetryingPathRemover remover = new RetryingPathRemover(0, 0);
        // NOTE(review): the threshold is very short, so the outcome presumably depends on
        // thread scheduling - confirm that this is stable on a loaded machine.
        final long timeout = 20;
        final DeleteActivityDetector sensor = new DeleteActivityDetector(new File("."));
        Callable<Boolean> callable = new Callable<Boolean>()
            {
                public Boolean call() throws Exception
                {
                    for (int i = 0; i < 20; i++)
                    {
                        Thread.sleep(timeout / 2); // simulates delete
                        sensor.update();
                    }
                    if (!shouldNotBlock)
                    {
                        Thread.sleep(timeout * 4); // simulates blocked delete
                        // NOTE(review): this is raised on the executor thread and presumably
                        // surfaces only as an error status of the execution - confirm.
                        AssertJUnit.fail("operation should be killed till this point");
                    }
                    return true;
                }
            };
        Boolean result = remover.executeAndMonitor(sensor, callable, timeout);
        AssertJUnit.assertEquals(shouldNotBlock, result.booleanValue());
    }
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment