Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh-3308: Specify merge classes federated POC #3311

Merged
merged 5 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,27 @@ public class FederatedStoreProperties extends StoreProperties {
/**
* Property key for the class to use when merging number results
*/
public static final String PROP_MERGE_CLASS_NUMBER = "gaffer.store.federated.mergeClass.number";
public static final String PROP_MERGE_CLASS_NUMBER = "gaffer.store.federated.merge.number.class";
/**
* Property key for the class to use when merging string results
*/
public static final String PROP_MERGE_CLASS_STRING = "gaffer.store.federated.mergeClass.string";
public static final String PROP_MERGE_CLASS_STRING = "gaffer.store.federated.merge.string.class";
/**
* Property key for the class to use when merging boolean results
*/
public static final String PROP_MERGE_CLASS_BOOLEAN = "gaffer.store.federated.mergeClass.boolean";
public static final String PROP_MERGE_CLASS_BOOLEAN = "gaffer.store.federated.merge.boolean.class";
/**
* Property key for the class to use when merging collection results
*/
public static final String PROP_MERGE_CLASS_COLLECTION = "gaffer.store.federated.mergeClass.collection";
public static final String PROP_MERGE_CLASS_COLLECTION = "gaffer.store.federated.merge.collection.class";
/**
* Property key for the class to use when merging values of a Map result
*/
public static final String PROP_MERGE_CLASS_MAP = "gaffer.store.federated.merge.map.class";
/**
* Property key for the class to use when merging elements
*/
public static final String PROP_MERGE_CLASS_ELEMENTS = "gaffer.store.federated.mergeClass.elements";
public static final String PROP_MERGE_CLASS_ELEMENTS = "gaffer.store.federated.merge.elements.class";

public FederatedStoreProperties() {
super(FederatedStore.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import org.apache.commons.collections4.IterableUtils;

import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;

/**
* The default result accumulator for merging results from multiple graphs into one.
Expand All @@ -32,7 +33,7 @@ public DefaultResultAccumulator() {
super();
}

public DefaultResultAccumulator(final FederatedStoreProperties properties) {
public DefaultResultAccumulator(final Properties properties) {
super(properties);
}

Expand Down Expand Up @@ -70,6 +71,11 @@ public T apply(final T update, final T state) {
return (T) this.collectionMergeOperator.apply((Collection<Object>) update, (Collection<Object>) state);
}

// Use configured merger for maps
if (update instanceof Map) {
return (T) this.mapMergeOperator.apply((Map<Object, Object>) update, (Map<Object, Object>) state);
}

// If an iterable try merge them
if (update instanceof Iterable<?>) {
Iterable<?> updateIterable = (Iterable<?>) update;
Expand All @@ -89,6 +95,7 @@ public T apply(final T update, final T state) {
return (T) chained;
}

// Fallback just return the update
return update;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties;
import uk.gov.gchq.gaffer.federated.simple.merge.operator.ElementAggregateOperator;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.koryphe.binaryoperator.BinaryOperatorMap;
import uk.gov.gchq.koryphe.impl.binaryoperator.And;
import uk.gov.gchq.koryphe.impl.binaryoperator.CollectionConcat;
import uk.gov.gchq.koryphe.impl.binaryoperator.Last;
import uk.gov.gchq.koryphe.impl.binaryoperator.StringConcat;
import uk.gov.gchq.koryphe.impl.binaryoperator.Sum;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.function.BinaryOperator;

import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_DEFAULT_MERGE_ELEMENTS;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_BOOLEAN;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_COLLECTION;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_ELEMENTS;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_MAP;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_NUMBER;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_STRING;

Expand All @@ -51,6 +55,9 @@ public abstract class FederatedResultAccumulator<T> implements BinaryOperator<T>
protected BinaryOperator<Boolean> booleanMergeOperator = new And();
protected BinaryOperator<Collection<Object>> collectionMergeOperator = new CollectionConcat<>();
protected BinaryOperator<Iterable<Element>> elementAggregateOperator = new ElementAggregateOperator();
// For map merging define a sub operator for if values are the same
protected BinaryOperator<Object> mapValueMergeOperator = new Last();
protected BinaryOperator<Map<Object, Object>> mapMergeOperator = new BinaryOperatorMap<>(mapValueMergeOperator);

// Should the element aggregation operator be used, can be slower so disabled by default
protected boolean aggregateElements = false;
Expand All @@ -59,7 +66,7 @@ protected FederatedResultAccumulator() {
// Construct with defaults
}

protected FederatedResultAccumulator(final FederatedStoreProperties properties) {
protected FederatedResultAccumulator(final Properties properties) {
// Use the store properties to configure the merging
if (properties.containsKey(PROP_MERGE_CLASS_NUMBER)) {
numberMergeOperator = loadMergeClass(numberMergeOperator, properties.get(PROP_MERGE_CLASS_NUMBER));
Expand All @@ -76,9 +83,13 @@ protected FederatedResultAccumulator(final FederatedStoreProperties properties)
if (properties.containsKey(PROP_MERGE_CLASS_ELEMENTS)) {
elementAggregateOperator = loadMergeClass(elementAggregateOperator, properties.get(PROP_MERGE_CLASS_ELEMENTS));
}
if (properties.containsKey(PROP_MERGE_CLASS_MAP)) {
mapValueMergeOperator = loadMergeClass(mapValueMergeOperator, properties.get(PROP_MERGE_CLASS_MAP));
mapMergeOperator = new BinaryOperatorMap<>(mapValueMergeOperator);
}
// Should we do element aggregation by default
if (properties.containsKey(PROP_DEFAULT_MERGE_ELEMENTS)) {
setAggregateElements(Boolean.parseBoolean(properties.get(PROP_DEFAULT_MERGE_ELEMENTS)));
setAggregateElements(Boolean.parseBoolean((String) properties.get(PROP_DEFAULT_MERGE_ELEMENTS)));
}
}

Expand Down Expand Up @@ -112,10 +123,10 @@ public void setSchema(final Schema schema) {
}

@SuppressWarnings("unchecked")
private <I> BinaryOperator<I> loadMergeClass(final BinaryOperator<I> originalOperator, final String clazzName) {
private <I> BinaryOperator<I> loadMergeClass(final BinaryOperator<I> originalOperator, final Object clazzName) {
BinaryOperator<I> mergeOperator = originalOperator;
try {
Class<?> clazz = Class.forName(clazzName);
Class<?> clazz = Class.forName((String) clazzName);
mergeOperator = (BinaryOperator<I>) clazz.newInstance();
} catch (final ClassCastException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
LOGGER.warn("Failed to load alternative merge function: {} The default will be used instead.", clazzName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Main default handler for federated operations. Handles delegation to selected
Expand All @@ -58,6 +59,12 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
*/
public static final String OPT_SHORT_GRAPH_IDS = "federated.graphIds";

/**
* Graph IDs to exclude from the execution. If this option is set all graphs
* except the ones specified are executed on.
*/
public static final String OPT_EXCLUDE_GRAPH_IDS = "federated.excludeGraphIds";

/**
* The boolean operation option to specify if element merging should be applied or not.
*/
Expand Down Expand Up @@ -139,6 +146,17 @@ protected List<GraphSerialisable> getGraphsToExecuteOn(final FederatedStore stor
graphIds = Arrays.asList(operation.getOption(OPT_SHORT_GRAPH_IDS).split(","));
}

// If a user has specified to just exclude some graphs then run all but them
if (operation.containsOption(OPT_EXCLUDE_GRAPH_IDS)) {
// Get all the IDs
graphIds = StreamSupport.stream(store.getAllGraphs().spliterator(), false)
.map(GraphSerialisable::getGraphId)
.collect(Collectors.toList());

// Exclude the ones the user has specified
Arrays.asList(operation.getOption(OPT_EXCLUDE_GRAPH_IDS).split(",")).forEach(graphIds::remove);
}

try {
// Get the corresponding graph serialisable
for (final String id : graphIds) {
Expand All @@ -149,7 +167,6 @@ protected List<GraphSerialisable> getGraphsToExecuteOn(final FederatedStore stor
throw new OperationException("Failed to get Graphs from cache", e);
}


// Keep graphs sorted so results returned are predictable between runs
Collections.sort(graphsToExecute, (g1, g2) -> g1.getGraphId().compareTo(g2.getGraphId()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties;
import uk.gov.gchq.gaffer.federated.simple.merge.DefaultResultAccumulator;
import uk.gov.gchq.gaffer.federated.simple.merge.FederatedResultAccumulator;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
Expand All @@ -29,6 +28,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
* A sub class operation handler for federation that can process operations that have an
Expand Down Expand Up @@ -57,9 +57,12 @@ public O doOperation(final P operation, final Context context, final Store store
return null;
}

// Merge the store props with the operation options for setting up the accumulator
Properties combinedProps = store.getProperties().getProperties();
combinedProps.putAll(operation.getOptions());

// Set up the result accumulator
FederatedResultAccumulator<O> resultAccumulator =
new DefaultResultAccumulator<>((FederatedStoreProperties) store.getProperties());
FederatedResultAccumulator<O> resultAccumulator = new DefaultResultAccumulator<>(combinedProps);
resultAccumulator.setSchema(((FederatedStore) store).getSchema(graphsToExecute));

if (operation.containsOption(OPT_AGGREGATE_ELEMENTS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphConfig;
import uk.gov.gchq.gaffer.mapstore.MapStoreProperties;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetGraphCreatedTime;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.StoreException;
import uk.gov.gchq.gaffer.store.StoreProperties;
Expand Down Expand Up @@ -175,7 +177,46 @@ void shouldAddAndGetAllGraphs() throws StoreException, OperationException {
final Set<String> graphIds = federatedStore.execute(getAllGraphIds, new Context());

assertThat(graphIds).containsExactly(graphId);
}

@Test
void shouldExecuteOnAllGraphsExceptExcludedOnes() throws StoreException, OperationException {
final String federatedGraphId = "federated";
final String graphId1 = "newGraph1";
final String graphId2 = "newGraph2";
final String graphId3 = "newGraph3";

// When
final FederatedStore federatedStore = new FederatedStore();
federatedStore.initialise(federatedGraphId, null, new StoreProperties());

// AddGraph operations
final AddGraph addGraph1 = new AddGraph.Builder()
.graphConfig(new GraphConfig(graphId1))
.schema(new Schema())
.properties(new MapStoreProperties().getProperties())
.build();
final AddGraph addGraph2 = new AddGraph.Builder()
.graphConfig(new GraphConfig(graphId2))
.schema(new Schema())
.properties(new MapStoreProperties().getProperties())
.build();
final AddGraph addGraph3 = new AddGraph.Builder()
.graphConfig(new GraphConfig(graphId3))
.schema(new Schema())
.properties(new MapStoreProperties().getProperties())
.build();

federatedStore.execute(addGraph1, new Context());
federatedStore.execute(addGraph2, new Context());
federatedStore.execute(addGraph3, new Context());

final GetGraphCreatedTime operation = new GetGraphCreatedTime.Builder()
.option(FederatedOperationHandler.OPT_EXCLUDE_GRAPH_IDS, graphId2)
.build();

Map<String, String> result = federatedStore.execute(operation, new Context());
assertThat(result).containsOnlyKeys(graphId1, graphId3);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@
import uk.gov.gchq.koryphe.impl.binaryoperator.CollectionIntersect;
import uk.gov.gchq.koryphe.impl.binaryoperator.Or;
import uk.gov.gchq.koryphe.impl.binaryoperator.Product;
import uk.gov.gchq.koryphe.impl.binaryoperator.StringConcat;
import uk.gov.gchq.koryphe.impl.binaryoperator.StringDeduplicateConcat;

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_DEFAULT_MERGE_ELEMENTS;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_BOOLEAN;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_COLLECTION;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_MAP;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_NUMBER;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_MERGE_CLASS_STRING;

Expand All @@ -55,19 +59,33 @@ static Stream<Arguments> commonDataArgs() {
Arguments.of(
new ArrayList<>(Arrays.asList("a", "b")),
new ArrayList<>(Arrays.asList("c", "d")),
Arrays.asList("a", "b", "c", "d")));
Arrays.asList("a", "b", "c", "d")),
Arguments.of(
Stream.of(new SimpleEntry<String, String>("a", "b"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
Stream.of(new SimpleEntry<String, String>("a", "c"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
Stream.of(new SimpleEntry<String, String>("a", "c"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue))));
}

static Stream<Arguments> customMergeDataArgs() {
return Stream.of(
Arguments.of(2, 3, 6),
Arguments.of("s1", "s1", "s1"),
Arguments.of(true, false, true),
Arguments.of(true, true, true),
Arguments.of(
new ArrayList<>(Arrays.asList("a", "c")),
new ArrayList<>(Arrays.asList("c", "d")),
Arrays.asList("c")));
Arguments.of(2, 3, 6),
Arguments.of("s1", "s1", "s1"),
Arguments.of(true, false, true),
Arguments.of(true, true, true),
Arguments.of(
new ArrayList<>(Arrays.asList("a", "c")),
new ArrayList<>(Arrays.asList("c", "d")),
Arrays.asList("c")),
Arguments.of(
Stream.of(new SimpleEntry<String, String>("a", "b"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
Stream.of(new SimpleEntry<String, String>("a", "c"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
Stream.of(new SimpleEntry<String, String>("a", "b,c"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue))));
}

@DisplayName("Should provide default merge methods for common data types")
Expand All @@ -83,11 +101,12 @@ <T> void shouldAccumulatePrimitiveDataByDefault(T result1, T result2, T expected
@MethodSource("customMergeDataArgs")
<T> void shouldAllowCustomOperatorsForPrimitiveData(T result1, T result2, T expected) {
// Set properties to update the operators
FederatedStoreProperties properties = new FederatedStoreProperties();
properties.set(PROP_MERGE_CLASS_NUMBER, Product.class.getName());
properties.set(PROP_MERGE_CLASS_STRING, StringDeduplicateConcat.class.getName());
properties.set(PROP_MERGE_CLASS_BOOLEAN, Or.class.getName());
properties.set(PROP_MERGE_CLASS_COLLECTION, CollectionIntersect.class.getName());
java.util.Properties properties = new java.util.Properties();
properties.setProperty(PROP_MERGE_CLASS_NUMBER, Product.class.getName());
properties.setProperty(PROP_MERGE_CLASS_STRING, StringDeduplicateConcat.class.getName());
properties.setProperty(PROP_MERGE_CLASS_BOOLEAN, Or.class.getName());
properties.setProperty(PROP_MERGE_CLASS_COLLECTION, CollectionIntersect.class.getName());
properties.setProperty(PROP_MERGE_CLASS_MAP, StringConcat.class.getName());

// Init the accumulator with custom properties
FederatedResultAccumulator<T> accumulator = new DefaultResultAccumulator<>(properties);
Expand All @@ -113,7 +132,7 @@ void shouldSetElementAggregationFromProperties() {
properties.set(PROP_DEFAULT_MERGE_ELEMENTS, "true");

// When/Then
FederatedResultAccumulator<?> accumulator = new DefaultResultAccumulator<>(properties);
FederatedResultAccumulator<?> accumulator = new DefaultResultAccumulator<>(properties.getProperties());
assertThat(accumulator.aggregateElements()).isTrue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private ModernDatasetUtils() {
public static final MapStoreProperties MAP_STORE_PROPERTIES = MapStoreProperties
.loadStoreProperties("/map-store.properties");
public static final AccumuloProperties ACCUMULO_STORE_PROPERTIES = AccumuloProperties
.loadStoreProperties("/accumulo-store.properties");
.loadStoreProperties("/accumulo-store.properties");

public static Graph getBlankGraphWithModernSchema(Class<?> clazz, String graphId, StoreType storeType) {
return new Graph.Builder()
Expand Down
Loading