diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestCtxMap.java b/server/src/main/java/org/elasticsearch/ingest/IngestCtxMap.java new file mode 100644 index 0000000000000..51a6178087fab --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/IngestCtxMap.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.script.CtxMap; +import org.elasticsearch.script.Metadata; + +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Map containing ingest source and metadata. + * + * The Metadata values in {@link IngestDocument.Metadata} are validated when put in the map. + * _index, _id and _routing must be a String or null + * _version_type must be a lower case VersionType or null + * _version must be representable as a long without loss of precision or null + * _dyanmic_templates must be a map + * _if_seq_no must be a long or null + * _if_primary_term must be a long or null + * + * The map is expected to be used by processors, server code should the typed getter and setters where possible. + */ +class IngestCtxMap extends CtxMap { + + /** + * Create an IngestCtxMap with the given metadata, source and default validators + */ + IngestCtxMap( + String index, + String id, + long version, + String routing, + VersionType versionType, + ZonedDateTime timestamp, + Map source + ) { + super(new HashMap<>(source), new Metadata(index, id, version, routing, versionType, timestamp)); + } + + /** + * Create IngestCtxMap from a source and metadata + * + * @param source the source document map + * @param metadata the metadata map + */ + IngestCtxMap(Map source, Metadata metadata) { + super(source, metadata); + } + + /** + * Returns a new metadata map and the existing source map with metadata removed. + */ + public static Tuple, Map> splitSourceAndMetadata(Map sourceAndMetadata) { + return CtxMap.splitSourceAndMetadata( + sourceAndMetadata, + Arrays.stream(IngestDocument.Metadata.values()).map(IngestDocument.Metadata::getFieldName).collect(Collectors.toSet()) + ); + } + + /** + * Fetch the timestamp from the ingestMetadata, if it exists + * @return the timestamp for the document or null + */ + public static ZonedDateTime getTimestamp(Map ingestMetadata) { + if (ingestMetadata == null) { + return null; + } + Object ts = ingestMetadata.get(IngestDocument.TIMESTAMP); + if (ts instanceof ZonedDateTime timestamp) { + return timestamp; + } else if (ts instanceof String str) { + return ZonedDateTime.parse(str); + } + return null; + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index a77d5d57b3170..dcb5b4e090567 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -49,7 +49,7 @@ public final class IngestDocument { static final String TIMESTAMP = "timestamp"; - private final IngestSourceAndMetadata sourceAndMetadata; + private final IngestCtxMap sourceAndMetadata; private final Map ingestMetadata; // Contains all pipelines that have been executed for this document @@ -58,15 +58,7 @@ public final class IngestDocument { private boolean doNoSelfReferencesCheck = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { - this.sourceAndMetadata = new IngestSourceAndMetadata( - index, - id, - version, - routing, - versionType, - ZonedDateTime.now(ZoneOffset.UTC), - source - ); + this.sourceAndMetadata = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); this.ingestMetadata = new HashMap<>(); this.ingestMetadata.put(TIMESTAMP, sourceAndMetadata.getMetadata().getTimestamp()); } @@ -76,7 +68,7 @@ public IngestDocument(String index, String id, long version, String routing, Ver */ public IngestDocument(IngestDocument other) { this( - new IngestSourceAndMetadata(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()), + new IngestCtxMap(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); } @@ -85,10 +77,10 @@ public IngestDocument(IngestDocument other) { * Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied. */ public IngestDocument(Map sourceAndMetadata, Map ingestMetadata) { - Tuple, Map> sm = IngestSourceAndMetadata.splitSourceAndMetadata(sourceAndMetadata); - this.sourceAndMetadata = new IngestSourceAndMetadata( + Tuple, Map> sm = IngestCtxMap.splitSourceAndMetadata(sourceAndMetadata); + this.sourceAndMetadata = new IngestCtxMap( sm.v1(), - new org.elasticsearch.script.Metadata(sm.v2(), IngestSourceAndMetadata.getTimestamp(ingestMetadata)) + new org.elasticsearch.script.Metadata(sm.v2(), IngestCtxMap.getTimestamp(ingestMetadata)) ); this.ingestMetadata = new HashMap<>(ingestMetadata); this.ingestMetadata.computeIfPresent(TIMESTAMP, (k, v) -> { @@ -102,7 +94,7 @@ public IngestDocument(Map sourceAndMetadata, Map /** * Constructor to create an IngestDocument from its constituent maps */ - IngestDocument(IngestSourceAndMetadata sourceAndMetadata, Map ingestMetadata) { + IngestDocument(IngestCtxMap sourceAndMetadata, Map ingestMetadata) { this.sourceAndMetadata = sourceAndMetadata; this.ingestMetadata = ingestMetadata; } @@ -724,9 +716,9 @@ public Map getSourceAndMetadata() { } /** - * Get source and metadata map as {@link IngestSourceAndMetadata} + * Get source and metadata map as {@link IngestCtxMap} */ - public IngestSourceAndMetadata getIngestSourceAndMetadata() { + public IngestCtxMap getIngestSourceAndMetadata() { return sourceAndMetadata; } @@ -763,7 +755,7 @@ public static Object deepCopy(Object value) { for (Map.Entry entry : mapValue.entrySet()) { copy.put(entry.getKey(), deepCopy(entry.getValue())); } - // TODO(stu): should this check for IngestSourceAndMetadata in addition to Map? + // TODO(stu): should this check for IngestCtxMap in addition to Map? return copy; } else if (value instanceof List listValue) { List copy = new ArrayList<>(listValue.size()); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestSourceAndMetadata.java b/server/src/main/java/org/elasticsearch/script/CtxMap.java similarity index 76% rename from server/src/main/java/org/elasticsearch/ingest/IngestSourceAndMetadata.java rename to server/src/main/java/org/elasticsearch/script/CtxMap.java index a041891d51371..3d2cd684b651c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestSourceAndMetadata.java +++ b/server/src/main/java/org/elasticsearch/script/CtxMap.java @@ -6,15 +6,12 @@ * Side Public License, v 1. */ -package org.elasticsearch.ingest; +package org.elasticsearch.script; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Tuple; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.script.Metadata; -import java.time.ZonedDateTime; import java.util.AbstractCollection; import java.util.AbstractMap; import java.util.AbstractSet; @@ -26,46 +23,17 @@ import java.util.Set; import java.util.stream.Collectors; -/** - * Map containing ingest source and metadata. - * - * The Metadata values in {@link IngestDocument.Metadata} are validated when put in the map. - * _index, _id and _routing must be a String or null - * _version_type must be a lower case VersionType or null - * _version must be representable as a long without loss of precision or null - * _dyanmic_templates must be a map - * _if_seq_no must be a long or null - * _if_primary_term must be a long or null - * - * The map is expected to be used by processors, server code should the typed getter and setters where possible. - */ -class IngestSourceAndMetadata extends AbstractMap { - +public class CtxMap extends AbstractMap { protected final Map source; protected final Metadata metadata; /** - * Create an IngestSourceAndMetadata with the given metadata, source and default validators - */ - IngestSourceAndMetadata( - String index, - String id, - long version, - String routing, - VersionType versionType, - ZonedDateTime timestamp, - Map source - ) { - this(new HashMap<>(source), new Metadata(index, id, version, routing, versionType, timestamp)); - } - - /** - * Create IngestSourceAndMetadata from a source and metadata + * Create CtxMap from a source and metadata * * @param source the source document map * @param metadata the metadata map */ - IngestSourceAndMetadata(Map source, Metadata metadata) { + protected CtxMap(Map source, Metadata metadata) { this.source = source != null ? source : new HashMap<>(); this.metadata = metadata; Set badKeys = Sets.intersection(this.metadata.keySet(), this.source.keySet()); @@ -81,38 +49,25 @@ class IngestSourceAndMetadata extends AbstractMap { /** * Returns a new metadata map and the existing source map with metadata removed. */ - public static Tuple, Map> splitSourceAndMetadata(Map sourceAndMetadata) { - if (sourceAndMetadata instanceof IngestSourceAndMetadata ingestSourceAndMetadata) { - return new Tuple<>(new HashMap<>(ingestSourceAndMetadata.source), new HashMap<>(ingestSourceAndMetadata.metadata.getMap())); + public static Tuple, Map> splitSourceAndMetadata( + Map sourceAndMetadata, + Set metadataKeys + ) { + if (sourceAndMetadata instanceof CtxMap ctxMap) { + return new Tuple<>(new HashMap<>(ctxMap.source), new HashMap<>(ctxMap.metadata.getMap())); } - Map metadata = Maps.newHashMapWithExpectedSize(IngestDocument.Metadata.values().length); + + Map metadata = Maps.newHashMapWithExpectedSize(metadataKeys.size()); Map source = new HashMap<>(sourceAndMetadata); - for (IngestDocument.Metadata ingestDocumentMetadata : IngestDocument.Metadata.values()) { - String metadataName = ingestDocumentMetadata.getFieldName(); - if (sourceAndMetadata.containsKey(metadataName)) { - metadata.put(metadataName, source.remove(metadataName)); + + for (String metadataKey : metadataKeys) { + if (sourceAndMetadata.containsKey(metadataKey)) { + metadata.put(metadataKey, source.remove(metadataKey)); } } return new Tuple<>(source, metadata); } - /** - * Fetch the timestamp from the ingestMetadata, if it exists - * @return the timestamp for the document or null - */ - public static ZonedDateTime getTimestamp(Map ingestMetadata) { - if (ingestMetadata == null) { - return null; - } - Object ts = ingestMetadata.get(IngestDocument.TIMESTAMP); - if (ts instanceof ZonedDateTime timestamp) { - return timestamp; - } else if (ts instanceof String str) { - return ZonedDateTime.parse(str); - } - return null; - } - /** * get the source map, if externally modified then the guarantees of this class are not enforced */ @@ -326,10 +281,10 @@ public Object setValue(Object value) { @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if ((o instanceof CtxMap) == false) return false; if (super.equals(o) == false) return false; - IngestSourceAndMetadata that = (IngestSourceAndMetadata) o; - return Objects.equals(source, that.source) && Objects.equals(metadata, that.metadata); + CtxMap ctxMap = (CtxMap) o; + return source.equals(ctxMap.source) && metadata.equals(ctxMap.metadata); } @Override diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestSourceAndMetadataTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestCtxMapTests.java similarity index 91% rename from server/src/test/java/org/elasticsearch/ingest/IngestSourceAndMetadataTests.java rename to server/src/test/java/org/elasticsearch/ingest/IngestCtxMapTests.java index 34a05e9ef2e03..bbb8d8b7a4e28 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestSourceAndMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestCtxMapTests.java @@ -22,9 +22,9 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -public class IngestSourceAndMetadataTests extends ESTestCase { +public class IngestCtxMapTests extends ESTestCase { - IngestSourceAndMetadata map; + IngestCtxMap map; Metadata md; public void testSettersAndGetters() { @@ -37,7 +37,7 @@ public void testSettersAndGetters() { metadata.put("_if_primary_term", 10000); metadata.put("_version_type", "internal"); metadata.put("_dynamic_templates", Map.of("foo", "bar")); - map = new IngestSourceAndMetadata(new HashMap<>(), new Metadata(metadata, null)); + map = new IngestCtxMap(new HashMap<>(), new Metadata(metadata, null)); md = map.getMetadata(); assertEquals("myIndex", md.getIndex()); md.setIndex("myIndex2"); @@ -70,7 +70,7 @@ public void testInvalidMetadata() { metadata.put("_version", Double.MAX_VALUE); IllegalArgumentException err = expectThrows( IllegalArgumentException.class, - () -> new IngestSourceAndMetadata(new HashMap<>(), new Metadata(metadata, null)) + () -> new IngestCtxMap(new HashMap<>(), new Metadata(metadata, null)) ); assertThat(err.getMessage(), containsString("_version may only be set to an int or a long but was [")); assertThat(err.getMessage(), containsString("] with type [java.lang.Double]")); @@ -81,7 +81,7 @@ public void testSourceInMetadata() { source.put("_version", 25); IllegalArgumentException err = expectThrows( IllegalArgumentException.class, - () -> new IngestSourceAndMetadata(source, new Metadata(source, null)) + () -> new IngestCtxMap(source, new Metadata(source, null)) ); assertEquals("unexpected metadata [_version:25] in source", err.getMessage()); } @@ -93,7 +93,7 @@ public void testExtraMetadata() { metadata.put("routing", "myRouting"); IllegalArgumentException err = expectThrows( IllegalArgumentException.class, - () -> new IngestSourceAndMetadata(new HashMap<>(), new Metadata(metadata, null)) + () -> new IngestCtxMap(new HashMap<>(), new Metadata(metadata, null)) ); assertEquals("Unexpected metadata keys [routing:myRouting, version:567]", err.getMessage()); } @@ -102,7 +102,7 @@ public void testPutSource() { Map metadata = new HashMap<>(); metadata.put("_version", 123); Map source = new HashMap<>(); - map = new IngestSourceAndMetadata(source, new Metadata(metadata, null)); + map = new IngestCtxMap(source, new Metadata(metadata, null)); } public void testRemove() { @@ -110,7 +110,7 @@ public void testRemove() { String canRemove = "canRemove"; Map metadata = new HashMap<>(); metadata.put(cannotRemove, "value"); - map = new IngestSourceAndMetadata(new HashMap<>(), new TestMetadata(metadata, Map.of(cannotRemove, (o, k, v) -> { + map = new IngestCtxMap(new HashMap<>(), new TestMetadata(metadata, Map.of(cannotRemove, (o, k, v) -> { if (v == null) { throw new IllegalArgumentException(k + " cannot be null or removed"); } @@ -175,7 +175,7 @@ public void testEntryAndIterator() { source.put("foo", "bar"); source.put("baz", "qux"); source.put("noz", "zon"); - map = new IngestSourceAndMetadata(source, TestMetadata.withNullableVersion(metadata)); + map = new IngestCtxMap(source, TestMetadata.withNullableVersion(metadata)); md = map.getMetadata(); for (Map.Entry entry : map.entrySet()) { @@ -215,7 +215,7 @@ public void testEntryAndIterator() { } public void testContainsValue() { - map = new IngestSourceAndMetadata(Map.of("myField", "fieldValue"), new Metadata(Map.of("_version", 5678), null)); + map = new IngestCtxMap(Map.of("myField", "fieldValue"), new Metadata(Map.of("_version", 5678), null)); assertTrue(map.containsValue(5678)); assertFalse(map.containsValue(5679)); assertTrue(map.containsValue("fieldValue")); @@ -223,7 +223,7 @@ public void testContainsValue() { } public void testValidators() { - map = new IngestSourceAndMetadata("myIndex", "myId", 1234, "myRouting", VersionType.EXTERNAL, null, new HashMap<>()); + map = new IngestCtxMap("myIndex", "myId", 1234, "myRouting", VersionType.EXTERNAL, null, new HashMap<>()); md = map.getMetadata(); IllegalArgumentException err = expectThrows(IllegalArgumentException.class, () -> map.put("_index", 555)); assertEquals("_index must be null or a String but was [555] with type [java.lang.Integer]", err.getMessage()); @@ -286,7 +286,7 @@ public void testValidators() { public void testHandlesAllVersionTypes() { Map mdRawMap = new HashMap<>(); mdRawMap.put("_version", 1234); - map = new IngestSourceAndMetadata(new HashMap<>(), new Metadata(mdRawMap, null)); + map = new IngestCtxMap(new HashMap<>(), new Metadata(mdRawMap, null)); md = map.getMetadata(); assertNull(md.getVersionType()); for (VersionType vt : VersionType.values()) { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 39d93a0691856..6182e4709a6b8 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -2228,7 +2228,7 @@ private class IngestDocumentMatcher implements ArgumentMatcher { @Override public boolean matches(IngestDocument other) { - // ingest metadata and IngestSourceAndMetadata will not be the same (timestamp differs every time) + // ingest metadata and IngestCtxMap will not be the same (timestamp differs every time) return Objects.equals(ingestDocument.getSource(), other.getSource()) && Objects.equals(ingestDocument.getMetadata().getMap(), other.getMetadata().getMap()); } diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java b/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java index ffd9ca324f63b..58afd55d38480 100644 --- a/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java +++ b/test/framework/src/main/java/org/elasticsearch/ingest/TestIngestDocument.java @@ -37,8 +37,8 @@ public static IngestDocument withNullableVersion(Map sourceAndMe * _versions. Normally null _version is not allowed, but many tests don't care about that invariant. */ public static IngestDocument ofIngestWithNullableVersion(Map sourceAndMetadata, Map ingestMetadata) { - Tuple, Map> sm = IngestSourceAndMetadata.splitSourceAndMetadata(sourceAndMetadata); - return new IngestDocument(new IngestSourceAndMetadata(sm.v1(), TestMetadata.withNullableVersion(sm.v2())), ingestMetadata); + Tuple, Map> sm = IngestCtxMap.splitSourceAndMetadata(sourceAndMetadata); + return new IngestDocument(new IngestCtxMap(sm.v1(), TestMetadata.withNullableVersion(sm.v2())), ingestMetadata); } /** @@ -57,7 +57,7 @@ public static IngestDocument withDefaultVersion(Map sourceAndMet * can observe changes to the map directly. */ public static IngestDocument ofMetadataWithValidator(Map metadata, Map validators) { - return new IngestDocument(new IngestSourceAndMetadata(new HashMap<>(), new TestMetadata(metadata, validators)), new HashMap<>()); + return new IngestDocument(new IngestCtxMap(new HashMap<>(), new TestMetadata(metadata, validators)), new HashMap<>()); } /**