|
| 1 | +"""Test embedding different file formats and different encodings within the <Data> tag.""" |
| 2 | + |
| 3 | +import unittest |
| 4 | +import os |
| 5 | +from pywps import get_ElementMakerForVersion, E |
| 6 | +from pywps.app.basic import get_xpath_ns |
| 7 | +from pywps import Service, Process, ComplexInput, ComplexOutput, FORMATS |
| 8 | +from pywps.tests import client_for, assert_response_success |
| 9 | +from owslib.wps import WPSExecution, ComplexDataInput |
| 10 | +from lxml import etree |
| 11 | + |
| 12 | +VERSION = "1.0.0" |
| 13 | +WPS, OWS = get_ElementMakerForVersion(VERSION) |
| 14 | +xpath_ns = get_xpath_ns(VERSION) |
| 15 | + |
| 16 | + |
| 17 | +def get_resource(path): |
| 18 | + return os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data', path) |
| 19 | + |
| 20 | + |
| 21 | +test_fmts = {'json': (get_resource('json/point.geojson'), FORMATS.JSON), |
| 22 | + 'geojson': (get_resource('json/point.geojson'), FORMATS.GEOJSON), |
| 23 | + 'netcdf': (get_resource('netcdf/time.nc'), FORMATS.NETCDF), |
| 24 | + 'geotiff': (get_resource('geotiff/dem.tiff'), FORMATS.GEOTIFF), |
| 25 | + 'gml': (get_resource('gml/point.gml'), FORMATS.GML), |
| 26 | + 'shp': (get_resource('shp/point.shp.zip'), FORMATS.SHP), |
| 27 | + 'txt': (get_resource('text/unsafe.txt'), FORMATS.TEXT), |
| 28 | + } |
| 29 | + |
| 30 | + |
| 31 | +def create_fmt_process(name, fn, fmt): |
| 32 | + """Create a dummy process comparing the input file on disk and the data that was passed in the request.""" |
| 33 | + def handler(request, response): |
| 34 | + # Load output from file and convert to data |
| 35 | + response.outputs['complex'].file = fn |
| 36 | + o = response.outputs['complex'].data |
| 37 | + |
| 38 | + # Get input data from the request |
| 39 | + i = request.inputs['complex'][0].data |
| 40 | + |
| 41 | + assert i == o |
| 42 | + return response |
| 43 | + |
| 44 | + return Process(handler=handler, |
| 45 | + identifier='test-fmt', |
| 46 | + title='Complex fmt test process', |
| 47 | + inputs=[ComplexInput('complex', 'Complex input', |
| 48 | + supported_formats=(fmt, ))], |
| 49 | + outputs=[ComplexOutput('complex', 'Complex output', |
| 50 | + supported_formats=(fmt, ))]) |
| 51 | + |
| 52 | + |
| 53 | +def get_data(fn, encoding=None): |
| 54 | + """Read the data from file and encode.""" |
| 55 | + import base64 |
| 56 | + mode = 'rb' if encoding == 'base64' else 'r' |
| 57 | + with open(fn, mode) as fp: |
| 58 | + data = fp.read() |
| 59 | + |
| 60 | + if encoding == 'base64': |
| 61 | + data = base64.b64encode(data) |
| 62 | + |
| 63 | + if isinstance(data, bytes): |
| 64 | + return data.decode('utf-8') |
| 65 | + else: |
| 66 | + return data |
| 67 | + |
| 68 | + |
| 69 | +class RawInput(unittest.TestCase): |
| 70 | + |
| 71 | + def make_request(self, name, fn, fmt): |
| 72 | + """Create XML request embedding encoded data.""" |
| 73 | + data = get_data(fn, fmt.encoding) |
| 74 | + |
| 75 | + doc = WPS.Execute( |
| 76 | + OWS.Identifier('test-fmt'), |
| 77 | + WPS.DataInputs( |
| 78 | + WPS.Input( |
| 79 | + OWS.Identifier('complex'), |
| 80 | + WPS.Data( |
| 81 | + WPS.ComplexData(data, mimeType=fmt.mime_type, encoding=fmt.encoding)))), |
| 82 | + version='1.0.0') |
| 83 | + |
| 84 | + return doc |
| 85 | + |
| 86 | + def compare_io(self, name, fn, fmt): |
| 87 | + """Start the dummy process, post the request and check the response matches the input data.""" |
| 88 | + |
| 89 | + # Note that `WPSRequest` calls `get_inputs_from_xml` which converts base64 input to bytes |
| 90 | + # See `_get_rawvalue_value` |
| 91 | + client = client_for(Service(processes=[create_fmt_process(name, fn, fmt)])) |
| 92 | + data = get_data(fn, fmt.encoding) |
| 93 | + |
| 94 | + wps = WPSExecution() |
| 95 | + doc = wps.buildRequest('test-fmt', |
| 96 | + inputs=[('complex', ComplexDataInput(data, mimeType=fmt.mime_type, |
| 97 | + encoding=fmt.encoding))], |
| 98 | + mode='sync') |
| 99 | + resp = client.post_xml(doc=doc) |
| 100 | + assert_response_success(resp) |
| 101 | + wps.parseResponse(resp.xml) |
| 102 | + out = wps.processOutputs[0].data[0] |
| 103 | + |
| 104 | + if 'gml' in fmt.mime_type: |
| 105 | + xml_orig = etree.tostring(etree.fromstring(data.encode('utf-8'))).decode('utf-8') |
| 106 | + xml_out = etree.tostring(etree.fromstring(out.decode('utf-8'))).decode('utf-8') |
| 107 | + # Not equal because the output includes additional namespaces compared to the origin. |
| 108 | + # self.assertEqual(xml_out, xml_orig) |
| 109 | + |
| 110 | + else: |
| 111 | + self.assertEqual(out.strip(), data.strip()) |
| 112 | + |
| 113 | + def test_json(self): |
| 114 | + key = 'json' |
| 115 | + self.compare_io(key, *test_fmts[key]) |
| 116 | + |
| 117 | + def test_geojson(self): |
| 118 | + key = 'geojson' |
| 119 | + self.compare_io(key, *test_fmts[key]) |
| 120 | + |
| 121 | + def test_geotiff(self): |
| 122 | + key = 'geotiff' |
| 123 | + self.compare_io(key, *test_fmts[key]) |
| 124 | + |
| 125 | + def test_netcdf(self): |
| 126 | + key = 'netcdf' |
| 127 | + self.compare_io(key, *test_fmts[key]) |
| 128 | + |
| 129 | + def test_gml(self): |
| 130 | + key = 'gml' |
| 131 | + self.compare_io(key, *test_fmts[key]) |
| 132 | + |
| 133 | + def test_shp(self): |
| 134 | + key = 'shp' |
| 135 | + self.compare_io(key, *test_fmts[key]) |
| 136 | + |
| 137 | + def test_txt(self): |
| 138 | + key = 'txt' |
| 139 | + self.compare_io(key, *test_fmts[key]) |
0 commit comments