From b20a08a24fbd5ed2b46f3fec9b1467cc1032de4b Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Sun, 18 Dec 2011 09:55:35 +0000 Subject: [PATCH] Improve logging when messages arrive for interrupted and terminated conversations. SVN: 24044 --- .../server/BidirectionalServiceMessenger.java | 5 + .../server/ConversationMap.java | 95 +++++++++++++++++++ .../server/ServiceConversationServer.java | 37 ++++---- 3 files changed, 121 insertions(+), 16 deletions(-) create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ConversationMap.java diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/BidirectionalServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/BidirectionalServiceMessenger.java index b9b2d29d3be..21acb14b669 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/BidirectionalServiceMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/BidirectionalServiceMessenger.java @@ -131,4 +131,9 @@ class BidirectionalServiceMessenger { interrupted.set(true); } + + public boolean isMarkedAsInterrupted() + { + return interrupted.get(); + } } \ No newline at end of file diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ConversationMap.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ConversationMap.java new file mode 100644 index 00000000000..0e34cf0417a --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ConversationMap.java @@ -0,0 +1,95 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation.server; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A set of recently seen service conversation ids. + * + * @author Bernd Rinn + */ +class ConversationMap +{ + private final int MAX_SIZE_INSPECT = 100; + + private final long MAX_AGE_MILLIS = 60000L; + + private final Map<String, ServiceConversationRecord> conversations = + new ConcurrentHashMap<String, ServiceConversationRecord>(); + + private final Map<String, Long> recentlySeenMap = new ConcurrentHashMap<String, Long>(); + + private void addHistoric(String serviceConversationId) + { + final long now = System.currentTimeMillis(); + recentlySeenMap.put(serviceConversationId, now); + cleanUpOld(now); + } + + private void cleanUpOld(final long now) + { + if (recentlySeenMap.size() > MAX_SIZE_INSPECT) + { + final Iterator<Entry<String, Long>> it = recentlySeenMap.entrySet().iterator(); + while (it.hasNext()) + { + final Entry<String, Long> entry = it.next(); + if (now - entry.getValue() > MAX_AGE_MILLIS) + { + it.remove(); + } + } + } + } + + boolean recentlySeen(String serviceConversationId) + { + return recentlySeenMap.containsKey(serviceConversationId); + } + + ServiceConversationRecord get(Object key) + { + return conversations.get(key); + } + + ServiceConversationRecord put(String key, ServiceConversationRecord value) + { + return conversations.put(key, value); + } + + ServiceConversationRecord remove(String serviceConversationId) + { + addHistoric(serviceConversationId); + return conversations.remove(serviceConversationId); + } + + boolean containsKey(Object key) + { + return conversations.containsKey(key); + } + + Collection<ServiceConversationRecord> values() + { + return conversations.values(); + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java index a1510f9e29a..57cf326ec9f 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java @@ -61,9 +61,8 @@ public class ServiceConversationServer private final Map<String, IServiceMessageTransport> responseMessageMap = new ConcurrentHashMap<String, IServiceMessageTransport>(); - private final Map<String, ServiceConversationRecord> conversations = - new ConcurrentHashMap<String, ServiceConversationRecord>(); - + private final ConversationMap conversations = new ConversationMap(); + private final Random rng = new Random(); private final IServiceMessageTransport incomingTransport = new IServiceMessageTransport() @@ -74,8 +73,11 @@ public class ServiceConversationServer final ServiceConversationRecord record = conversations.get(conversationId); if (record == null) { - operationLog.error(String.format( - "Message for unknown service conversation '%s'", conversationId)); + if (conversations.recentlySeen(conversationId) == false) + { + operationLog.error(String.format( + "Message for unknown service conversation '%s'", conversationId)); + } return; } if (message.hasPayload()) @@ -83,19 +85,22 @@ public class ServiceConversationServer record.getMessenger().sendToService(message); } else { - if (message.isException()) - { - operationLog.error(String.format( - "[id: %s] Client execution exception.\n%s", conversationId, - message.tryGetExceptionDescription())); - } else + if (record.getMessenger().isMarkedAsInterrupted() == false) { - operationLog.error(String.format( - "[id: %s] Client requests termination of service conversation.", - conversationId)); + if (message.isException()) + { + operationLog.error(String.format( + "[id: %s] Client execution exception.\n%s", conversationId, + message.tryGetExceptionDescription())); + } else + { + operationLog.error(String.format( + "[id: %s] Client requests termination of service conversation.", + conversationId)); + } + record.getMessenger().markAsInterrupted(); + record.getController().cancel(true); } - record.getMessenger().markAsInterrupted(); - record.getController().cancel(true); } } }; -- GitLab