-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmove_task.py
69 lines (59 loc) · 2.23 KB
/
move_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import psycopg
from qgis.core import QgsTask
class MoveTask(QgsTask):
def __init__(self, description, query, project_title, db, finished_fnc,
failed_fnc):
super(MoveTask, self).__init__(description, QgsTask.CanCancel)
self.query = query
self.project_title = project_title
self.db = db
self.finished_fnc = finished_fnc
self.failed_fnc = failed_fnc
self.result_params = None
self.error_msg = None
def finished(self, result):
if result:
self.finished_fnc(self.db, self.query, self.result_params)
else:
self.failed_fnc(self.error_msg)
class MoveGeomTask(MoveTask):
def __init__(self, description, query, project_title, db, finished_fnc,
failed_fnc):
super(MoveGeomTask, self).__init__(description, query, project_title,
db, finished_fnc, failed_fnc)
def run(self):
try:
view_name, col_names, srids, geom_types = self.query.create_geom_view(
self.project_title, self.db)
self.result_params = {
'view_name': view_name,
'col_names': col_names,
'srids': srids,
'geom_types': geom_types
}
except psycopg.Error as e:
self.error_msg = str(e)
return False
except ValueError as e:
self.error_msg = str(e)
return False
return True
class MoveTTask(MoveTask):
def __init__(self, description, query, project_title, db, col_id,
finished_fnc, failed_fnc):
super(MoveTTask, self).__init__(description, query, project_title, db,
finished_fnc, failed_fnc)
self.col_id = col_id
def run(self):
try:
view_name, srid = self.query.create_temporal_view(
self.project_title, self.db, self.col_id)
self.result_params = {
'col_id': self.col_id,
'view_name': view_name,
'srid': srid
}
except psycopg.Error as e:
self.error_msg = e.diag.message_primary
return False
return True