forked from Bytespeicher/Bytebot
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbytebot.py
More file actions
196 lines (154 loc) · 6.18 KB
/
bytebot.py
File metadata and controls
196 lines (154 loc) · 6.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from twisted.words.protocols import irc
from twisted.internet import reactor, protocol, ssl, task
from twisted.python import logfile, log
from bytebot_config import BYTEBOT_NICK, BYTEBOT_PASSWORD, BYTEBOT_CHANNEL, \
BYTEBOT_SSL, BYTEBOT_PORT, BYTEBOT_SERVER, BYTEBOT_PLUGINS, \
BYTEBOT_LOGPATH, BYTEBOT_LOGLEVEL
from bytebotpluginloader import ByteBotPluginLoader
from bytebot_log import BytebotLogObserver, LOG_ERROR, LOG_INFO, LOG_WARN, \
LOG_DEBUG
class ByteBot(irc.IRCClient):
nickname = BYTEBOT_NICK
password = BYTEBOT_PASSWORD
realname = BYTEBOT_NICK
username = BYTEBOT_NICK
channel = BYTEBOT_CHANNEL
plugins = {}
def registerCommand(self, name, description=''):
self.plugins[name] = description
def connectionMade(self):
irc.IRCClient.connectionMade(self)
self.factory.plugins.run(
'registerCommand',
{'irc': self}
)
def connectionLost(self, reason):
irc.IRCClient.connectionLost(self, reason)
def signedOn(self):
log.msg("[sign on]")
self.join(self.factory.channel)
self.startCron()
def joined(self, channel):
log.msg("[joined channel %s]" % channel)
self.factory.plugins.run('onJoined',
{
'irc': self,
'channel': channel
})
def privmsg(self, user, channel, msg):
self.factory.plugins.run('onPrivmsg',
{
'irc': self,
'user': user,
'channel': channel,
'msg': msg
})
user = user.split("!", 1)[0]
if channel == self.nickname:
""" User whispering to the bot"""
self.msg(
user,
"I don't usually whisper back. But when I do, it's to you."
)
return
if msg.startswith(self.nickname + ":"):
msg = "%s: Ich bin ein Bot. Meine Intelligenz ist limitiert" % user
self.msg(channel, msg)
log.msg("<%s> %s" % (self.nickname, msg))
if msg.startswith('!commands'):
for pid, name in enumerate(self.plugins):
self.msg(channel, "%s. %s:" % (pid+1, name))
self.msg(channel, "\t%s" % self.plugins[name])
def userJoined(self, user, channel):
self.factory.plugins.run('onUserJoined',
{
'irc': self,
'user': user,
'channel': channel
})
def irc_JOIN(self, prefix, params):
self.factory.plugins.run('onIrc_JOIN',
{
'irc': self,
'prefix': prefix,
'params': params
})
def noticed(self, user, channel, message):
"""
This function is called if a NOTICE is received. According to the RFC
one MUST NOT send automatic replies in response to a NOTICE to avoid
loops between clients.
"""
pass
def action(self, user, channel, msg):
user = user.split("!", 1)[0]
def irc_RPL_TOPIC(self, prefix, params):
self.current_topic = params
def alterCollidedNick(self, nickname):
return nickname + "^"
def startCron(self):
def runPerMinute():
log.msg("[running cron - every 60s]")
self.factory.plugins.run('minuteCron', {'irc': self})
def runPerFiveMinutes():
log.msg("[running cron - every 5m]")
self.factory.plugins.run('fiveMinuteCron', {'irc': self})
def runPerHour():
log.msg("[running cron - every 60m]")
self.factory.plugins.run('hourCron', {'irc': self})
def runPerDay():
log.msg("[running cron - every 24h]")
self.factory.plugins.run('dayCron', {'irc': self})
self.minuteCron = task.LoopingCall(runPerMinute)
self.minuteCron.start(60.0)
self.fiveMinuteCron = task.LoopingCall(runPerFiveMinutes)
self.fiveMinuteCron.start(300.0)
self.hourCron = task.LoopingCall(runPerHour)
self.hourCron.start(3600.0)
self.dayCron = task.LoopingCall(runPerDay)
self.dayCron.start(86400.0)
class ByteBotFactory(protocol.ClientFactory):
def __init__(self, nickname, password, channel):
self.nickname = nickname
self.password = password
self.channel = channel
self.plugins = ByteBotPluginLoader(BYTEBOT_PLUGINS)
def buildProtocol(self, addr):
p = ByteBot()
p.factory = self
return p
def clientConnectionLost(self, connector, reason):
connector.connect()
def clientConnectionFailed(self, connector, reason):
log("FATAL: connection failed: %s" % reason, level=LOG_ERROR)
reactor.stop()
if __name__ == '__main__':
# ERROR | WARNING
log_error = logfile.LogFile("error.log", BYTEBOT_LOGPATH,
rotateLength=10000000, maxRotatedFiles=100)
# INFO | DEBUG
log_info = logfile.LogFile("bytebot.log", BYTEBOT_LOGPATH,
rotateLength=10000000, maxRotatedFiles=100)
logger_error = BytebotLogObserver(
log_error,
(BYTEBOT_LOGLEVEL & ~LOG_INFO & ~LOG_DEBUG & ~LOG_WARN)
)
logger_info = BytebotLogObserver(
log_info,
(BYTEBOT_LOGLEVEL & ~LOG_ERROR)
)
log.addObserver(logger_error.emit)
log.addObserver(logger_info.emit)
f = ByteBotFactory(BYTEBOT_NICK, BYTEBOT_PASSWORD, BYTEBOT_CHANNEL)
if BYTEBOT_SSL is True:
reactor.connectSSL(
BYTEBOT_SERVER,
int(BYTEBOT_PORT),
f,
ssl.ClientContextFactory()
)
else:
reactor.connectTCP(BYTEBOT_SERVER, int(BYTEBOT_PORT), f)
reactor.run()