From 690d84fa7de3919cdb81d7edaee1c26cc9b7f38b Mon Sep 17 00:00:00 2001 From: "joel.clement" Date: Wed, 7 Feb 2024 13:50:58 +0100 Subject: [PATCH] fix permission #96 --- backend/gn_modulator/module/breadcrumbs.py | 9 +++-- .../gn_modulator/routes/utils/repository.py | 34 +++++++++++++----- backend/gn_modulator/schema/repositories.py | 36 ++++++++++++++++--- backend/gn_modulator/tests/data/commons.py | 19 +++++++++- backend/gn_modulator/tests/test_repository.py | 8 +++++ backend/gn_modulator/tests/test_rest_api.py | 19 ++++++++-- backend/gn_modulator/tests/utils/rest.py | 21 +++++++++-- 7 files changed, 125 insertions(+), 21 deletions(-) diff --git a/backend/gn_modulator/module/breadcrumbs.py b/backend/gn_modulator/module/breadcrumbs.py index 2a98cd10..0bb4ec94 100644 --- a/backend/gn_modulator/module/breadcrumbs.py +++ b/backend/gn_modulator/module/breadcrumbs.py @@ -1,4 +1,4 @@ -from geonature.utils.env import db +from sqlalchemy.orm.exc import NoResultFound from gn_modulator.schema import SchemaMethods from gn_modulator.utils.commons import getAttr from gn_modulator import MODULE_CODE @@ -39,7 +39,12 @@ def breadcrumbs(cls, module_code, page_code, data): q = sm.get_row( data[sm.Model().pk_field_name()], module_code=module_code, params={} ) - m = q.one() + # patch apres delete... + try: + m = q.one() + except NoResultFound: + return parent_breadcrumbs + data_label = sm.serialize(m, fields=[sm.label_field_name()]) # label_page = f"{sm.label()} {data_label[sm.label_field_name()]}" label_page = f"{sm.label()} {getAttr(data_label, sm.label_field_name())}" diff --git a/backend/gn_modulator/routes/utils/repository.py b/backend/gn_modulator/routes/utils/repository.py index 416827ac..8270b01a 100644 --- a/backend/gn_modulator/routes/utils/repository.py +++ b/backend/gn_modulator/routes/utils/repository.py @@ -13,6 +13,8 @@ def get_list_rest(module_code, object_code, additional_params={}): schema_code = ModuleMethods.schema_code(module_code, object_code) sm = SchemaMethods(schema_code) + id_role = g.current_user.id_role + # on peut redéfinir le module_code pour le choix des droits permission_module_code = object_definition.get("module_code", module_code) params = {**parse_request_args(object_definition), **additional_params} @@ -22,7 +24,11 @@ def get_list_rest(module_code, object_code, additional_params={}): {} if params.get("no_info") else sm.get_query_infos( - module_code=permission_module_code, action=action, params=params, url=request.url + module_code=permission_module_code, + action=action, + params=params, + url=request.url, + id_role=id_role, ) ) @@ -32,11 +38,12 @@ def get_list_rest(module_code, object_code, additional_params={}): action=action, params=params, query_type="select", + id_role=id_role, ) if params.get("sql"): # test si droit admin - if not has_any_permissions("R", g.current_user.id_role, "MODULATOR", "ADMIN"): + if not has_any_permissions("R", id_role, "MODULATOR", "ADMIN"): return ( "Vous n'avez pas les droit pour effectuer des actions d'admin pour le module MODULATOR", 403, @@ -70,7 +77,7 @@ def get_one_rest(module_code, object_code, value): object_definition = ModuleMethods.object_config(module_code, object_code) schema_code = ModuleMethods.schema_code(module_code, object_code) sm = SchemaMethods(schema_code) - + id_role = g.current_user.id_role params = parse_request_args(object_definition) permission_module_code = object_definition.get("module_code", module_code) @@ -82,6 +89,7 @@ def get_one_rest(module_code, object_code, value): module_code=permission_module_code, action="R", params=params, + id_role=id_role, ) m = q.one() @@ -119,6 +127,7 @@ def patch_rest(module_code, object_code, value): object_definition = ModuleMethods.object_config(module_code, object_code) schema_code = ModuleMethods.schema_code(module_code, object_code) sm = SchemaMethods(schema_code) + id_role = g.current_user.id_role permission_module_code = object_definition.get("module_code", module_code) @@ -136,6 +145,7 @@ def patch_rest(module_code, object_code, value): params=params, authorized_write_fields=authorized_write_fields, commit=True, + id_role=id_role, ) except sm.errors.SchemaUnsufficientCruvedRigth as e: @@ -150,6 +160,8 @@ def delete_rest(module_code, object_code, value): object_definition = ModuleMethods.object_config(module_code, object_code) schema_code = ModuleMethods.schema_code(module_code, object_code) sm = SchemaMethods(schema_code) + id_role = g.current_user.id_role + permission_module_code = object_definition.get("module_code", module_code) params = parse_request_args(object_definition) @@ -164,7 +176,11 @@ def delete_rest(module_code, object_code, value): try: sm.delete_row( - value, module_code=module_code, field_name=params.get("field_name"), commit=True + value, + module_code=module_code, + field_name=params.get("field_name"), + commit=True, + id_role=id_role, ) except sm.errors.SchemaUnsufficientCruvedRigth as e: @@ -172,19 +188,21 @@ def delete_rest(module_code, object_code, value): return dict_out - pass - def get_page_number_and_list(module_code, object_code, value): object_definition = ModuleMethods.object_config(module_code, object_code) schema_code = ModuleMethods.schema_code(module_code, object_code) sm = SchemaMethods(schema_code) - + id_role = g.current_user.id_role permission_module_code = object_definition.get("module_code", module_code) params = parse_request_args(object_definition) page_number = sm.get_page_number( - value, permission_module_code, params.get("action") or "R", params + value, + permission_module_code, + params.get("action") or "R", + params, + id_role=id_role, ) return get_list_rest(module_code, object_code, additional_params={"page": page_number}) diff --git a/backend/gn_modulator/schema/repositories.py b/backend/gn_modulator/schema/repositories.py index 6d065834..d95b5fbe 100644 --- a/backend/gn_modulator/schema/repositories.py +++ b/backend/gn_modulator/schema/repositories.py @@ -49,6 +49,7 @@ def get_row( action="R", params={}, query_type="all", + id_role=None, ): """ return query get one row (Model. == value) @@ -76,6 +77,7 @@ def get_row( action=action, params=params_query, query_type=query_type, + id_role=id_role, ) return query @@ -173,6 +175,7 @@ def update_row( params={}, authorized_write_fields=None, commit=True, + id_role=None, ): """ update row (Model. == value) with data @@ -188,6 +191,7 @@ def update_row( action="U", params=params, query_type="update", + id_role=id_role, ) m = q.one() @@ -210,6 +214,7 @@ def delete_row( params={}, commit=True, multiple=False, + id_role=None, ): """ delete row (Model. == value) @@ -221,25 +226,47 @@ def delete_row( action="D", params=params, query_type="delete", + id_role=id_role, ) # https://stackoverflow.com/questions/49794899/flask-sqlalchemy-delete-query-failing-with-could-not-evaluate-current-criteria?noredirect=1&lq=1 if not multiple: subquery_delete.one() - subquery_delete.delete(synchronize_session=False) + + res = subquery_delete.all() + + if not res: + return + + Model = self.Model() + + q_delete = Model.query + ors = [] + for r in res: + ands = [] + for pk_field_name in Model.pk_field_names(): + f = getattr(Model, pk_field_name) == getattr(r, pk_field_name) + ands.append(f) + ors.append(f) + + q_delete = q_delete.filter(sa.or_(*ors)) + q_delete.delete(synchronize_session=False) db.session.flush() if commit: db.session.commit() return None - def get_query_infos(self, module_code=MODULE_CODE, action="R", params={}, url=None): + def get_query_infos( + self, module_code=MODULE_CODE, action="R", params={}, url=None, id_role=None + ): subquery_count_total = query_list( self.Model(), module_code=module_code, action=action, params=params, query_type="total", + id_role=id_role, ) count_total = subquery_count_total.count() @@ -250,6 +277,7 @@ def get_query_infos(self, module_code=MODULE_CODE, action="R", params={}, url=No action=action, params=params, query_type="filtered", + id_role=id_role, ) count_filtered = subquery_count_filtered.count() @@ -301,11 +329,11 @@ def get_query_infos(self, module_code=MODULE_CODE, action="R", params={}, url=No return query_infos - def get_page_number(self, value, module_code, action, params): + def get_page_number(self, value, module_code, action, params, id_role): params["fields"] = ["row_number"] sub_query_list = query_list( - self.Model(), module_code, action, params, "page_number" + self.Model(), module_code, action, params, "page_number", id_role=id_role ).subquery() row_number = ( diff --git a/backend/gn_modulator/tests/data/commons.py b/backend/gn_modulator/tests/data/commons.py index 869d8478..d6acb6b8 100644 --- a/backend/gn_modulator/tests/data/commons.py +++ b/backend/gn_modulator/tests/data/commons.py @@ -2,6 +2,8 @@ Données exemple pour les test """ +from gn_modulator import SchemaMethods + def module(): return { @@ -18,11 +20,26 @@ def module_update(): return {"module_label": "TEST_PYTEST_UPDATE"} -def pf(): +def pf(user): + + sm_nom = SchemaMethods("ref_nom.nomenclature") + id_nomenclature_type_actor = sm_nom.get_row_as_dict( + ["PF_TYPE_ACTOR", "CON"], + ["nomenclature_type.mnemonique", "cd_nomenclature"], + fields=["id_nomenclature"], + )["id_nomenclature"] + return { "uuid_passage_faune": "f5e5dd42-dcc1-4cfd-97ec-04699d78cb9b", "nom_usuel_passage_faune": "TEST_PF", "geom": {"type": "Point", "coordinates": [0, 45]}, + "id_digitiser": user.id_role, + "actors": [ + { + "id_organism": user.id_organisme, + "id_nomenclature_type_actor": id_nomenclature_type_actor, + } + ], } diff --git a/backend/gn_modulator/tests/test_repository.py b/backend/gn_modulator/tests/test_repository.py index 54fced52..ffaf7ce1 100644 --- a/backend/gn_modulator/tests/test_repository.py +++ b/backend/gn_modulator/tests/test_repository.py @@ -7,6 +7,7 @@ - list ?? """ +import os import pytest from .utils.repository import test_schema_repository from .data import commons as data_commons @@ -41,6 +42,7 @@ def test_repo_gn_meta_ca(self): def test_repo_gn_meta_jdd(self): test_schema_repository("meta.jdd", data_meta.jdd(), data_meta.jdd_update()) + # @pytest.mark.skip() def test_repo_diag(self, users, passages_faune_with_diagnostic): sm = SchemaMethods("m_sipaf.diag") fields = ["scope", "id_diagnostic"] @@ -61,6 +63,7 @@ def test_repo_diag(self, users, passages_faune_with_diagnostic): assert True + # @pytest.mark.skip() def test_repo_pf_update(self, passages_faune_with_diagnostic): sm = SchemaMethods("m_sipaf.pf") @@ -72,6 +75,7 @@ def test_repo_pf_update(self, passages_faune_with_diagnostic): assert sm.is_new_data(m, data) is False sm.update_row(uuid_pf, data, "uuid_passage_faune", "m_sipaf") + # @pytest.mark.skip() def test_repo_diag_cloture(self, passages_faune_with_diagnostic): sm = SchemaMethods("m_sipaf.diag") sm_org = SchemaMethods("user.organisme") @@ -130,6 +134,7 @@ def test_repo_diag_cloture(self, passages_faune_with_diagnostic): assert sm.is_new_data(m, data) sm.update_row(m.id_diagnostic, data) + # @pytest.mark.skip() def test_repo_pf_rel(self, passages_faune_with_diagnostic, users): sm = SchemaMethods("m_sipaf.pf") uuids_filter_value = ";".join( @@ -261,6 +266,7 @@ def test_repo_pf_nomenclature_spe(self): res_nom = res["nomenclature_ouvrage_specificite"] assert res_nom is None + # @pytest.mark.skip() def test_repo_pf_cruved(self, passages_faune_with_diagnostic, users): sm = SchemaMethods("m_sipaf.pf") uuids_filter_value = ";".join( @@ -326,6 +332,7 @@ def test_repo_pf_filter_has_diagnostic(self, passages_faune_with_diagnostic, use res = sm.serialize_list(m_list, fields) assert len(res) == 2 + # @pytest.mark.skip() def test_repo_synthese_d_within( self, passages_faune_with_diagnostic, synthese_data, users, g_permissions ): @@ -377,6 +384,7 @@ def test_repo_synthese_scope(self, synthese_data, users, datasets): assert len(res[user]) == 9 assert all(r["scope"] == 2 for r in res[user]) + # @pytest.mark.skip() def test_repo_synthese_permission(self, synthese_sensitive_data, users, g_permissions): for key in synthese_sensitive_data: s = synthese_sensitive_data[key] diff --git a/backend/gn_modulator/tests/test_rest_api.py b/backend/gn_modulator/tests/test_rest_api.py index 74f2c337..e472393a 100644 --- a/backend/gn_modulator/tests/test_rest_api.py +++ b/backend/gn_modulator/tests/test_rest_api.py @@ -34,13 +34,26 @@ class TestRest: # data_commons.module_update(), # ) - def test_m_sipaf_pf(self, client, users): + def test_rest_m_sipaf_pf_admin(self, client, users): + user = users["admin_user"] test_schema_rest( client, - users["admin_user"], + user, "m_sipaf", "site", - data_commons.pf(), + data_commons.pf(user), + data_commons.pf_update(), + breadcrumbs_page_code="site_details", + ) + + def test_rest_m_sipaf_pf_user(self, client, users): + user = users["user"] + test_schema_rest( + client, + user, + "m_sipaf", + "site", + data_commons.pf(user), data_commons.pf_update(), breadcrumbs_page_code="site_details", ) diff --git a/backend/gn_modulator/tests/utils/rest.py b/backend/gn_modulator/tests/utils/rest.py index 3f443281..9dd93079 100644 --- a/backend/gn_modulator/tests/utils/rest.py +++ b/backend/gn_modulator/tests/utils/rest.py @@ -5,6 +5,21 @@ from pypnusershub.tests.utils import set_logged_user_cookie, unset_logged_user_cookie +def get_fields(data_post): + """TODO à ajouter aux SchemaMethods ?""" + fields = [] + for key, value in data_post.items(): + if not isinstance(value, list): + fields.append(key) + continue + for item in value: + for item_key in item.keys(): + whole_key = f"{key}.{item_key}" + if whole_key not in fields: + fields.append(whole_key) + return fields + + @pytest.mark.skip() def test_schema_rest( client, user, module_code, object_code, data_post, data_update, breadcrumbs_page_code=None @@ -43,7 +58,7 @@ def test_schema_rest( assert r.status_code == 404, "La donnée ne devrait pas exister" # POST - fields = list(data_post.keys()) + fields = get_fields(data_post) fields.append(sm.Model().pk_field_name()) r = client.post( @@ -56,7 +71,7 @@ def test_schema_rest( data=data_post, ) - assert r.status_code == 200, "Erreur avec POST" + assert r.status_code == 200, f"Erreur avec POST : {r.status_code} {r.response}" data_from_post = r.json assert all(data_post[k] == data_from_post[k] for k in list(data_post.keys())) @@ -110,7 +125,7 @@ def test_schema_rest( "modulator.api_breadcrumbs", module_code=module_code, page_code=breadcrumbs_page_code, - **data_from_post + **data_from_post, ), data=data_update, )