Skip to content
Snippets Groups Projects
Commit dcd8e973 authored by tpylak's avatar tpylak
Browse files

DMV-21 error handling in IFileStore for exists() and lastChanged*()

SVN: 6913
parent 5c8ce1c9
No related branches found
No related tags found
No related merge requests found
Showing
with 374 additions and 178 deletions
...@@ -24,19 +24,11 @@ package ch.systemsx.cisd.datamover.filesystem.intf; ...@@ -24,19 +24,11 @@ package ch.systemsx.cisd.datamover.filesystem.intf;
*/ */
public class BooleanStatus public class BooleanStatus
{ {
private final Boolean result; private final ResultStatus<Boolean> result;
// if true the result is unavailable
private final boolean errorOccurred;
// can be used not only in case of errors
private final String messageOrNull;
private BooleanStatus(Boolean result, boolean errorOccurred, String messageOrNull) private BooleanStatus(Boolean result, boolean errorOccurred, String messageOrNull)
{ {
this.result = result; this.result = new ResultStatus<Boolean>(result, errorOccurred, messageOrNull);
this.errorOccurred = errorOccurred;
this.messageOrNull = messageOrNull;
} }
public static final BooleanStatus createTrue() public static final BooleanStatus createTrue()
...@@ -76,7 +68,7 @@ public class BooleanStatus ...@@ -76,7 +68,7 @@ public class BooleanStatus
/** has operation finished with an error? */ /** has operation finished with an error? */
public boolean isError() public boolean isError()
{ {
return errorOccurred; return result.isError();
} }
/** /**
...@@ -86,13 +78,12 @@ public class BooleanStatus ...@@ -86,13 +78,12 @@ public class BooleanStatus
*/ */
public boolean getResult() public boolean getResult()
{ {
assert isError() == false : "Operation failed, there is no result"; return result.getResult();
return result;
} }
/** @return the message associated with the result or an error message */ /** @return the message associated with the result or an error message */
public String tryGetMessage() public String tryGetMessage()
{ {
return messageOrNull; return result.tryGetMessage();
} }
} }
...@@ -69,10 +69,9 @@ public interface IFileStore extends ISelfTestable ...@@ -69,10 +69,9 @@ public interface IFileStore extends ISelfTestable
* parameter. Supposed to be used when one does not care about the absolutely * parameter. Supposed to be used when one does not care about the absolutely
* youngest entry, but only, if there are entries that are "young enough". * youngest entry, but only, if there are entries that are "young enough".
* @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was * @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was
* last changed. * last changed or error status if checking failed.
* @throws UnknownLastChangedException when reading modification time failed
*/ */
public long lastChanged(StoreItem item, long stopWhenFindYounger); public NumberStatus lastChanged(StoreItem item, long stopWhenFindYounger);
/** /**
* Returns the last time when there was a write access to <var>item</var>. * Returns the last time when there was a write access to <var>item</var>.
...@@ -82,10 +81,9 @@ public interface IFileStore extends ISelfTestable ...@@ -82,10 +81,9 @@ public interface IFileStore extends ISelfTestable
* stopped when a file or directory is found that is younger than * stopped when a file or directory is found that is younger than
* <code>System.currentTimeMillis() - stopWhenYoungerRelative</code>. * <code>System.currentTimeMillis() - stopWhenYoungerRelative</code>.
* @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was * @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was
* last changed. * last changed or error status if checking failed.
* @throws UnknownLastChangedException when reading modification time failed
*/ */
public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative); public NumberStatus lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative);
/** /**
* List files in the scanned store. Sort in order of "oldest first". * List files in the scanned store. Sort in order of "oldest first".
......
...@@ -17,16 +17,51 @@ ...@@ -17,16 +17,51 @@
package ch.systemsx.cisd.datamover.filesystem.intf; package ch.systemsx.cisd.datamover.filesystem.intf;
/** /**
* Used to signal the error of getting last modification time of a store item * A class that holds the information about the result of an operation which is a number. There is a
* way to find out if an error occurred during operation execution, the result is unavailable then.
* *
* @author Tomasz Pylak * @author Tomasz Pylak
*/ */
public class UnknownLastChangedException extends RuntimeException public class NumberStatus
{ {
private static final long serialVersionUID = 1L; private final ResultStatus<Long> result;
public UnknownLastChangedException(String errorMsg) private NumberStatus(Long result, boolean errorOccurred, String messageOrNull)
{ {
super(errorMsg); this.result = new ResultStatus<Long>(result, errorOccurred, messageOrNull);
}
public static final NumberStatus create(long result)
{
return new NumberStatus(result, false, null);
}
public static final NumberStatus createError(String message)
{
return new NumberStatus(null, true, message);
}
public static final NumberStatus createError()
{
return new NumberStatus(null, true, null);
}
/**
* can be called only if no error occurred, otherwise it fails.
*/
public long getResult()
{
return result.getResult();
}
/** has operation finished with an error? */
public boolean isError()
{
return result.isError();
}
public String tryGetMessage()
{
return result.tryGetMessage();
} }
} }
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.datamover.filesystem.intf;
/**
* A class that holds the information about the status and the result of an operation. To be used
* whenever a failure of an operation is signaled back via a return value rather than an exception.
*
* @author Tomasz Pylak
*/
public class ResultStatus<T>
{
private final T result;
// if true the result is unavailable
private final boolean errorOccurred;
// can be used not only in case of errors
private final String messageOrNull;
public static <T> ResultStatus<T> createError()
{
return new ResultStatus<T>(null, true, null);
}
public static <T> ResultStatus<T> createError(String message)
{
return new ResultStatus<T>(null, true, message);
}
public static <T> ResultStatus<T> createResult(T result)
{
return new ResultStatus<T>(result, false, null);
}
public static <T> ResultStatus<T> createResult(T result, String message)
{
return new ResultStatus<T>(result, false, message);
}
protected ResultStatus(T result, boolean errorOccurred, String messageOrNull)
{
this.result = result;
this.errorOccurred = errorOccurred;
this.messageOrNull = messageOrNull;
}
/**
* can be called only if no error occurred
*
* @return result of an operation
*/
public T getResult()
{
assert isError() == false : "Operation failed, there is no result";
return result;
}
/** @return message associated with the result or an error if there is any */
public String tryGetMessage()
{
return messageOrNull;
}
/** has operation finished with an error? */
public boolean isError()
{
return errorOccurred;
}
}
...@@ -26,6 +26,7 @@ import org.apache.log4j.Logger; ...@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
import ch.rinn.restrictions.Private; import ch.rinn.restrictions.Private;
import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
import ch.systemsx.cisd.common.concurrent.ExecutionResult;
import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor; import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor;
import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.ISimpleLogger;
...@@ -36,7 +37,7 @@ import ch.systemsx.cisd.common.utilities.ITerminable; ...@@ -36,7 +37,7 @@ import ch.systemsx.cisd.common.utilities.ITerminable;
import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus; import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.UnknownLastChangedException; import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus;
import ch.systemsx.cisd.datamover.intf.ITimingParameters; import ch.systemsx.cisd.datamover.intf.ITimingParameters;
/** /**
...@@ -124,8 +125,7 @@ public class CopyActivityMonitor ...@@ -124,8 +125,7 @@ public class CopyActivityMonitor
@Private @Private
static interface IFileStoreMonitor static interface IFileStoreMonitor
{ {
// returns 0 when an error or timeout occurs during the check NumberStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan);
long lastChanged(StoreItem item, long stopWhenYoungerThan);
BooleanStatus exists(StoreItem item); BooleanStatus exists(StoreItem item);
...@@ -166,9 +166,9 @@ public class CopyActivityMonitor ...@@ -166,9 +166,9 @@ public class CopyActivityMonitor
{ {
final private long timeChecked; final private long timeChecked;
final private long timeOfLastModification; final private NumberStatus timeOfLastModification;
public PathCheckRecord(final long timeChecked, final long timeLastChanged) public PathCheckRecord(final long timeChecked, final NumberStatus timeLastChanged)
{ {
this.timeChecked = timeChecked; this.timeChecked = timeChecked;
this.timeOfLastModification = timeLastChanged; this.timeOfLastModification = timeLastChanged;
...@@ -185,7 +185,7 @@ public class CopyActivityMonitor ...@@ -185,7 +185,7 @@ public class CopyActivityMonitor
/** /**
* The newest last modification time found during the check. * The newest last modification time found during the check.
*/ */
public long getTimeOfLastModification() public NumberStatus getTimeOfLastModification()
{ {
return timeOfLastModification; return timeOfLastModification;
} }
...@@ -195,7 +195,8 @@ public class CopyActivityMonitor ...@@ -195,7 +195,8 @@ public class CopyActivityMonitor
{ {
return new IFileStoreMonitor() return new IFileStoreMonitor()
{ {
public long lastChanged(StoreItem item, long stopWhenFindYoungerRelative) public NumberStatus lastChangedRelative(StoreItem item,
long stopWhenFindYoungerRelative)
{ {
return destinationStore.lastChangedRelative(item, stopWhenFindYoungerRelative); return destinationStore.lastChangedRelative(item, stopWhenFindYoungerRelative);
} }
...@@ -298,7 +299,7 @@ public class CopyActivityMonitor ...@@ -298,7 +299,7 @@ public class CopyActivityMonitor
return false; return false;
} else } else
{ {
final boolean oldIsUnknown = (lastCheckOrNull.getTimeOfLastModification() == 0); final boolean oldIsUnknown = lastCheckOrNull.getTimeOfLastModification().isError();
// no need to check yet // no need to check yet
if (now - lastCheckOrNull.getTimeChecked() < quietPeriodMillis) if (now - lastCheckOrNull.getTimeChecked() < quietPeriodMillis)
{ {
...@@ -316,7 +317,9 @@ public class CopyActivityMonitor ...@@ -316,7 +317,9 @@ public class CopyActivityMonitor
return true; return true;
} else } else
{ {
return checkIfModifiedAndSet(now); long prevModificationTime =
lastCheckOrNull.getTimeOfLastModification().getResult();
return checkIfUnmodifiedAndSet(now, prevModificationTime);
} }
} }
} }
...@@ -328,11 +331,11 @@ public class CopyActivityMonitor ...@@ -328,11 +331,11 @@ public class CopyActivityMonitor
{ {
if (lastCheckOrNull == null) if (lastCheckOrNull == null)
{ {
lastCheckOrNull = new PathCheckRecord(now, 0); lastCheckOrNull = new PathCheckRecord(now, NumberStatus.createError());
return false; return false;
} else } else
{ {
if (lastCheckOrNull.getTimeOfLastModification() != 0) if (lastCheckOrNull.getTimeOfLastModification().isError() == false)
{ {
operationLog.warn(String.format( operationLog.warn(String.format(
"File or directory '%s' has vanished from '%s'.", itemToBeCopied, "File or directory '%s' has vanished from '%s'.", itemToBeCopied,
...@@ -347,14 +350,13 @@ public class CopyActivityMonitor ...@@ -347,14 +350,13 @@ public class CopyActivityMonitor
} }
} }
// check if item has been modified since last check by comparing its current modification // check if item has been unmodified ("quite") since last check by comparing its current
// time to the one acquired in the past // modification time to the one acquired in the past.
private boolean checkIfModifiedAndSet(long now) private boolean checkIfUnmodifiedAndSet(long now, long prevModificationTime)
{ {
final long prevModificationTime = lastCheckOrNull.getTimeOfLastModification(); final NumberStatus newModificationTime = lastChanged(itemToBeCopied);
final long newModificationTime = lastChanged(itemToBeCopied, prevModificationTime); if (newModificationTime.isError() == false
boolean newIsKnown = (newModificationTime != 0); && newModificationTime.getResult() != prevModificationTime)
if (newIsKnown && newModificationTime != prevModificationTime)
{ {
lastCheckOrNull = new PathCheckRecord(now, newModificationTime); lastCheckOrNull = new PathCheckRecord(now, newModificationTime);
return false; return false;
...@@ -366,22 +368,26 @@ public class CopyActivityMonitor ...@@ -366,22 +368,26 @@ public class CopyActivityMonitor
private void setFirstModificationDate(final long timeChecked) private void setFirstModificationDate(final long timeChecked)
{ {
long lastChanged = lastChanged(itemToBeCopied, 0L); // 0 if error NumberStatus lastChanged = lastChanged(itemToBeCopied);
lastCheckOrNull = new PathCheckRecord(timeChecked, lastChanged); lastCheckOrNull = new PathCheckRecord(timeChecked, lastChanged);
} }
} }
private long lastChanged(StoreItem item, long previousCheck) private NumberStatus lastChanged(StoreItem item)
{ {
final Long lastChanged = tryLastChanged(destinationStore, item); final NumberStatus lastChanged = lastChanged(destinationStore, item);
if (operationLog.isTraceEnabled() && lastChanged != null) if (lastChanged.isError())
{
operationLog.error(lastChanged.tryGetMessage());
} else if (operationLog.isTraceEnabled())
{ {
String msgTemplate = String msgTemplate =
"Checker reported last changed time of '%s' inside '%s' to be %3$tF %3$tT."; "Checker reported last changed time of '%s' inside '%s' to be %3$tF %3$tT.";
String msg = String.format(msgTemplate, item, destinationStore, lastChanged); String msg =
String.format(msgTemplate, item, destinationStore, lastChanged.getResult());
operationLog.trace(msg); operationLog.trace(msg);
} }
return (lastChanged != null) ? lastChanged : 0; return lastChanged;
} }
private long minusSafetyMargin(long period) private long minusSafetyMargin(long period)
...@@ -389,69 +395,51 @@ public class CopyActivityMonitor ...@@ -389,69 +395,51 @@ public class CopyActivityMonitor
return Math.max(0L, period - 1000L); return Math.max(0L, period - 1000L);
} }
private Long tryLastChanged(IFileStoreMonitor store, StoreItem item) private NumberStatus lastChanged(IFileStoreMonitor store, StoreItem item)
{ {
long stopWhenFindYoungerRelative = minusSafetyMargin(inactivityPeriodMillis); long stopWhenFindYoungerRelative = minusSafetyMargin(inactivityPeriodMillis);
final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis); final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis);
final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog); final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog);
final Future<Long> lastChangedFuture = final Future<NumberStatus> lastChangedFuture =
lastChangedExecutor.submit(createCheckerCallable(store, item, lastChangedExecutor.submit(createCheckerCallable(store, item,
stopWhenFindYoungerRelative)); stopWhenFindYoungerRelative));
try ExecutionResult<NumberStatus> executionResult =
ConcurrencyUtilities.getResult(lastChangedFuture, timeoutMillis, simpleMachineLog,
"Check for recent paths");
NumberStatus result = executionResult.tryGetResult();
if (result == null)
{ {
final Long lastChanged = return NumberStatus.createError(String.format(
ConcurrencyUtilities.getResult(lastChangedFuture, timeoutMillis, "Could not determine \"last changed time\" of %s: time out.", item));
simpleMachineLog, "Check for recent paths").tryGetResult(); } else
if (lastChanged == null)
{
operationLog.error(String.format(
"Could not determine \"last changed time\" of %s: time out.", item));
return null;
}
return lastChanged;
} catch (UnknownLastChangedException ex)
{ {
operationLog.error(String.format("Could not determine \"last changed time\" of %s: %s", return result;
item, ex));
return null;
} }
} }
private static Callable<Long> createCheckerCallable(final IFileStoreMonitor store, private static Callable<NumberStatus> createCheckerCallable(final IFileStoreMonitor store,
final StoreItem item, final long stopWhenYoungerThan) final StoreItem item, final long stopWhenYoungerThan)
{ {
return new Callable<Long>() return new Callable<NumberStatus>()
{ {
public Long call() throws Exception public NumberStatus call() throws Exception
{ {
if (machineLog.isTraceEnabled()) if (machineLog.isTraceEnabled())
{ {
machineLog machineLog
.trace("Starting quick check for recent paths on '" + item + "'."); .trace("Starting quick check for recent paths on '" + item + "'.");
} }
try final NumberStatus lastChanged =
{ store.lastChangedRelative(item, stopWhenYoungerThan);
final long lastChanged = store.lastChanged(item, stopWhenYoungerThan); if (machineLog.isTraceEnabled())
if (machineLog.isTraceEnabled())
{
machineLog
.trace(String
.format(
"Finishing quick check for recent paths on '%s', found to be %2$tF %2$tT.",
item, lastChanged));
}
return lastChanged;
} catch (RuntimeException ex)
{ {
if (machineLog.isTraceEnabled()) machineLog
{ .trace(String
final Throwable th = .format(
(ex instanceof CheckedExceptionTunnel) ? ex.getCause() : ex; "Finishing quick check for recent paths on '%s', found to be %2$tF %2$tT.",
machineLog.trace("Failed on quick check for recent paths on '" + item item, lastChanged));
+ "'.", th);
}
throw ex;
} }
return lastChanged;
} }
}; };
} }
......
...@@ -23,7 +23,6 @@ import org.apache.commons.io.FileUtils; ...@@ -23,7 +23,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang.time.DateUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher;
import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark; import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark;
...@@ -32,6 +31,7 @@ import ch.systemsx.cisd.common.logging.LogCategory; ...@@ -32,6 +31,7 @@ import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.FileUtilities;
import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.common.utilities.UnknownLastChangedException;
import ch.systemsx.cisd.datamover.common.MarkerFile; import ch.systemsx.cisd.datamover.common.MarkerFile;
import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus; import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus;
import ch.systemsx.cisd.datamover.filesystem.intf.FileStore; import ch.systemsx.cisd.datamover.filesystem.intf.FileStore;
...@@ -41,7 +41,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; ...@@ -41,7 +41,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover; import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover;
import ch.systemsx.cisd.datamover.filesystem.intf.IPathRemover; import ch.systemsx.cisd.datamover.filesystem.intf.IPathRemover;
import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier; import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier;
import ch.systemsx.cisd.datamover.filesystem.intf.UnknownLastChangedException; import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus;
/** /**
* An {@link IFileStore} implementation for local stores. * An {@link IFileStore} implementation for local stores.
...@@ -88,31 +88,41 @@ public class FileStoreLocal extends FileStore implements IExtendedFileStore ...@@ -88,31 +88,41 @@ public class FileStoreLocal extends FileStore implements IExtendedFileStore
return BooleanStatus.createFromBoolean(exists); return BooleanStatus.createFromBoolean(exists);
} }
public final long lastChanged(final StoreItem item, final long stopWhenFindYounger) public final NumberStatus lastChanged(final StoreItem item, final long stopWhenFindYounger)
{ {
try try
{ {
return FileUtilities.lastChanged(getChildFile(item), true, stopWhenFindYounger); long lastChanged =
} catch (CheckedExceptionTunnel tunnelEx) FileUtilities.lastChanged(getChildFile(item), true, stopWhenFindYounger);
return NumberStatus.create(lastChanged);
} catch (UnknownLastChangedException ex)
{ {
if (tunnelEx.getCause() instanceof IOException) return createLastChangedError(item, ex);
{
String errorMsg =
String.format("Could not determine \"last changed time\" of %s: %s", item,
tunnelEx.getCause());
throw new UnknownLastChangedException(errorMsg);
} else
{
throw tunnelEx;
}
} }
} }
public final long lastChangedRelative(final StoreItem item, public final NumberStatus lastChangedRelative(final StoreItem item,
final long stopWhenFindYoungerRelative) final long stopWhenFindYoungerRelative)
{ {
return FileUtilities.lastChangedRelative(getChildFile(item), true, try
stopWhenFindYoungerRelative); {
long lastChanged =
FileUtilities.lastChangedRelative(getChildFile(item), true,
stopWhenFindYoungerRelative);
return NumberStatus.create(lastChanged);
} catch (UnknownLastChangedException ex)
{
return createLastChangedError(item, ex);
}
}
private static NumberStatus createLastChangedError(final StoreItem item,
UnknownLastChangedException ex)
{
String errorMsg =
String.format("Could not determine \"last changed time\" of %s: %s", item, ex
.getCause());
return NumberStatus.createError(errorMsg);
} }
public final BooleanStatus tryCheckDirectoryFullyAccessible(final long timeOutMillis) public final BooleanStatus tryCheckDirectoryFullyAccessible(final long timeOutMillis)
......
...@@ -42,7 +42,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IExtendedFileStore; ...@@ -42,7 +42,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IExtendedFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier; import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier;
import ch.systemsx.cisd.datamover.filesystem.intf.UnknownLastChangedException; import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus;
/** /**
* @author Tomasz Pylak * @author Tomasz Pylak
...@@ -229,12 +229,12 @@ public class FileStoreRemote extends FileStore ...@@ -229,12 +229,12 @@ public class FileStoreRemote extends FileStore
return constructStoreCopier(destinationDirectory, requiresDeletion); return constructStoreCopier(destinationDirectory, requiresDeletion);
} }
public final long lastChanged(final StoreItem item, final long stopWhenFindYounger) public final NumberStatus lastChanged(final StoreItem item, final long stopWhenFindYounger)
{ {
return lastChanged(item); return lastChanged(item);
} }
private final long lastChanged(final StoreItem item) private final NumberStatus lastChanged(final StoreItem item)
{ {
final String itemPath = StoreItem.asFile(getPath(), item).getPath(); final String itemPath = StoreItem.asFile(getPath(), item).getPath();
...@@ -247,22 +247,22 @@ public class FileStoreRemote extends FileStore ...@@ -247,22 +247,22 @@ public class FileStoreRemote extends FileStore
final String resultLine = result.getOutput().get(0); final String resultLine = result.getOutput().get(0);
try try
{ {
return Long.parseLong(resultLine) * 1000; long lastChanged = Long.parseLong(resultLine) * 1000;
return NumberStatus.create(lastChanged);
} catch (final NumberFormatException e) } catch (final NumberFormatException e)
{ {
throw createLastChangeException(item, "The result of " + cmd + " on remote host " return createLastChangeError(item, "The result of " + cmd + " on remote host "
+ getHost() + "should be a number but was: " + result.getOutput()); + getHost() + "should be a number but was: " + result.getOutput());
} }
} else } else
{ {
throw createLastChangeException(item, errMsg); return createLastChangeError(item, errMsg);
} }
} }
private static UnknownLastChangedException createLastChangeException(StoreItem item, private static NumberStatus createLastChangeError(StoreItem item, String errorMsg)
String errorMsg)
{ {
return new UnknownLastChangedException("Cannot obtain last change time of the item " + item return NumberStatus.createError("Cannot obtain last change time of the item " + item
+ ". Reason: " + errorMsg); + ". Reason: " + errorMsg);
} }
...@@ -281,7 +281,7 @@ public class FileStoreRemote extends FileStore ...@@ -281,7 +281,7 @@ public class FileStoreRemote extends FileStore
} }
} }
public final long lastChangedRelative(final StoreItem item, public final NumberStatus lastChangedRelative(final StoreItem item,
final long stopWhenFindYoungerRelative) final long stopWhenFindYoungerRelative)
{ {
return lastChanged(item); return lastChanged(item);
......
...@@ -29,6 +29,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IExtendedFileStore; ...@@ -29,6 +29,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IExtendedFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier; import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier;
import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus;
/** /**
* A <code>FileStore</code> extension for remote paths mounted. * A <code>FileStore</code> extension for remote paths mounted.
...@@ -79,12 +80,12 @@ public final class FileStoreRemoteMounted extends FileStore ...@@ -79,12 +80,12 @@ public final class FileStoreRemoteMounted extends FileStore
return localImpl.exists(item); return localImpl.exists(item);
} }
public final long lastChanged(final StoreItem item, final long stopWhenFindYounger) public final NumberStatus lastChanged(final StoreItem item, final long stopWhenFindYounger)
{ {
return localImpl.lastChanged(item, stopWhenFindYounger); return localImpl.lastChanged(item, stopWhenFindYounger);
} }
public final long lastChangedRelative(final StoreItem item, public final NumberStatus lastChangedRelative(final StoreItem item,
final long stopWhenFindYoungerRelative) final long stopWhenFindYoungerRelative)
{ {
return localImpl.lastChangedRelative(item, stopWhenFindYoungerRelative); return localImpl.lastChangedRelative(item, stopWhenFindYoungerRelative);
......
...@@ -31,6 +31,7 @@ import ch.systemsx.cisd.common.utilities.ITimeProvider; ...@@ -31,6 +31,7 @@ import ch.systemsx.cisd.common.utilities.ITimeProvider;
import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.common.utilities.SystemTimeProvider; import ch.systemsx.cisd.common.utilities.SystemTimeProvider;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus;
import ch.systemsx.cisd.datamover.intf.ITimingParameters; import ch.systemsx.cisd.datamover.intf.ITimingParameters;
/** /**
...@@ -149,7 +150,7 @@ public class QuietPeriodFileFilter implements IStoreItemFilter ...@@ -149,7 +150,7 @@ public class QuietPeriodFileFilter implements IStoreItemFilter
final PathCheckRecord checkRecordOrNull = pathMap.get(item); final PathCheckRecord checkRecordOrNull = pathMap.get(item);
if (checkRecordOrNull == null) // new item if (checkRecordOrNull == null) // new item
{ {
pathMap.put(item, new PathCheckRecord(now, fileStore.lastChanged(item, 0L))); saveFirstModificationTime(item, now);
return false; return false;
} }
if (now - checkRecordOrNull.getTimeChecked() < quietPeriodMillis) // no need to check if (now - checkRecordOrNull.getTimeChecked() < quietPeriodMillis) // no need to check
...@@ -158,7 +159,12 @@ public class QuietPeriodFileFilter implements IStoreItemFilter ...@@ -158,7 +159,12 @@ public class QuietPeriodFileFilter implements IStoreItemFilter
return false; return false;
} }
final long oldLastChanged = checkRecordOrNull.getTimeOfLastModification(); final long oldLastChanged = checkRecordOrNull.getTimeOfLastModification();
final long newLastChanged = fileStore.lastChanged(item, oldLastChanged); NumberStatus newStatus = fileStore.lastChanged(item, oldLastChanged);
if (newStatus.isError())
{
return false;
}
final long newLastChanged = newStatus.getResult();
if (newLastChanged != oldLastChanged) if (newLastChanged != oldLastChanged)
{ {
pathMap.put(item, new PathCheckRecord(now, newLastChanged)); pathMap.put(item, new PathCheckRecord(now, newLastChanged));
...@@ -190,6 +196,15 @@ public class QuietPeriodFileFilter implements IStoreItemFilter ...@@ -190,6 +196,15 @@ public class QuietPeriodFileFilter implements IStoreItemFilter
} }
} }
private void saveFirstModificationTime(final StoreItem item, final long now)
{
NumberStatus status = fileStore.lastChanged(item, 0L);
if (status.isError() == false)
{
pathMap.put(item, new PathCheckRecord(now, status.getResult()));
}
}
private void cleanUpVanishedPaths(final long now) private void cleanUpVanishedPaths(final long now)
{ {
final Iterator<Map.Entry<StoreItem, PathCheckRecord>> iter = pathMap.entrySet().iterator(); final Iterator<Map.Entry<StoreItem, PathCheckRecord>> iter = pathMap.entrySet().iterator();
......
...@@ -36,6 +36,7 @@ import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler; ...@@ -36,6 +36,7 @@ import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler;
import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.ITerminable;
import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus; import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus;
import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus;
import ch.systemsx.cisd.datamover.filesystem.remote.CopyActivityMonitor.IFileStoreMonitor; import ch.systemsx.cisd.datamover.filesystem.remote.CopyActivityMonitor.IFileStoreMonitor;
import ch.systemsx.cisd.datamover.intf.ITimingParameters; import ch.systemsx.cisd.datamover.intf.ITimingParameters;
...@@ -184,9 +185,12 @@ public class CopyActivityMonitorTest ...@@ -184,9 +185,12 @@ public class CopyActivityMonitorTest
return BooleanStatus.createTrue(); return BooleanStatus.createTrue();
} }
public long lastChanged(StoreItem item, long stopWhenFindYoungerRelative) public NumberStatus lastChangedRelative(StoreItem item,
long stopWhenFindYoungerRelative)
{ {
return checker.lastChangedRelative(item, stopWhenFindYoungerRelative); long lastChanged =
checker.lastChangedRelative(item, stopWhenFindYoungerRelative);
return NumberStatus.create(lastChanged);
} }
@Override @Override
...@@ -452,7 +456,7 @@ public class CopyActivityMonitorTest ...@@ -452,7 +456,7 @@ public class CopyActivityMonitorTest
return BooleanStatus.createFalse(); return BooleanStatus.createFalse();
} }
public long lastChanged(StoreItem item, long stopWhenYoungerThan) public NumberStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan)
{ {
throw new UnsupportedOperationException(); // should be never called throw new UnsupportedOperationException(); // should be never called
} }
...@@ -469,10 +473,10 @@ public class CopyActivityMonitorTest ...@@ -469,10 +473,10 @@ public class CopyActivityMonitorTest
final StoreItem dummyItem = createDummyItem(); final StoreItem dummyItem = createDummyItem();
final IFileStoreMonitor store = new AlwaysExistsStoreMonitor(dummyItem) final IFileStoreMonitor store = new AlwaysExistsStoreMonitor(dummyItem)
{ {
public long lastChanged(StoreItem item, long stopWhenYoungerThan) public NumberStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan)
{ {
assertEquals(dummyItem, item); assertEquals(dummyItem, item);
return 0; // signalizes error return NumberStatus.createError("mock: lastChange error");
} }
}; };
checkCopyTerminated(store, dummyItem); checkCopyTerminated(store, dummyItem);
...@@ -489,11 +493,13 @@ public class CopyActivityMonitorTest ...@@ -489,11 +493,13 @@ public class CopyActivityMonitorTest
{ {
private boolean oddCall = true; private boolean oddCall = true;
public long lastChanged(StoreItem item, long stopWhenYoungerThan) public NumberStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan)
{ {
assertEquals(dummyItem, item); assertEquals(dummyItem, item);
oddCall = !oddCall; oddCall = !oddCall;
return oddCall ? 10 : 0; // error or unchanged value // error or unchanged value
return oddCall ? NumberStatus.create(10) : NumberStatus
.createError("mock: simulate error while getting last change");
} }
}; };
checkCopyTerminated(store, dummyItem); checkCopyTerminated(store, dummyItem);
...@@ -509,9 +515,9 @@ public class CopyActivityMonitorTest ...@@ -509,9 +515,9 @@ public class CopyActivityMonitorTest
{ {
private int counter = 1; private int counter = 1;
public long lastChanged(StoreItem item, long stopWhenYoungerThan) public NumberStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan)
{ {
return counter++; return NumberStatus.create(counter++);
} }
}; };
checkCopyTerminationStatus(store, dummyItem, false); checkCopyTerminationStatus(store, dummyItem, false);
......
...@@ -35,6 +35,7 @@ import ch.systemsx.cisd.common.test.LogMonitoringAppender; ...@@ -35,6 +35,7 @@ import ch.systemsx.cisd.common.test.LogMonitoringAppender;
import ch.systemsx.cisd.common.utilities.ITimeProvider; import ch.systemsx.cisd.common.utilities.ITimeProvider;
import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus;
import ch.systemsx.cisd.datamover.intf.ITimingParameters; import ch.systemsx.cisd.datamover.intf.ITimingParameters;
/** /**
...@@ -114,13 +115,10 @@ public class QuietPeriodFileFilterTest ...@@ -114,13 +115,10 @@ public class QuietPeriodFileFilterTest
context.checking(new Expectations() context.checking(new Expectations()
{ {
{ {
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis, 0L, pathLastChangedMillis);
will(returnValue(nowMillis));
one(fileStore).lastChanged(ITEM, 0L);
will(returnValue(pathLastChangedMillis));
} }
}); });
assertFalse(filterUnderTest.accept(ITEM)); assertNoAccept();
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
...@@ -134,19 +132,13 @@ public class QuietPeriodFileFilterTest ...@@ -134,19 +132,13 @@ public class QuietPeriodFileFilterTest
{ {
{ {
// first call // first call
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis1, 0L, pathLastChangedMillis);
will(returnValue(nowMillis1));
one(fileStore).lastChanged(ITEM, 0L);
will(returnValue(pathLastChangedMillis));
// second call // second call
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis2, pathLastChangedMillis, pathLastChangedMillis);
will(returnValue(nowMillis2));
one(fileStore).lastChanged(ITEM, pathLastChangedMillis);
will(returnValue(pathLastChangedMillis));
} }
}); });
assertFalse(filterUnderTest.accept(ITEM)); assertNoAccept();
assertTrue(filterUnderTest.accept(ITEM)); assertFilterAccepts();
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
...@@ -162,7 +154,7 @@ public class QuietPeriodFileFilterTest ...@@ -162,7 +154,7 @@ public class QuietPeriodFileFilterTest
one(timeProvider).getTimeInMilliseconds(); one(timeProvider).getTimeInMilliseconds();
will(returnValue(nowMillis)); will(returnValue(nowMillis));
one(fileStore).lastChanged(ITEM, 0L); one(fileStore).lastChanged(ITEM, 0L);
will(returnValue(pathLastChangedMillis)); will(returnValue(NumberStatus.create(pathLastChangedMillis)));
for (int i = 1; i <= 100; ++i) for (int i = 1; i <= 100; ++i)
{ {
one(timeProvider).getTimeInMilliseconds(); one(timeProvider).getTimeInMilliseconds();
...@@ -171,14 +163,14 @@ public class QuietPeriodFileFilterTest ...@@ -171,14 +163,14 @@ public class QuietPeriodFileFilterTest
// last call - will check only when last check is longer ago than the quiet // last call - will check only when last check is longer ago than the quiet
// period // period
one(fileStore).lastChanged(ITEM, pathLastChangedMillis); one(fileStore).lastChanged(ITEM, pathLastChangedMillis);
will(returnValue(pathLastChangedMillis)); will(returnValue(NumberStatus.create(pathLastChangedMillis)));
} }
}); });
for (int i = 0; i < 100; ++i) for (int i = 0; i < 100; ++i)
{ {
assertFalse(filterUnderTest.accept(ITEM)); assertNoAccept();
} }
assertTrue(filterUnderTest.accept(ITEM)); assertFilterAccepts();
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
...@@ -194,25 +186,16 @@ public class QuietPeriodFileFilterTest ...@@ -194,25 +186,16 @@ public class QuietPeriodFileFilterTest
{ {
{ {
// first call // first call
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis1, 0L, pathLastChangedMillis1);
will(returnValue(nowMillis1));
one(fileStore).lastChanged(ITEM, 0L);
will(returnValue(pathLastChangedMillis1));
// second call // second call
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis2, pathLastChangedMillis1, pathLastChangedMillis2);
will(returnValue(nowMillis2));
one(fileStore).lastChanged(ITEM, pathLastChangedMillis1);
will(returnValue(pathLastChangedMillis2));
// third call // third call
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis3, pathLastChangedMillis2, pathLastChangedMillis2);
will(returnValue(nowMillis3));
one(fileStore).lastChanged(ITEM, pathLastChangedMillis2);
will(returnValue(pathLastChangedMillis2));
} }
}); });
assertFalse(filterUnderTest.accept(ITEM)); assertNoAccept();
assertFalse(filterUnderTest.accept(ITEM)); assertNoAccept();
assertTrue(filterUnderTest.accept(ITEM)); assertFilterAccepts();
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
...@@ -228,33 +211,86 @@ public class QuietPeriodFileFilterTest ...@@ -228,33 +211,86 @@ public class QuietPeriodFileFilterTest
{ {
{ {
// first call // first call
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis1, 0L, pathLastChangedMillis1);
will(returnValue(nowMillis1));
one(fileStore).lastChanged(ITEM, 0L);
will(returnValue(pathLastChangedMillis1));
// second call // second call
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis2, pathLastChangedMillis1, pathLastChangedMillis2);
will(returnValue(nowMillis2));
one(fileStore).lastChanged(ITEM, pathLastChangedMillis1);
will(returnValue(pathLastChangedMillis2));
// third call // third call
one(timeProvider).getTimeInMilliseconds(); prepareLastChanged(nowMillis3, pathLastChangedMillis2, pathLastChangedMillis2);
will(returnValue(nowMillis3));
one(fileStore).lastChanged(ITEM, pathLastChangedMillis2);
will(returnValue(pathLastChangedMillis2));
} }
}); });
final LogMonitoringAppender appender = final LogMonitoringAppender appender =
LogMonitoringAppender.addAppender(LogCategory.MACHINE, Pattern LogMonitoringAppender.addAppender(LogCategory.MACHINE, Pattern
.compile("Last modification time of path '.+' jumped back")); .compile("Last modification time of path '.+' jumped back"));
assertFalse(filterUnderTest.accept(ITEM)); assertNoAccept();
assertFalse(filterUnderTest.accept(ITEM)); assertNoAccept();
assertTrue(filterUnderTest.accept(ITEM)); assertFilterAccepts();
appender.verifyLogHasHappened(); appender.verifyLogHasHappened();
LogMonitoringAppender.removeAppender(appender); LogMonitoringAppender.removeAppender(appender);
context.assertIsSatisfied(); context.assertIsSatisfied();
} }
@Test
public void testErrorInLastChangeDoesNotAccept()
{
// simulate several times an error during acquiring last modification time
int errorRepetitions = 3;
long now = 0;
for (int i = 0; i < errorRepetitions; i++)
{
prepareLastChanged(now, 0L, NumberStatus.createError());
now += QUIET_PERIOD_MILLIS;
}
for (int i = 0; i < errorRepetitions; i++)
{
assertNoAccept();
}
// first time we acquire modification time
NumberStatus lastChange = NumberStatus.create(0);
prepareLastChanged(now, 0L, lastChange);
now += QUIET_PERIOD_MILLIS;
assertNoAccept();
// error again
prepareLastChanged(now, 0L, NumberStatus.createError());
now += QUIET_PERIOD_MILLIS;
assertNoAccept();
// second time we acquire modification time - and nothing change during the quite period, so
// accept should succeed
prepareLastChanged(now, 0L, lastChange);
now += QUIET_PERIOD_MILLIS;
assertFilterAccepts();
context.assertIsSatisfied();
}
private void assertNoAccept()
{
assertFalse(filterUnderTest.accept(ITEM));
}
private void assertFilterAccepts()
{
assertTrue(filterUnderTest.accept(ITEM));
}
private void prepareLastChanged(final long currentTime, final long stopWhenFindYounger,
final long lastChanged)
{
prepareLastChanged(currentTime, stopWhenFindYounger, NumberStatus.create(lastChanged));
}
private void prepareLastChanged(final long currentTime, final long stopWhenFindYounger,
final NumberStatus lastChanged)
{
context.checking(new Expectations()
{
{
one(timeProvider).getTimeInMilliseconds();
will(returnValue(currentTime));
one(fileStore).lastChanged(ITEM, stopWhenFindYounger);
will(returnValue(lastChanged));
}
});
}
@Test @Test
public void testCleanUpVanishedDirectory() public void testCleanUpVanishedDirectory()
{ {
...@@ -271,13 +307,13 @@ public class QuietPeriodFileFilterTest ...@@ -271,13 +307,13 @@ public class QuietPeriodFileFilterTest
one(timeProvider).getTimeInMilliseconds(); one(timeProvider).getTimeInMilliseconds();
will(returnValue(nowMillis1)); will(returnValue(nowMillis1));
allowing(fileStore).lastChanged(vanishingItem, 0L); allowing(fileStore).lastChanged(vanishingItem, 0L);
will(returnValue(pathLastChangedMillis1)); will(returnValue(NumberStatus.create(pathLastChangedMillis1)));
// calls to get the required number of calls for clean up // calls to get the required number of calls for clean up
allowing(timeProvider).getTimeInMilliseconds(); allowing(timeProvider).getTimeInMilliseconds();
will(returnValue(nowMillis2)); will(returnValue(nowMillis2));
allowing(fileStore).lastChanged(with(same(ITEM)), allowing(fileStore).lastChanged(with(same(ITEM)),
with(greaterThanOrEqualTo(0L))); with(greaterThanOrEqualTo(0L)));
will(returnValue(pathLastChangedMillis2)); will(returnValue(NumberStatus.create(pathLastChangedMillis2)));
} }
}); });
final LogMonitoringAppender appender = final LogMonitoringAppender appender =
...@@ -285,7 +321,7 @@ public class QuietPeriodFileFilterTest ...@@ -285,7 +321,7 @@ public class QuietPeriodFileFilterTest
assertFalse(filterUnderTest.accept(vanishingItem)); assertFalse(filterUnderTest.accept(vanishingItem));
for (int i = 0; i < QuietPeriodFileFilter.MAX_CALLS_BEFORE_CLEANUP; ++i) for (int i = 0; i < QuietPeriodFileFilter.MAX_CALLS_BEFORE_CLEANUP; ++i)
{ {
assertFalse(filterUnderTest.accept(ITEM)); assertNoAccept();
} }
appender.verifyLogHasHappened(); appender.verifyLogHasHappened();
LogMonitoringAppender.removeAppender(appender); LogMonitoringAppender.removeAppender(appender);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment