Skip to content

Commit 6e2e72b

Browse files
authored
Fixed ingest pipeline script issue (opensearch-project#11725)
Signed-off-by: vikasvb90 <[email protected]>
1 parent 71f1fab commit 6e2e72b

File tree

6 files changed

+146
-2
lines changed

6 files changed

+146
-2
lines changed

modules/ingest-common/src/main/java/org/opensearch/ingest/common/ScriptProcessor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,11 @@ public IngestDocument execute(IngestDocument document) {
102102
} else {
103103
ingestScript = precompiledIngestScript;
104104
}
105-
ingestScript.execute(document.getSourceAndMetadata());
106-
CollectionUtils.ensureNoSelfReferences(document.getSourceAndMetadata(), "ingest script");
105+
IngestDocument mutableDocument = new IngestDocument(document);
106+
ingestScript.execute(mutableDocument.getSourceAndMetadata());
107+
CollectionUtils.ensureNoSelfReferences(mutableDocument.getSourceAndMetadata(), "ingest script");
108+
document.getSourceAndMetadata().clear();
109+
document.getSourceAndMetadata().putAll(mutableDocument.getSourceAndMetadata());
107110
return document;
108111
}
109112

modules/ingest-common/src/test/java/org/opensearch/ingest/common/ScriptProcessorTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,16 @@ private void assertIngestDocument(IngestDocument ingestDocument) {
105105
int bytesTotal = ingestDocument.getFieldValue("bytes_in", Integer.class) + ingestDocument.getFieldValue("bytes_out", Integer.class);
106106
assertThat(ingestDocument.getSourceAndMetadata().get("bytes_total"), is(bytesTotal));
107107
}
108+
109+
public void testScriptingWithSelfReferencingSourceMetadata() {
110+
ScriptProcessor processor = new ScriptProcessor(randomAlphaOfLength(10), null, script, null, scriptService);
111+
IngestDocument originalIngestDocument = randomDocument();
112+
String index = originalIngestDocument.getSourceAndMetadata().get(IngestDocument.Metadata.INDEX.getFieldName()).toString();
113+
String id = originalIngestDocument.getSourceAndMetadata().get(IngestDocument.Metadata.ID.getFieldName()).toString();
114+
Map<String, Object> sourceMetadata = originalIngestDocument.getSourceAndMetadata();
115+
originalIngestDocument.getSourceAndMetadata().put("_source", sourceMetadata);
116+
IngestDocument ingestDocument = new IngestDocument(index, id, null, null, null, originalIngestDocument.getSourceAndMetadata());
117+
expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
118+
}
119+
108120
}

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,79 @@ teardown:
202202
id: 1
203203
- match: { _source.source_field: "foo%20bar" }
204204
- match: { _source.target_field: "foo bar" }
205+
206+
---
207+
"Test self referencing source with ignore failure":
208+
- do:
209+
ingest.put_pipeline:
210+
id: "my_pipeline"
211+
body: >
212+
{
213+
"description": "_description",
214+
"processors": [
215+
{
216+
"script" : {
217+
"lang": "painless",
218+
"source" : "ctx.foo['foo']=ctx.foo;ctx['test-field']='test-value'",
219+
"ignore_failure": true
220+
}
221+
},
222+
{
223+
"script" : {
224+
"lang": "painless",
225+
"source" : "ctx.target_field = Processors.uppercase(ctx.source_field)"
226+
}
227+
}
228+
]
229+
}
230+
- match: { acknowledged: true }
231+
232+
- do:
233+
index:
234+
index: test
235+
id: 1
236+
pipeline: "my_pipeline"
237+
body: {source_field: "fooBar", foo: {foo: "bar"}}
238+
239+
- do:
240+
get:
241+
index: test
242+
id: 1
243+
- match: { _source.source_field: "fooBar" }
244+
- match: { _source.target_field: "FOOBAR"}
245+
- match: { _source.test-field: null}
246+
247+
---
248+
"Test self referencing source without ignoring failure":
249+
- do:
250+
ingest.put_pipeline:
251+
id: "my_pipeline"
252+
body: >
253+
{
254+
"description": "_description",
255+
"processors": [
256+
{
257+
"script" : {
258+
"lang": "painless",
259+
"source" : "ctx.foo['foo']=ctx.foo;ctx['test-field']='test-value'"
260+
}
261+
},
262+
{
263+
"script" : {
264+
"lang": "painless",
265+
"source" : "ctx.target_field = Processors.uppercase(ctx.source_field)"
266+
}
267+
}
268+
]
269+
}
270+
- match: { acknowledged: true }
271+
272+
- do:
273+
catch: bad_request
274+
index:
275+
index: test
276+
id: 1
277+
pipeline: "my_pipeline"
278+
body: {source_field: "fooBar", foo: {foo: "bar"}}
279+
- match: { error.root_cause.0.type: "illegal_argument_exception" }
280+
- match: { error.root_cause.0.reason: "Iterable object is self-referencing itself (ingest script)" }

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,3 +1113,48 @@ teardown:
11131113
- match: { status: 400 }
11141114
- match: { error.root_cause.0.type: "illegal_argument_exception" }
11151115
- match: { error.root_cause.0.reason: "Failed to parse parameter [_if_primary_term], only int or long is accepted" }
1116+
1117+
---
1118+
"Test simulate with pipeline with ignore failure and cyclic field assignments in script":
1119+
- do:
1120+
ingest.simulate:
1121+
verbose: true
1122+
body: >
1123+
{
1124+
"pipeline": {
1125+
"description": "_description",
1126+
"processors": [
1127+
{
1128+
"script" : {
1129+
"ignore_failure" : true,
1130+
"lang": "painless",
1131+
"source": "ctx.foo['foo']=ctx.foo;ctx.tag='recursive'"
1132+
}
1133+
},
1134+
{
1135+
"script" : {
1136+
"lang": "painless",
1137+
"source" : "ctx.target_field = Processors.uppercase(ctx.foo.foo)"
1138+
}
1139+
}
1140+
]
1141+
},
1142+
"docs": [
1143+
{
1144+
"_source": {
1145+
"foo": {
1146+
"foo": "bar"
1147+
}
1148+
}
1149+
}
1150+
]
1151+
}
1152+
- length: { docs: 1 }
1153+
- length: { docs.0.processor_results: 2 }
1154+
- match: { docs.0.processor_results.0.status: "error_ignored" }
1155+
- match: { docs.0.processor_results.0.ignored_error.error.type: "illegal_argument_exception" }
1156+
- match: { docs.0.processor_results.0.doc._source.tag: null }
1157+
- match: { docs.0.processor_results.1.doc._source.target_field: "BAR" }
1158+
- match: { docs.0.processor_results.1.doc._source.foo.foo: "bar" }
1159+
- match: { docs.0.processor_results.1.status: "success" }
1160+
- match: { docs.0.processor_results.1.processor_type: "script" }

