Skip to content
Snippets Groups Projects
Commit 76c0c0a8 authored by buczekp's avatar buczekp
Browse files

[LMS-1578] introduced FullTextIndexUpdater

SVN: 16383
parent a11a28fc
No related branches found
No related tags found
No related merge requests found
......@@ -37,10 +37,11 @@ import ch.systemsx.cisd.common.logging.LogFactory;
/**
* A default {@link IFullTextIndexer} which knows how to perform an efficient full text index.
* <p>
* Taken from <i>Hibernate Search</i> documentation page.
* Partly taken from <i>Hibernate Search</i> documentation page.
* </p>
*
* @author Christian Ribeaud
* @author Piotr Buczek
*/
final class DefaultFullTextIndexer implements IFullTextIndexer
{
......@@ -78,7 +79,7 @@ final class DefaultFullTextIndexer implements IFullTextIndexer
// we index entities in batches loading them in groups restricted by id:
// [ ids[index], ids[min(index+batchSize, maxIndex))] )
final Transaction transaction = hibernateSession.beginTransaction();
final Transaction transaction = fullTextSession.beginTransaction();
final List<Long> ids = getAllIds(fullTextSession, clazz);
final int idsSize = ids.size();
operationLog.info(String.format("... got %d '%s' ids...", idsSize, clazz.getSimpleName()));
......@@ -98,13 +99,13 @@ final class DefaultFullTextIndexer implements IFullTextIndexer
createCriteriaWithRestrictedId(fullTextSession, clazz, minId, maxId).list();
for (Object object : results)
{
indexEntity(hibernateSession, fullTextSession, object);
indexEntity(fullTextSession, object);
index++;
}
operationLog.info(String.format("%d '%s' have been indexed...", index, clazz
operationLog.info(String.format("%d %ss have been indexed...", index, clazz
.getSimpleName()));
fullTextSession.flushToIndexes();
hibernateSession.clear();
fullTextSession.clear();
}
fullTextSession.getSearchFactory().optimize(clazz);
transaction.commit();
......@@ -112,6 +113,41 @@ final class DefaultFullTextIndexer implements IFullTextIndexer
clazz.getSimpleName(), index));
}
public <T> void doFullTextIndexUpdate(final Session hibernateSession, final Class<T> clazz,
final List<Long> ids) throws DataAccessException
{
operationLog.info(String.format("Reindexing %s %ss...", ids.size(), clazz.getSimpleName()));
final FullTextSession fullTextSession = Search.getFullTextSession(hibernateSession);
fullTextSession.setFlushMode(FlushMode.MANUAL);
fullTextSession.setCacheMode(CacheMode.IGNORE);
// we index entities in batches loading them in groups by id
final Transaction transaction = fullTextSession.beginTransaction();
final int maxIndex = ids.size();
int index = 0;
// need to increment last id because we use 'lt' condition
while (index < maxIndex)
{
final int nextIndex = getNextIndex(index, maxIndex);
List<Long> subList = ids.subList(index, nextIndex);
final List<?> results =
createCriteriaWithRestrictedId(fullTextSession, clazz, subList).list();
for (Object object : results)
{
indexEntity(fullTextSession, object);
index++;
}
operationLog.info(String.format("%d %ss have been reindexed...", index, clazz
.getSimpleName()));
fullTextSession.flushToIndexes();
fullTextSession.clear();
}
fullTextSession.getSearchFactory().optimize(clazz);
transaction.commit();
operationLog.info(String.format("'%s' index is updated. %d entities have been reindexed.",
clazz.getSimpleName(), index));
}
private int getNextIndex(int index, int maxIndex)
{
int result = index + batchSize;
......@@ -142,13 +178,18 @@ final class DefaultFullTextIndexer implements IFullTextIndexer
.add(Restrictions.lt(ID_PROPERTY_NAME, maxId));
}
private <T> Criteria createCriteriaWithRestrictedId(final FullTextSession fullTextSession,
final Class<T> clazz, final List<Long> ids)
{
return createCriteria(fullTextSession, clazz).add(Restrictions.in(ID_PROPERTY_NAME, ids));
}
private <T> Criteria createCriteria(final FullTextSession fullTextSession, final Class<T> clazz)
{
return fullTextSession.createCriteria(clazz);
}
private <T> void indexEntity(final Session hibernateSession,
final FullTextSession fullTextSession, T object)
private <T> void indexEntity(final FullTextSession fullTextSession, T object)
{
if (operationLog.isDebugEnabled())
{
......
/*
* Copyright 2010 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.openbis.generic.server.dataaccess.db.search;
import java.io.Serializable;
import java.util.List;
import ch.systemsx.cisd.common.collections.CollectionUtils;
/**
* @author Piotr Buczek
*/
public class EntitiesToUpdate implements Serializable
{
private static final long serialVersionUID = 1L;
private final Class<?> clazz;
private final List<Long> ids;
public EntitiesToUpdate(Class<?> clazz, List<Long> ids)
{
this.clazz = clazz;
this.ids = ids;
}
public Class<?> getClazz()
{
return clazz;
}
public List<Long> getIds()
{
return ids;
}
@Override
public String toString()
{
return clazz.getName() + ": " + CollectionUtils.abbreviate(ids, 10);
}
}
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.openbis.generic.server.dataaccess.db.search;
import java.io.File;
import org.apache.commons.lang.time.StopWatch;
import org.apache.log4j.Logger;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.springframework.orm.hibernate3.support.HibernateDaoSupport;
import ch.systemsx.cisd.common.Constants;
import ch.systemsx.cisd.common.collections.ExtendedBlockingQueueFactory;
import ch.systemsx.cisd.common.collections.IExtendedBlockingQueue;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
/**
* A <i>full-text</i> index updater.
*
* @author Piotr Buczek
*/
public final class FullTextIndexUpdater extends HibernateDaoSupport implements
IFullTextIndexUpdater
{
public final static String FULL_TEXT_INDEX_UPDATER_QUEUE_FILENAME =
Constants.MARKER_PREFIX + ".index_updater_queue";
private static final Logger operationLog =
LogFactory.getLogger(LogCategory.OPERATION, FullTextIndexUpdater.class);
private static final Logger notificationLog =
LogFactory.getLogger(LogCategory.NOTIFY, FullTextIndexUpdater.class);
private final HibernateSearchContext context;
private final IFullTextIndexer fullTextIndexer;
private final IExtendedBlockingQueue<EntitiesToUpdate> updaterQueue;
public FullTextIndexUpdater(final SessionFactory sessionFactory,
final HibernateSearchContext context)
{
assert context != null : "Unspecified hibernate search context.";
setSessionFactory(sessionFactory);
this.context = context;
operationLog.debug(String.format("Hibernate search context: %s.", context));
fullTextIndexer = new DefaultFullTextIndexer(2); // context.getBatchSize()); FIXME
File queueFile = getUpdaterQueueFile(context);
operationLog.debug(String.format("Updater queue file: %s.", queueFile));
updaterQueue =
ExtendedBlockingQueueFactory.<EntitiesToUpdate> createPersistRecordBased(queueFile);
}
private static File getUpdaterQueueFile(HibernateSearchContext context)
{
final File indexBase = new File(context.getIndexBase());
final File queueFile = new File(indexBase, FULL_TEXT_INDEX_UPDATER_QUEUE_FILENAME);
return queueFile;
}
public void start()
{
Thread thread = new Thread(new FullTextIndexUpdaterRunnable(), "Full Text Index Updater");
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
thread.start();
}
public void clear()
{
updaterQueue.clear();
if (operationLog.isInfoEnabled())
{
operationLog.info("Cleared updater queue.");
}
}
public void scheduleUpdate(EntitiesToUpdate entities)
{
if (operationLog.isDebugEnabled())
{
operationLog.debug("Scheduling update: " + entities);
}
updaterQueue.add(entities);
}
/**
* {@link Runnable} performing updates in a loop.
*
* @author Piotr Buczek
*/
private class FullTextIndexUpdaterRunnable implements Runnable
{
public final void run()
{
final IndexMode indexMode = context.getIndexMode();
if (indexMode == IndexMode.NO_INDEX) // sanity check
{
operationLog.debug(String.format("Stopping index updater process as "
+ " '%s' mode was configured.", indexMode));
return;
}
try
{
while (true)
{
final EntitiesToUpdate entities = updaterQueue.peekWait();
if (operationLog.isInfoEnabled())
{
operationLog.info("Updating " + entities);
}
final StopWatch stopWatch = new StopWatch();
stopWatch.start();
Session session = null;
try
{
session = getSession();
// FIXME clear session?
fullTextIndexer.doFullTextIndexUpdate(getSession(), entities.getClazz(),
entities.getIds());
stopWatch.stop();
} catch (RuntimeException e)
{
notificationLog.error("Error updating " + entities + ".", e);
} finally
{
if (session != null)
{
releaseSession(session);
}
}
if (operationLog.isInfoEnabled())
{
operationLog.info("Finished updating " + entities + " after " + stopWatch);
}
updaterQueue.take();
}
} catch (final Throwable th)
{
notificationLog.error("A problem has occurred while updating index.", th);
}
}
}
}
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.openbis.generic.server.dataaccess.db.search;
/**
* Each implementation is able to schedule a <i>full-text</i> index update. Updates are expected to
* be executed asynchronously in the order they had been scheduled. The thread executing updates
* will run with low priority.
*
* @author Piotr Buczek
*/
public interface IFullTextIndexUpdateScheduler
{
/**
* Schedules update of index for specified entities.
*/
void scheduleUpdate(EntitiesToUpdate entities);
}
\ No newline at end of file
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.openbis.generic.server.dataaccess.db.search;
/**
* Each implementation is able to perform a <i>full-text</i> index update. Updates are expected to
* be executed asynchronously in the order they had been scheduled. The thread executing updates
* will run with low priority.
*
* @author Piotr Buczek
*/
public interface IFullTextIndexUpdater extends IFullTextIndexUpdateScheduler
{
/**
* Starts up updater.
*/
void start();
/**
* Clears the schedule.
*/
void clear();
}
......@@ -16,6 +16,8 @@
package ch.systemsx.cisd.openbis.generic.server.dataaccess.db.search;
import java.util.List;
import org.hibernate.Session;
import org.springframework.dao.DataAccessException;
......@@ -23,14 +25,22 @@ import org.springframework.dao.DataAccessException;
* Each implementation is able to perform a <i>full-text</i> index.
*
* @author Christian Ribeaud
* @author Piotr Buczek
*/
public interface IFullTextIndexer
{
/**
* Performs a <i>full-text</i> index on given <var>clazz</var> using given <i>Hibernate</i>
* session.
* Performs a <i>full-text</i> index on entities of given <var>clazz</var> using given
* <i>Hibernate</i> session.
*/
public <T> void doFullTextIndex(final Session hibernateSession, final Class<T> clazz)
throws DataAccessException;
/**
* Performs a <i>full-text</i> index update on entities of given <var>clazz</var> with given
* <var>ids</var> using given <i>Hibernate</i> session.
*/
public <T> void doFullTextIndexUpdate(final Session hibernateSession, final Class<T> clazz,
final List<Long> ids) throws DataAccessException;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment