Skip to content
Snippets Groups Projects
Commit a72229c4 authored by juanf's avatar juanf Committed by vkovtun
Browse files

SSDM-13251: Refactoring new data store code

parent cfc21e24
No related branches found
No related tags found
1 merge request!40SSDM-13578 : 2PT : Database and V3 Implementation - include the new AFS "free"...
/*
* Copyright ETH 2022 - 2023 Zürich, Scientific IT Services
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.ethz.sis.afsserver.http.impl;
import ch.ethz.sis.afsserver.http.HttpResponse;
import ch.ethz.sis.afsserver.http.HttpServerHandler;
import ch.ethz.sis.shared.log.LogManager;
import ch.ethz.sis.shared.log.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import static io.netty.handler.codec.http.HttpMethod.*;
public class NettyHttpHandlerV2 extends ChannelInboundHandlerAdapter
{
private static final Logger logger = LogManager.getLogger(NettyHttpServer.class);
private static final byte[] NOT_FOUND = "404 NOT FOUND".getBytes();
private static final ByteBuf NOT_FOUND_BUFFER = Unpooled.wrappedBuffer(NOT_FOUND);
private static final Set<HttpMethod> allowedMethods = Set.of(GET, POST, PUT, DELETE);
private final String uri;
private final HttpServerHandler httpServerHandler;
public NettyHttpHandlerV2(String uri, HttpServerHandler httpServerHandler)
{
this.uri = uri;
this.httpServerHandler = httpServerHandler;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
if (msg instanceof FullHttpRequest)
{
final FullHttpRequest request = (FullHttpRequest) msg;
QueryStringDecoder queryStringDecoderForPath = new QueryStringDecoder(request.uri(), true);
if (queryStringDecoderForPath.path().equals(uri) &&
allowedMethods.contains(request.method()))
{
FullHttpResponse response = null;
ByteBuf content = request.content();
try
{
QueryStringDecoder queryStringDecoderForParameters = null;
byte[] array = new byte[content.readableBytes()];
content.readBytes(array);
if (GET.equals(request.method())) {
queryStringDecoderForParameters = queryStringDecoderForPath;
} else {
queryStringDecoderForParameters = new QueryStringDecoder(new String(array, StandardCharsets.UTF_8), StandardCharsets.UTF_8, false);
}
HttpResponse apiResponse = httpServerHandler.process(request.method(),
queryStringDecoderForParameters.parameters(), null);
HttpResponseStatus status = (!apiResponse.isError()) ?
HttpResponseStatus.OK :
HttpResponseStatus.BAD_REQUEST;
response = getHttpResponse(
status,
apiResponse.getContentType(),
Unpooled.wrappedBuffer(apiResponse.getBody()),
apiResponse.getBody().length);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} finally
{
content.release();
}
} else
{
FullHttpResponse response = getHttpResponse(
HttpResponseStatus.NOT_FOUND,
"text/plain",
NOT_FOUND_BUFFER,
NOT_FOUND.length);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
} else
{
super.channelRead(ctx, msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
logger.catching(cause);
byte[] causeBytes = cause.getMessage().getBytes();
FullHttpResponse response = getHttpResponse(
HttpResponseStatus.INTERNAL_SERVER_ERROR,
"text/plain",
Unpooled.wrappedBuffer(causeBytes),
causeBytes.length
);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
public FullHttpResponse getHttpResponse(
HttpResponseStatus status,
String contentType,
ByteBuf content,
int contentLength)
{
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
status,
content
);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, contentLength);
response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType);
response.headers().set(HttpHeaderNames.CONNECTION, "close");
return response;
}
}
/*
* Copyright ETH 2022 - 2023 Zürich, Scientific IT Services
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.ethz.sis.afsserver.server.impl;
import ch.ethz.sis.afsjson.JsonObjectMapper;
import ch.ethz.sis.afsserver.exception.HTTPExceptions;
import ch.ethz.sis.afsserver.http.HttpResponse;
import ch.ethz.sis.afsserver.http.HttpServerHandler;
import ch.ethz.sis.afsserver.server.APIServer;
import ch.ethz.sis.afsserver.server.APIServerException;
import ch.ethz.sis.afsserver.server.Request;
import ch.ethz.sis.afsserver.server.Response;
import ch.ethz.sis.afsserver.server.performance.Event;
import ch.ethz.sis.afsserver.server.performance.PerformanceAuditor;
import ch.ethz.sis.shared.log.LogManager;
import ch.ethz.sis.shared.log.Logger;
import io.netty.handler.codec.http.HttpMethod;
import java.nio.charset.StandardCharsets;
import java.util.*;
import static io.netty.handler.codec.http.HttpMethod.*;
/*
* This class is supposed to be called by a TCP or HTTP transport class
*/
public class ApiServerAdapterV2<CONNECTION, API> implements HttpServerHandler
{
private static final Logger logger = LogManager.getLogger(ApiServerAdapterV2.class);
private final APIServer<CONNECTION, Request, Response, API> server;
private final JsonObjectMapper jsonObjectMapper;
private final ApiResponseBuilder apiResponseBuilder;
public ApiServerAdapterV2(
APIServer<CONNECTION, Request, Response, API> server,
JsonObjectMapper jsonObjectMapper)
{
this.server = server;
this.jsonObjectMapper = jsonObjectMapper;
this.apiResponseBuilder = new ApiResponseBuilder();
}
public static HttpMethod getHttpMethod(String apiMethod)
{
switch (apiMethod)
{
case "list":
case "read":
case "isSessionValid":
return GET; // all parameters from GET methods come on the query string
case "write":
case "move":
case "copy":
case "login":
case "logout":
case "begin":
case "prepare":
case "commit":
case "rollback":
case "recover":
return POST; // all parameters from POST methods come on the body
case "delete":
return HttpMethod.DELETE; // all parameters from DELETE methods come on the body
}
throw new UnsupportedOperationException("This line SHOULD NOT be unreachable!");
}
public boolean isValidMethod(HttpMethod givenMethod, String apiMethod)
{
HttpMethod correctMethod = getHttpMethod(apiMethod);
return correctMethod == givenMethod;
}
public HttpResponse process(HttpMethod httpMethod, Map<String, List<String>> parameters,
byte[] requestBody)
{
try
{
logger.traceAccess(null);
PerformanceAuditor performanceAuditor = new PerformanceAuditor();
if (httpMethod != GET && httpMethod != POST && httpMethod != DELETE)
{
return getHTTPResponse(new ApiResponse("1", null,
HTTPExceptions.INVALID_HTTP_METHOD.getCause()));
}
String method = null;
String sessionToken = null;
String interactiveSessionKey = null;
String transactionManagerKey = null;
Map<String, Object> methodParameters = new HashMap<>();
for (Map.Entry<String, List<String>> entry : parameters.entrySet())
{
String value = null;
if (entry.getValue() != null)
{
if (entry.getValue().size() == 1)
{
value = entry.getValue().get(0);
} else if (entry.getValue().size() > 1)
{
return getHTTPResponse(new ApiResponse("1", null,
HTTPExceptions.INVALID_PARAMETERS.getCause()));
}
}
try
{
switch (entry.getKey())
{
case "method":
method = value;
if (!isValidMethod(httpMethod, method))
{
return getHTTPResponse(new ApiResponse("1", null,
HTTPExceptions.INVALID_HTTP_METHOD.getCause()));
}
break;
case "sessionToken":
sessionToken = value;
break;
case "interactiveSessionKey":
interactiveSessionKey = value;
break;
case "transactionManagerKey":
transactionManagerKey = value;
break;
case "transactionId":
methodParameters.put(entry.getKey(), UUID.fromString(value));
break;
case "recursively":
methodParameters.put(entry.getKey(), Boolean.valueOf(value));
break;
case "offset":
methodParameters.put(entry.getKey(), Long.valueOf(value));
break;
case "limit":
methodParameters.put(entry.getKey(), Integer.valueOf(value));
break;
case "data":
methodParameters.put(entry.getKey(), Base64.getDecoder().decode(value));
break;
case "md5Hash":
methodParameters.put(entry.getKey(), Base64.getDecoder().decode(value));
break;
default:
methodParameters.put(entry.getKey(), value);
break;
}
} catch (Exception e)
{
logger.catching(e);
return getHTTPResponse(new ApiResponse("1", null,
HTTPExceptions.INVALID_PARAMETERS.getCause(
e.getClass().getSimpleName(),
e.getMessage())));
}
}
ApiRequest apiRequest = new ApiRequest("1", method, methodParameters, sessionToken,
interactiveSessionKey, transactionManagerKey);
Response response = server.processOperation(apiRequest, apiResponseBuilder, performanceAuditor);
HttpResponse httpResponse = getHTTPResponse(response);
performanceAuditor.audit(Event.WriteResponse);
logger.traceExit(performanceAuditor);
logger.traceExit(httpResponse);
return httpResponse;
} catch (APIServerException e)
{
logger.catching(e);
switch (e.getType())
{
case MethodNotFound:
case IncorrectParameters:
case InternalError:
try
{
return getHTTPResponse(new ApiResponse("1", null, e.getData()));
} catch (Exception ex)
{
logger.catching(ex);
}
}
} catch (Exception e)
{
logger.catching(e);
try
{
return getHTTPResponse(new ApiResponse("1", null,
HTTPExceptions.UNKNOWN.getCause(e.getClass().getSimpleName(),
e.getMessage())));
} catch (Exception ex)
{
logger.catching(ex);
}
}
return null; // This should never happen, it would mean an error writing the Unknown error happened.
}
public HttpResponse getHTTPResponse(Response response)
throws Exception
{
boolean error = response.getError() != null;
String contentType = null;
byte[] body = null;
if (response.getResult() instanceof List) {
contentType = HttpResponse.CONTENT_TYPE_JSON;
body = jsonObjectMapper.writeValue(response);
} else if(response.getResult() instanceof byte[]) {
contentType = HttpResponse.CONTENT_TYPE_BINARY_DATA;
body = (byte[]) response.getResult();
} else {
contentType = HttpResponse.CONTENT_TYPE_TEXT;
body = String.valueOf(response.getResult()).getBytes(StandardCharsets.UTF_8);
}
return new HttpResponse(error, contentType, body);
}
}
\ No newline at end of file
/*
* Copyright ETH 2022 - 2023 Zürich, Scientific IT Services
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.ethz.sis.afsserver.core;
import ch.ethz.sis.afsapi.api.PublicAPI;
import ch.ethz.sis.afsapi.dto.File;
import lombok.NonNull;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public abstract class AbstractPublicAPIWrapperV2 implements PublicAPI
{
public abstract <E> E process(Class<E> responseType, String method, Map<String, Object> params);
@Override
public List<File> list(@NonNull String owner, @NonNull String source,
@NonNull Boolean recursively) throws Exception
{
Map<String, Object> args = Map.of(
"owner", owner,
"source", source,
"recursively", recursively);
return process(List.class, "list", args);
}
@Override
public byte[] read(@NonNull String owner, @NonNull String source, @NonNull Long offset,
@NonNull Integer limit) throws Exception
{
Map<String, Object> args = Map.of(
"owner", owner,
"source", source,
"offset", offset,
"limit", limit);
return process(byte[].class,"read", args);
}
@Override
public Boolean write(@NonNull String owner, @NonNull String source, @NonNull Long offset,
@NonNull byte[] data, @NonNull byte[] md5Hash) throws Exception
{
Map<String, Object> args = Map.of(
"owner", owner,
"source", source,
"offset", offset,
"data", data,
"md5Hash", md5Hash);
return process(Boolean.class, "write", args);
}
@Override
public Boolean delete(@NonNull String owner, @NonNull String source) throws Exception
{
Map<String, Object> args = Map.of(
"owner", owner,
"source", source);
return process(Boolean.class, "delete", args);
}
@Override
public Boolean copy(@NonNull String sourceOwner, @NonNull String source,
@NonNull String targetOwner, @NonNull String target) throws Exception
{
Map<String, Object> args = Map.of(
"sourceOwner", sourceOwner,
"source", source,
"targetOwner", targetOwner,
"target", target);
return process(Boolean.class,"copy", args);
}
@Override
public Boolean move(@NonNull String sourceOwner, @NonNull String source,
@NonNull String targetOwner, @NonNull String target) throws Exception
{
Map<String, Object> args = Map.of(
"sourceOwner", sourceOwner,
"source", source,
"targetOwner", targetOwner,
"target", target);
return process(Boolean.class, "move", args);
}
@Override
public void begin(UUID transactionId) throws Exception
{
//TODO: Unused
}
@Override
public Boolean prepare() throws Exception
{
//TODO: Unused
return true;
}
@Override
public void commit() throws Exception
{
//TODO: Unused
}
@Override
public void rollback() throws Exception
{
//TODO: Unused
}
@Override
public List<UUID> recover() throws Exception
{
//TODO: Unused
return null;
}
@Override
public String login(String userId, String password) throws Exception
{
//TODO: Unused
return null;
}
@Override
public Boolean isSessionValid() throws Exception
{
//TODO: Unused
return null;
}
@Override
public Boolean logout() throws Exception
{
//TODO: Unused
return null;
}
}
/*
* Copyright ETH 2022 - 2023 Zürich, Scientific IT Services
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.ethz.sis.afsserver.impl;
import ch.ethz.sis.afsclient.client.AfsClientV2;
import ch.ethz.sis.afsserver.core.AbstractPublicAPIWrapperV2;
import ch.ethz.sis.afsserver.http.HttpResponse;
import ch.ethz.sis.afsserver.server.impl.ApiServerAdapter;
import ch.ethz.sis.shared.io.IOUtils;
import ch.ethz.sis.shared.log.LogManager;
import ch.ethz.sis.shared.log.Logger;
import io.netty.handler.codec.http.HttpMethod;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class APIServerAdapterWrapperV2 extends AbstractPublicAPIWrapperV2
{
private static final Logger logger = LogManager.getLogger(APIServerAdapterWrapperV2.class);
private ApiServerAdapter apiServerAdapter;
public APIServerAdapterWrapperV2(ApiServerAdapter apiServerAdapter)
{
this.apiServerAdapter = apiServerAdapter;
}
public Map<String, List<String>> getURIParameters(Map<String, Object> args)
{
Map<String, List<String>> result = new HashMap<>(args.size());
for (Map.Entry<String, Object> entry : args.entrySet())
{
if (entry.getValue() instanceof byte[])
{
result.put(entry.getKey(), List.of(IOUtils.encodeBase64((byte[]) entry.getValue())));
} else
{
result.put(entry.getKey(), List.of(String.valueOf(entry.getValue())));
}
}
return result;
}
public <E> E process(Class<E> responseType, String apiMethod, Map<String, Object> params)
{
try
{
HttpMethod httpMethod = ApiServerAdapter.getHttpMethod(apiMethod);
Map<String, List<String>> requestParameters = getURIParameters(params);
requestParameters.put("sessionToken", List.of(UUID.randomUUID().toString()));
requestParameters.put("method", List.of(apiMethod));
byte[] requestBody = null;
if (HttpMethod.GET.equals(httpMethod))
{
// Do nothing
} else if (HttpMethod.POST.equals(httpMethod) || HttpMethod.DELETE.equals(httpMethod))
{
// Do nothing
} else
{
throw new IllegalArgumentException("Not supported HTTP method type!");
}
HttpResponse response = apiServerAdapter.process(httpMethod, requestParameters, null);
String contentType = response.getContentType();
byte[] body = response.getBody();
return AfsClientV2.getResponseResult(responseType, contentType, body);
} catch (Throwable throwable)
{
throw new RuntimeException(throwable);
}
}
}
/*
* Copyright ETH 2022 - 2023 Zürich, Scientific IT Services
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.ethz.sis.afsserver.impl;
import ch.ethz.sis.afsserver.core.AbstractPublicAPIWrapperV2;
import ch.ethz.sis.afsserver.server.APIServer;
import ch.ethz.sis.afsserver.server.Response;
import ch.ethz.sis.afsserver.server.impl.ApiRequest;
import ch.ethz.sis.afsserver.server.impl.ApiResponseBuilder;
import ch.ethz.sis.afsserver.server.performance.PerformanceAuditor;
import ch.ethz.sis.shared.log.LogManager;
import ch.ethz.sis.shared.log.Logger;
import java.util.Map;
import java.util.UUID;
public class APIServerWrapperV2 extends AbstractPublicAPIWrapperV2
{
private static final Logger logger = LogManager.getLogger(APIServerWrapperV2.class);
private APIServer apiServer;
private final ApiResponseBuilder apiResponseBuilder;
public APIServerWrapperV2(APIServer apiServer) {
this.apiServer = apiServer;
this.apiResponseBuilder = new ApiResponseBuilder();
}
public <E> E process(Class<E> responseType, String method, Map<String, Object> params) {
PerformanceAuditor performanceAuditor = new PerformanceAuditor();
// Random Session token just works for tests with dummy authentication
ApiRequest request = new ApiRequest("test", method, params, UUID.randomUUID().toString(), null, null);
try {
Response response = apiServer.processOperation(request, apiResponseBuilder, performanceAuditor);
return (E) response.getResult();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment