Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh 3288: Store Graphs in cache in federated POC #3289

Merged
merged 4 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package uk.gov.gchq.gaffer.federated.simple;

import uk.gov.gchq.gaffer.cache.Cache;
import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.commonutil.exception.OverwritingException;
import uk.gov.gchq.gaffer.core.exception.GafferRuntimeException;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.data.element.id.EntityId;
Expand Down Expand Up @@ -60,18 +64,21 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static uk.gov.gchq.gaffer.cache.CacheServiceLoader.DEFAULT_SERVICE_NAME;

/**
* The federated store implementation. Provides the set up and required
* methods to enable a {@link Store} that will delegate {@link Operation}s
* to sub graphs then merge the result.
*/
public class FederatedStore extends Store {
private static final String DEFAULT_CACHE_CLASS_FALLBACK = "uk.gov.gchq.gaffer.cache.impl.HashMapCacheService";

// Default graph IDs to execute on
private List<String> defaultGraphIds = new LinkedList<>();

// Cached list of available graphs
private final List<GraphSerialisable> graphs = new LinkedList<>();
// Gaffer cache of available graphs
private Cache<String, GraphSerialisable> graphCache;

// Store specific handlers
public final Map<Class<? extends Operation>, OperationHandler<?>> storeHandlers = Stream.of(
Expand All @@ -89,24 +96,31 @@
* @throws IllegalArgumentException If there is already a graph with the supplied ID
*/
public void addGraph(final GraphSerialisable graph) {
// Make sure graph ID doesn't already exist
if (graphs.stream().map(GraphSerialisable::getGraphId).anyMatch(id -> id.equals(graph.getGraphId()))) {
try {
// Add safely to the cache
graphCache.getCache().putSafe(graph.getGraphId(), graph);
} catch (final CacheOperationException e) {

Check warning on line 102 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java#L102

Added line #L102 was not covered by tests
// Unknown issue adding to cache
throw new GafferRuntimeException(e.getMessage(), e);

Check warning on line 104 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java#L104

Added line #L104 was not covered by tests
} catch (final OverwritingException e) {
// Notify that the graph ID is already in use
throw new IllegalArgumentException(
"A graph with Graph ID: '" + graph.getGraphId() + "' has already been added to this store");
"A graph with Graph ID: '" + graph.getGraphId() + "' has already been added to this store", e);
}
graphs.add(new GraphSerialisable(graph.getConfig(), graph.getSchema(), graph.getStoreProperties()));
}

/**
* Remove a graph from the scope of this store.
*
* @param graphId The graph ID of the graph to remove.
*
* @throws IllegalArgumentException If graph not found.
* @throws IllegalArgumentException If graph does not exist.
*/
public void removeGraph(final String graphId) {
GraphSerialisable graphToRemove = getGraph(graphId);
graphs.remove(graphToRemove);
if (!graphCache.contains(graphId)) {
throw new IllegalArgumentException(
"Graph with Graph ID: '" + graphId + "' is not available to this federated store");
}
graphCache.deleteFromCache(graphId);
}

/**
Expand All @@ -115,25 +129,25 @@
*
* @param graphId The graph ID
* @return The {@link GraphSerialisable} relating to the ID.
*
* @throws CacheOperationException If issue getting from cache.
* @throws IllegalArgumentException If graph not found.
*/
public GraphSerialisable getGraph(final String graphId) {
for (final GraphSerialisable graph : graphs) {
if (graph.getGraphId().equals(graphId)) {
return graph;
}
public GraphSerialisable getGraph(final String graphId) throws CacheOperationException {
GraphSerialisable graph = graphCache.getFromCache(graphId);
if (graph == null) {
throw new IllegalArgumentException(
"Graph with Graph ID: '" + graphId + "' is not available to this federated store");
}
throw new IllegalArgumentException("Graph with Graph ID: '" + graphId + "' is not available to this federated store");
return graph;
}

/**
* Returns a list of all the graphs available to this store.
* Returns all the graphs available to this store.
*
* @return List of {@link GraphSerialisable}s
* @return Iterable of {@link GraphSerialisable}s
*/
public List<GraphSerialisable> getAllGraphs() {
return graphs;
public Iterable<GraphSerialisable> getAllGraphs() {
return graphCache.getCache().getAllValues();
}

/**
Expand Down Expand Up @@ -193,6 +207,8 @@
throw new IllegalArgumentException("Federated store should not be initialised with a Schema");
}
super.initialise(graphId, new Schema(), properties);

graphCache = new Cache<>("federatedGraphCache-" + graphId);
}

@Override
Expand Down Expand Up @@ -268,4 +284,16 @@
protected Class<? extends Serialiser> getRequiredParentSerialiserClass() {
return ToBytesSerialiser.class;
}

@Override
protected void startCacheServiceLoader(final StoreProperties properties) {
super.startCacheServiceLoader(properties);
// If default not setup then initialise cache as its needed for storing graphs
if (!CacheServiceLoader.isDefaultEnabled()) {
CacheServiceLoader.initialise(
DEFAULT_SERVICE_NAME,
DEFAULT_CACHE_CLASS_FALLBACK,
properties.getProperties());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public T apply(final T update, final T state) {
}

// By default just chain iterables together
return (T) IterableUtils.chainedIterable((Iterable<?>) state, updateIterable);
// (need to use the iterator to make sure the FluentIterable under the hood serialises correctly)
Iterable<Object> chained = () -> IterableUtils.chainedIterable((Iterable<?>) state, updateIterable).iterator();
return (T) chained;
}

return update;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
Expand Down Expand Up @@ -113,8 +114,9 @@
* @param store The federated store.
* @param operation The operation to execute.
* @return List of {@link GraphSerialisable}s to execute on.
* @throws OperationException Fail to get Graphs.
*/
protected List<GraphSerialisable> getGraphsToExecuteOn(final FederatedStore store, final Operation operation) {
protected List<GraphSerialisable> getGraphsToExecuteOn(final FederatedStore store, final Operation operation) throws OperationException {
List<String> graphIds = store.getDefaultGraphIds();
List<GraphSerialisable> graphsToExecute = new LinkedList<>();
// If user specified graph IDs for this chain parse as comma separated list
Expand All @@ -124,11 +126,16 @@
graphIds = Arrays.asList(operation.getOption(OPT_SHORT_GRAPH_IDS).split(","));
}

// Get the corresponding graph serialisable
for (final String id : graphIds) {
graphsToExecute.add(store.getGraph(id));
try {
// Get the corresponding graph serialisable
for (final String id : graphIds) {
graphsToExecute.add(store.getGraph(id));
}
} catch (final CacheOperationException e) {
throw new OperationException("Failed to get Graphs from cache", e);

Check warning on line 135 in store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java

View check run for this annotation

Codecov / codecov/patch

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java#L134-L135

Added lines #L134 - L135 were not covered by tests
}


// Keep graphs sorted so results returned are predictable between runs
Collections.sort(graphsToExecute, (g1, g2) -> g1.getGraphId().compareTo(g2.getGraphId()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class GetAllGraphIdsHandler implements OutputOperationHandler<GetAllGraphIds, Set<String>> {

@Override
public Set<String> doOperation(final GetAllGraphIds operation, final Context context, final Store store) throws OperationException {
// Get all the graphs and convert to a set of just IDs
return ((FederatedStore) store).getAllGraphs().stream()
return StreamSupport.stream(((FederatedStore) store).getAllGraphs().spliterator(), false)
.map(GraphSerialisable::getGraphId)
.collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package uk.gov.gchq.gaffer.federated.simple;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.data.element.Entity;
import uk.gov.gchq.gaffer.data.element.Properties;
Expand All @@ -44,6 +46,11 @@

class FederatedStoreIT {

@AfterEach
void reset() {
CacheServiceLoader.shutdown();
}

@Test
void shouldFederateElementsByAggregation() throws StoreException, OperationException {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package uk.gov.gchq.gaffer.federated.simple;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils;
import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils.StoreType;
import uk.gov.gchq.gaffer.graph.Graph;
Expand All @@ -31,6 +34,11 @@

class FederatedStoreTest {

@AfterEach
void reset() {
CacheServiceLoader.shutdown();
}

@Test
void shouldInitialiseNewStore() throws StoreException {
String graphId = "federated";
Expand All @@ -54,7 +62,7 @@ void shouldNotInitialiseWithSchema() {
}

@Test
void shouldAddAndGetGraphsViaStoreInterface() throws StoreException {
void shouldAddAndGetGraphsViaStoreInterface() throws StoreException, CacheOperationException {
// Given
final String federatedGraphId = "federated";
final String graphId1 = "graph1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package uk.gov.gchq.gaffer.federated.simple.operation;

import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.exception.SerialisationException;
import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.graph.GraphConfig;
Expand All @@ -38,8 +41,13 @@

class AddGraphTest {

@AfterEach
void reset() {
CacheServiceLoader.shutdown();
}

@Test
void shouldAddGraphUsingBuilder() throws StoreException, OperationException {
void shouldAddGraphUsingBuilder() throws StoreException, OperationException, CacheOperationException {
// Given
final String federatedGraphId = "federated";
final String graphId = "newGraph";
Expand Down Expand Up @@ -75,7 +83,7 @@ void shouldAddGraphUsingBuilder() throws StoreException, OperationException {
}

@Test
void shouldAddGraphUsingJSONSerialisation() throws StoreException, OperationException, SerialisationException {
void shouldAddGraphUsingJSONSerialisation() throws StoreException, OperationException, SerialisationException, CacheOperationException {
// Given
final String federatedGraphId = "federated";
final String graphId = "newGraph";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package uk.gov.gchq.gaffer.federated.simple.operation;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.data.element.Entity;
import uk.gov.gchq.gaffer.data.element.Properties;
Expand All @@ -38,6 +40,11 @@

class RemoveGraphTest {

@AfterEach
void reset() {
CacheServiceLoader.shutdown();
}

@Test
void shouldRemoveGraphAndPreserveData() throws StoreException, OperationException {
// Given
Expand Down
Loading