server/src/main/java/org/opensearch/ingest/IngestDocument.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.ingest;
3434

3535
import org.opensearch.core.common.Strings;
36+
import org.opensearch.core.common.util.CollectionUtils;
3637
import org.opensearch.index.VersionType;
3738
import org.opensearch.index.mapper.IdFieldMapper;
3839
import org.opensearch.index.mapper.IndexFieldMapper;
@@ -752,6 +753,7 @@ public Map<String, Object> getSourceAndMetadata() {
752753

753754
@SuppressWarnings("unchecked")
754755
public static <K, V> Map<K, V> deepCopyMap(Map<K, V> source) {
756+
CollectionUtils.ensureNoSelfReferences(source, "IngestDocument: Self reference present in object.");
755757
return (Map<K, V>) deepCopy(source);
756758
}
757759

server/src/test/java/org/opensearch/ingest/IngestDocumentTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ public void setTestIngestDocument() {
9595
ingestDocument = new IngestDocument("index", "id", null, null, null, document);
9696
}
9797

98+
public void testSelfReferencingSource() {
99+
Map<String, Object> value = new HashMap<>();
100+
value.put("foo", value);
101+
expectThrows(IllegalArgumentException.class, () -> IngestDocument.deepCopyMap(value));
102+
}
103+
98104
public void testSimpleGetFieldValue() {
99105
assertThat(ingestDocument.getFieldValue("foo", String.class), equalTo("bar"));
100106
assertThat(ingestDocument.getFieldValue("int", Integer.class), equalTo(123));

0 commit comments

Comments
 (0)