diff --git a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java index ad874f8c810..11962a4b502 100644 --- a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java +++ b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java @@ -16,6 +16,10 @@ package uk.gov.gchq.gaffer.federated.simple; +import uk.gov.gchq.gaffer.cache.Cache; +import uk.gov.gchq.gaffer.cache.CacheServiceLoader; +import uk.gov.gchq.gaffer.cache.exception.CacheOperationException; +import uk.gov.gchq.gaffer.commonutil.exception.OverwritingException; import uk.gov.gchq.gaffer.core.exception.GafferRuntimeException; import uk.gov.gchq.gaffer.data.element.Element; import uk.gov.gchq.gaffer.data.element.id.EntityId; @@ -60,18 +64,21 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static uk.gov.gchq.gaffer.cache.CacheServiceLoader.DEFAULT_SERVICE_NAME; + /** * The federated store implementation. Provides the set up and required * methods to enable a {@link Store} that will delegate {@link Operation}s * to sub graphs then merge the result. */ public class FederatedStore extends Store { + private static final String DEFAULT_CACHE_CLASS_FALLBACK = "uk.gov.gchq.gaffer.cache.impl.HashMapCacheService"; // Default graph IDs to execute on private List defaultGraphIds = new LinkedList<>(); - // Cached list of available graphs - private final List graphs = new LinkedList<>(); + // Gaffer cache of available graphs + private Cache graphCache; // Store specific handlers public final Map, OperationHandler> storeHandlers = Stream.of( @@ -89,24 +96,31 @@ public class FederatedStore extends Store { * @throws IllegalArgumentException If there is already a graph with the supplied ID */ public void addGraph(final GraphSerialisable graph) { - // Make sure graph ID doesn't already exist - if (graphs.stream().map(GraphSerialisable::getGraphId).anyMatch(id -> id.equals(graph.getGraphId()))) { + try { + // Add safely to the cache + graphCache.getCache().putSafe(graph.getGraphId(), graph); + } catch (final CacheOperationException e) { + // Unknown issue adding to cache + throw new GafferRuntimeException(e.getMessage(), e); + } catch (final OverwritingException e) { + // Notify that the graph ID is already in use throw new IllegalArgumentException( - "A graph with Graph ID: '" + graph.getGraphId() + "' has already been added to this store"); + "A graph with Graph ID: '" + graph.getGraphId() + "' has already been added to this store", e); } - graphs.add(new GraphSerialisable(graph.getConfig(), graph.getSchema(), graph.getStoreProperties())); } /** * Remove a graph from the scope of this store. * * @param graphId The graph ID of the graph to remove. - * - * @throws IllegalArgumentException If graph not found. + * @throws IllegalArgumentException If graph does not exist. */ public void removeGraph(final String graphId) { - GraphSerialisable graphToRemove = getGraph(graphId); - graphs.remove(graphToRemove); + if (!graphCache.contains(graphId)) { + throw new IllegalArgumentException( + "Graph with Graph ID: '" + graphId + "' is not available to this federated store"); + } + graphCache.deleteFromCache(graphId); } /** @@ -115,25 +129,25 @@ public void removeGraph(final String graphId) { * * @param graphId The graph ID * @return The {@link GraphSerialisable} relating to the ID. - * + * @throws CacheOperationException If issue getting from cache. * @throws IllegalArgumentException If graph not found. */ - public GraphSerialisable getGraph(final String graphId) { - for (final GraphSerialisable graph : graphs) { - if (graph.getGraphId().equals(graphId)) { - return graph; - } + public GraphSerialisable getGraph(final String graphId) throws CacheOperationException { + GraphSerialisable graph = graphCache.getFromCache(graphId); + if (graph == null) { + throw new IllegalArgumentException( + "Graph with Graph ID: '" + graphId + "' is not available to this federated store"); } - throw new IllegalArgumentException("Graph with Graph ID: '" + graphId + "' is not available to this federated store"); + return graph; } /** - * Returns a list of all the graphs available to this store. + * Returns all the graphs available to this store. * - * @return List of {@link GraphSerialisable}s + * @return Iterable of {@link GraphSerialisable}s */ - public List getAllGraphs() { - return graphs; + public Iterable getAllGraphs() { + return graphCache.getCache().getAllValues(); } /** @@ -193,6 +207,8 @@ public void initialise(final String graphId, final Schema unused, final StorePro throw new IllegalArgumentException("Federated store should not be initialised with a Schema"); } super.initialise(graphId, new Schema(), properties); + + graphCache = new Cache<>("federatedGraphCache-" + graphId); } @Override @@ -268,4 +284,16 @@ protected OutputOperationHandler> getGetTraitsHandler protected Class getRequiredParentSerialiserClass() { return ToBytesSerialiser.class; } + + @Override + protected void startCacheServiceLoader(final StoreProperties properties) { + super.startCacheServiceLoader(properties); + // If default not setup then initialise cache as its needed for storing graphs + if (!CacheServiceLoader.isDefaultEnabled()) { + CacheServiceLoader.initialise( + DEFAULT_SERVICE_NAME, + DEFAULT_CACHE_CLASS_FALLBACK, + properties.getProperties()); + } + } } diff --git a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/merge/DefaultResultAccumulator.java b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/merge/DefaultResultAccumulator.java index 5c5c7b09834..b9d35cb13d5 100644 --- a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/merge/DefaultResultAccumulator.java +++ b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/merge/DefaultResultAccumulator.java @@ -75,7 +75,9 @@ public T apply(final T update, final T state) { } // By default just chain iterables together - return (T) IterableUtils.chainedIterable((Iterable) state, updateIterable); + // (need to use the iterator to make sure the FluentIterable under the hood serialises correctly) + Iterable chained = () -> IterableUtils.chainedIterable((Iterable) state, updateIterable).iterator(); + return (T) chained; } return update; diff --git a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java index df43aa73323..fe1f8d0ff2f 100644 --- a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java +++ b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java @@ -16,6 +16,7 @@ package uk.gov.gchq.gaffer.federated.simple.operation.handler; +import uk.gov.gchq.gaffer.cache.exception.CacheOperationException; import uk.gov.gchq.gaffer.federated.simple.FederatedStore; import uk.gov.gchq.gaffer.graph.GraphSerialisable; import uk.gov.gchq.gaffer.operation.Operation; @@ -113,8 +114,9 @@ public Object doOperation(final P operation, final Context context, final Store * @param store The federated store. * @param operation The operation to execute. * @return List of {@link GraphSerialisable}s to execute on. + * @throws OperationException Fail to get Graphs. */ - protected List getGraphsToExecuteOn(final FederatedStore store, final Operation operation) { + protected List getGraphsToExecuteOn(final FederatedStore store, final Operation operation) throws OperationException { List graphIds = store.getDefaultGraphIds(); List graphsToExecute = new LinkedList<>(); // If user specified graph IDs for this chain parse as comma separated list @@ -124,11 +126,16 @@ protected List getGraphsToExecuteOn(final FederatedStore stor graphIds = Arrays.asList(operation.getOption(OPT_SHORT_GRAPH_IDS).split(",")); } - // Get the corresponding graph serialisable - for (final String id : graphIds) { - graphsToExecute.add(store.getGraph(id)); + try { + // Get the corresponding graph serialisable + for (final String id : graphIds) { + graphsToExecute.add(store.getGraph(id)); + } + } catch (final CacheOperationException e) { + throw new OperationException("Failed to get Graphs from cache", e); } + // Keep graphs sorted so results returned are predictable between runs Collections.sort(graphsToExecute, (g1, g2) -> g1.getGraphId().compareTo(g2.getGraphId())); diff --git a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/get/GetAllGraphIdsHandler.java b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/get/GetAllGraphIdsHandler.java index df48568c773..a3f393b9885 100644 --- a/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/get/GetAllGraphIdsHandler.java +++ b/store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/get/GetAllGraphIdsHandler.java @@ -26,13 +26,14 @@ import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public class GetAllGraphIdsHandler implements OutputOperationHandler> { @Override public Set doOperation(final GetAllGraphIds operation, final Context context, final Store store) throws OperationException { // Get all the graphs and convert to a set of just IDs - return ((FederatedStore) store).getAllGraphs().stream() + return StreamSupport.stream(((FederatedStore) store).getAllGraphs().spliterator(), false) .map(GraphSerialisable::getGraphId) .collect(Collectors.toSet()); } diff --git a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreIT.java b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreIT.java index cf531e26b3a..59c073e2ceb 100644 --- a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreIT.java +++ b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreIT.java @@ -16,8 +16,10 @@ package uk.gov.gchq.gaffer.federated.simple; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import uk.gov.gchq.gaffer.cache.CacheServiceLoader; import uk.gov.gchq.gaffer.data.element.Element; import uk.gov.gchq.gaffer.data.element.Entity; import uk.gov.gchq.gaffer.data.element.Properties; @@ -44,6 +46,11 @@ class FederatedStoreIT { + @AfterEach + void reset() { + CacheServiceLoader.shutdown(); + } + @Test void shouldFederateElementsByAggregation() throws StoreException, OperationException { // Given diff --git a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreTest.java b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreTest.java index 1330a9e91c7..5413f21de29 100644 --- a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreTest.java +++ b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreTest.java @@ -16,8 +16,11 @@ package uk.gov.gchq.gaffer.federated.simple; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import uk.gov.gchq.gaffer.cache.CacheServiceLoader; +import uk.gov.gchq.gaffer.cache.exception.CacheOperationException; import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils; import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils.StoreType; import uk.gov.gchq.gaffer.graph.Graph; @@ -31,6 +34,11 @@ class FederatedStoreTest { + @AfterEach + void reset() { + CacheServiceLoader.shutdown(); + } + @Test void shouldInitialiseNewStore() throws StoreException { String graphId = "federated"; @@ -54,7 +62,7 @@ void shouldNotInitialiseWithSchema() { } @Test - void shouldAddAndGetGraphsViaStoreInterface() throws StoreException { + void shouldAddAndGetGraphsViaStoreInterface() throws StoreException, CacheOperationException { // Given final String federatedGraphId = "federated"; final String graphId1 = "graph1"; diff --git a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/operation/AddGraphTest.java b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/operation/AddGraphTest.java index 4c9ae0a4fa6..4ac4d3ebfe6 100644 --- a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/operation/AddGraphTest.java +++ b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/operation/AddGraphTest.java @@ -17,8 +17,11 @@ package uk.gov.gchq.gaffer.federated.simple.operation; import org.json.JSONObject; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import uk.gov.gchq.gaffer.cache.CacheServiceLoader; +import uk.gov.gchq.gaffer.cache.exception.CacheOperationException; import uk.gov.gchq.gaffer.exception.SerialisationException; import uk.gov.gchq.gaffer.federated.simple.FederatedStore; import uk.gov.gchq.gaffer.graph.GraphConfig; @@ -38,8 +41,13 @@ class AddGraphTest { + @AfterEach + void reset() { + CacheServiceLoader.shutdown(); + } + @Test - void shouldAddGraphUsingBuilder() throws StoreException, OperationException { + void shouldAddGraphUsingBuilder() throws StoreException, OperationException, CacheOperationException { // Given final String federatedGraphId = "federated"; final String graphId = "newGraph"; @@ -75,7 +83,7 @@ void shouldAddGraphUsingBuilder() throws StoreException, OperationException { } @Test - void shouldAddGraphUsingJSONSerialisation() throws StoreException, OperationException, SerialisationException { + void shouldAddGraphUsingJSONSerialisation() throws StoreException, OperationException, SerialisationException, CacheOperationException { // Given final String federatedGraphId = "federated"; final String graphId = "newGraph"; diff --git a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/operation/RemoveGraphTest.java b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/operation/RemoveGraphTest.java index 3102ac55340..03691f79da1 100644 --- a/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/operation/RemoveGraphTest.java +++ b/store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/operation/RemoveGraphTest.java @@ -16,8 +16,10 @@ package uk.gov.gchq.gaffer.federated.simple.operation; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import uk.gov.gchq.gaffer.cache.CacheServiceLoader; import uk.gov.gchq.gaffer.data.element.Element; import uk.gov.gchq.gaffer.data.element.Entity; import uk.gov.gchq.gaffer.data.element.Properties; @@ -38,6 +40,11 @@ class RemoveGraphTest { + @AfterEach + void reset() { + CacheServiceLoader.shutdown(); + } + @Test void shouldRemoveGraphAndPreserveData() throws StoreException, OperationException { // Given