-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmodule.py
277 lines (227 loc) · 11.6 KB
/
module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import re
import json
import os
class Module:
class CommandError(Exception):
def __init__(self, message=None):
self.message = message
super().__init__(message)
class CommandSuccess(Exception):
def __init__(self, message=None):
self.message = message
super().__init__(message)
class CommandNotAllowed(Exception):
def __init__(self, message=None):
self.message = message
super().__init__(message)
class CommandDM(Exception):
def __init__(self, direct_message, public_message=None):
self.direct_message = direct_message
self.public_message = public_message
super().__init__(direct_message)
async def send_error(self, message, comment=None, comment_wrapping=True):
await message.add_reaction('❌')
if comment:
if comment_wrapping:
comment = f'```diff\n- {comment}\n```'
await message.channel.send(comment)
async def send_success(self, message, comment=None, comment_wrapping=True):
await message.add_reaction('✅')
if comment:
if comment_wrapping:
comment = f'```diff\n+ {comment}\n```'
await message.channel.send(comment)
async def send_not_allowed(self, message, comment=None, comment_wrapping=True):
await message.add_reaction('🔒')
if comment:
if comment_wrapping:
comment = f'```diff\n- {comment}\n```'
await message.channel.send(comment)
async def send_dm(self, message, direct_message, public_message=None):
await message.add_reaction('📨')
await message.author.send(direct_message)
if public_message:
await message.channel.send(public_message)
async def send_internal_error(self, message, exception):
await message.add_reaction('⚠')
await message.channel.send('```diff\n- An internal error occurred (this isn\'t your fault)\n```')
if self.lemmy.config['notify_admins_about_errors']:
for user_id in self.lemmy.config['admins']:
if admin := message.channel.guild.get_member(user_id):
message = (f'Exception in {message.channel.guild.name}#{message.channel.name}:\n'
f'{message.author.name}: `{message.content[:50]}{"..." if len(message.content) > 50 else ""}`\n'
f'-> {type(exception).__name__}: {str(exception)}')
await admin.send(message)
def __init__(self, lemmy):
self.lemmy = lemmy
self.client = lemmy.client # this line is arguably bad taste code, but the name binding removes a *lot* of typing
self._commands = { function[4:]: getattr(self, function) for function in dir(self) if function.startswith('cmd_') }
async def on_ready(self):
pass
async def on_message(self, message):
if message.author != self.client.user:
await self.call_functions(message)
async def on_voice_state_update(self, member, before, after):
pass
async def on_reaction_add(self, reaction, user):
pass
async def call_functions(self, message):
# get parsed message
terms = Module.deconstruct_message(message)
args = terms['args']
kwargs = terms['kwargs']
# resolve command symbol
symbol = self.lemmy.resolve_symbol(message.channel)
# call command and handle result
if len(args) > 0 and args[0].startswith(symbol):
args[0] = args[0][len(symbol):]
command = args[0].replace('-', '_')
if command in self._commands:
# check if the user is permitted to use this command
if self.get_docs_attr(command, 'admin_only', default=False) and not message.author.id in self.lemmy.config['admins']:
await self.send_not_allowed(message)
elif self.get_docs_attr(command, 'moderator_only', default=False) and not await self.lemmy.user_has_mod_privs(message.author.id):
await self.send_not_allowed(message, comment='you must be a Lemmy moderator to use this command')
else:
try:
await self._commands[command](message, args[1:], kwargs)
except Module.CommandError as e:
usage_message = ('Usage: `' + self.get_docs_attr(command, 'usage') + '`') if self.get_docs_attr(command, 'usage') else None
await self.send_error(message, e.message or usage_message)
except Module.CommandSuccess as e:
await self.send_success(message, e.message)
except Module.CommandNotAllowed as e:
await self.send_not_allowed(message, e.message)
except Module.CommandDM as e:
await self.send_dm(message, e.direct_message, e.public_message)
except Exception as e:
await self.send_internal_error(message, e)
raise e
@staticmethod
def deconstruct_message(message):
# separate message into terms whitespace-delimited terms, preserving quoted sections
quote = None
terms = ['']
for char in message.content:
# conditions -> actions:
# quote closed, non-quote character -> add to most recent term
# quote closed, quote character -> set quote
# quote closed, whitespace character -> add new term if most recent isn't empty
# quote open, non-quote character -> add to most recent term
# quote open, matching quote -> clear quote
# quote open, non-matching quote -> add to most recent term
# quote open, whitespace character -> add to most recent term
# these need to be evaluated before action is taken
set_quote = quote is None and char in ['\'', '"', '`']
clear_quote = quote is not None and char == quote
add_new_term = quote is None and char.isspace() and not terms[-1] == ''
add_to_most_recent = not any([set_quote, clear_quote, add_new_term])
if add_to_most_recent:
terms[-1] += char
if set_quote:
quote = char
if clear_quote:
quote = None
if add_new_term:
terms.append('')
# remove trailing empty term
while len(terms) > 0 and terms[-1] == '':
del terms[-1]
kwarg_match = re.compile('([a-z]+)=(\S+)|[\'"`](.+)[\'"`]')
args = [ term for term in terms if not re.fullmatch(kwarg_match, term) ]
kwargs = { match.group(1): match.group(2) for match in filter(lambda match: match is not None, map(lambda term: re.fullmatch(kwarg_match, term), terms)) }
return {
'args': args,
'kwargs': kwargs
}
def get_docs_attr(self, command, attr, default=None):
try:
return getattr(self, f'docs_{command}')[attr]
except (AttributeError, KeyError):
return default
def get_module_docs_attr(self, attr, default=None):
try:
return getattr(self, 'docs')[attr]
except (AttributeError, KeyError):
return default
def get_help_text(self, command=None, symbol=''):
# if a command is being inspected, verify that it exists
if command and not hasattr(self, f'cmd_{command}'):
raise AttributeError(f'Module \'{type(self).__name__}\' has no command \'{command}\'')
lines = []
# asking about a specific command
if command:
# title
lines.append(f'```apacheconf\n' + symbol + (self.get_docs_attr(command, 'usage') or command) + '\n```')
# description
lines.append(self.get_docs_attr(command, 'description'))
# examples
examples = self.get_docs_attr(command, 'examples')
if examples:
lines.append('Examples:\n ' + '\n '.join(f'`{symbol}{example}`' for example in examples))
# asking about the module generally
# the resultant text should be wrapped in a diff code block by whatever calls for it
else:
# title
name = type(self).__name__
module_description = self.get_module_docs_attr('description')
module_description = f' - {module_description}' if module_description else ''
lines.append(f'+ {name}{module_description}')
# commands
for command_name in self._commands.keys():
# we want to append the command description if available
command_description = self.get_docs_attr(command_name, 'description')
command_description = ' - ' + command_description if command_description else ''
lock_or_spaces = '🔒' if self.get_docs_attr(command_name, 'admin_only', default=False) else ' '
lines.append(f' {lock_or_spaces} {symbol}{command_name}{command_description}')
# filter Nones out and put the rest into a list
lines = list(filter(lambda line: line is not None, lines))
# convert to string and return
return '\n'.join(lines)
def _load(self, file_location, default=None, static=False, bytes=False):
storage_type = 'static' if static else 'data'
full_path = f'{storage_type}/{self.__class__.__name__}/{file_location}'
directory = '/'.join(full_path.split('/')[:-1])
# check that directory exists
if not os.path.isdir(directory):
os.makedirs(directory)
# check that file exists
if not os.path.isfile(full_path):
if default is not None:
with open(full_path, 'w') as f:
f.write(default)
# get data
with open(full_path, 'rb' if bytes else 'r') as f:
return f.read()
def _save(self, file_location, content, static=False, bytes=False):
storage_type = 'static' if static else 'data'
full_path = f'{storage_type}/{self.__class__.__name__}/{file_location}'
directory = '/'.join(full_path.split('/')[:-1])
# check that directory exists
if not os.path.isdir(directory):
os.makedirs(directory)
# no need to check that file exists
# save data
with open(full_path, 'wb' if bytes else 'w') as f:
f.write(content)
def load_data(self, document_name, static=False, default='{}'):
return json.loads(self._load(f'{document_name}.json', static=static, default=default))
def save_data(self, document_name, data, static=False):
self._save(f'{document_name}.json', json.dumps(data, indent=4), static=static)
def load_image(self, file_location, static=False):
return self._load(file_location, static=static, bytes=True)
def list_files(self, path, static=False):
storage_type = 'static' if static else 'data'
directory = f'{storage_type}/{self.__class__.__name__}/{path}'
# check that directory exists
if not os.path.isdir(directory):
os.makedirs(directory)
return os.listdir(directory)
def data_exists(self, file_location, static=False):
storage_type = 'static' if static else 'data'
full_path = f'{storage_type}/{self.__class__.__name__}/{file_location}'
directory = '/'.join(full_path.split('/')[:-1])
# check that directory exists
if not os.path.isdir(directory):
os.makedirs(directory)
return os.path.exists(full_path)