-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfsm.py
331 lines (223 loc) · 9.62 KB
/
fsm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# -*- coding: utf-8 -*-
import logging as log
import re
import telegram as tm
from pidgey import convert_num_sys
import texts as tx
# -- base types -----------------------------------------------------------
class State(object):
def on_trigger(self, trigger):
pass
def _on_trigger(self, trigger):
log.debug('== ' + str(self))
return self.on_trigger(trigger)
def on_enter(self, trigger):
pass
def _on_enter(self, trigger):
log.debug('-> ' + str(self))
return self.on_enter(trigger)
def on_exit(self, trigger):
pass
def _on_exit(self, trigger):
log.debug('<- ' + str(self))
return self.on_exit(trigger)
class Filter(object):
def __init__(self):
pass
def on_process(self, current_state, trigger):
pass
def _on_process(self, current_state, trigger):
log.debug(':: ' + type(self).__name__)
return self.on_process(current_state, trigger)
class StateMachine(object):
def __init__(self, user):
self.state = BootStrapState()
self.user = user
self.filters = [StartFilter(),
FeedbackFilter(),
PoliteFilter()]
def fire(self, trigger):
trigger.user = self.user
for f in self.filters:
filtered_state = f._on_process(self.state, trigger)
if filtered_state:
self.to_state(filtered_state, trigger)
return
new_state = self.state._on_trigger(trigger)
self.to_state(new_state, trigger)
def to_state(self, new_state, trigger):
if not new_state:
return self.state
if new_state == self.state:
reenter_state = self.state._on_enter(trigger)
self.to_state(reenter_state, trigger)
return
exit_state = self.state._on_exit(trigger)
if exit_state:
self.to_state(exit_state, trigger)
return
self.state = new_state
enter_state = self.state._on_enter(trigger)
if enter_state:
self.to_state(enter_state, trigger)
return
class TelegramTrigger(object):
def __init__(self):
self.user = None
self.bot = None
self.update = None
def get_chat_id(self):
return self.update.message.chat_id if self.update else None
def get_txt(self):
return self.update.message.text if self.update else None
def get_name(self):
user = self.update.message.from_user
return user.first_name if user.first_name else user.username
def send_msg(self, txt):
self.bot.sendMessage(chat_id=self.chat_id,
text=txt,
disable_web_page_preview=True,
parse_mode=tm.ParseMode.MARKDOWN)
def send_keys(self, txt, keyboard):
reply_markup = tm.ReplyKeyboardMarkup(keyboard=keyboard,
resize_keyboard=True,
one_time_keyboard=True)
self.bot.sendMessage(chat_id=self.chat_id,
text=txt,
disable_web_page_preview=True,
parse_mode=tm.ParseMode.MARKDOWN,
reply_markup=reply_markup)
def send_photo(self, src):
self.bot.sendPhoto(chat_id=self.chat_id, photo=src)
# will call 'get_chat_id' when accessing like obj.chat_id
chat_id = property(get_chat_id)
txt = property(get_txt)
name = property(get_name)
# -- states ---------------------------------------------------------------
class BootStrapState(State):
def on_trigger(self, trigger):
return RootState()
class RootState(State):
def on_enter(self, trigger):
trigger.send_keys(u'Из какой системы будем переводить?',
[[u'2ичной', u'8иричной',
u'10ичной', u'16еричной']])
def on_trigger(self, trigger):
f_num_sys = None
msg = (u'двоичной|2|2ичной|bin')
if tx.equals(trigger.txt, msg):
f_num_sys = 2
msg = (u'восьмеричной|8|8ичной|oct')
if tx.equals(trigger.txt, msg):
f_num_sys = 8
msg = (u'десятичной|10|10ичной|dec')
if tx.equals(trigger.txt, msg):
f_num_sys = 10
msg = (u'шестнадцатеричной|16|16ичной|hex')
if tx.equals(trigger.txt, msg):
f_num_sys = 16
if not f_num_sys:
trigger.send_msg(u'Извини, не понял')
return self
if f_num_sys:
return AskSystemState(f_num_sys)
return self
class AskSystemState(State):
def __init__(self, f_num_sys):
self.f_num_sys = f_num_sys
def on_enter(self, trigger):
trigger.send_keys(u'В какую?',
[[u'2ичной', u'8иричной',
u'10ичной', u'16еричной']])
def on_trigger(self, trigger):
t_num_sys = None
msg = (u'двоичную|2|2ичную|bin')
if tx.equals(trigger.txt, msg):
t_num_sys = 2
msg = (u'восьмеричную|8|8ичную|oct')
if tx.equals(trigger.txt, msg):
t_num_sys = 8
msg = (u'десятичную|10|10ичную|dec')
if tx.equals(trigger.txt, msg):
t_num_sys = 10
msg = (u'шестнадцатеричную|16|16ичную|hex')
if tx.equals(trigger.txt, msg):
t_num_sys = 16
if not t_num_sys:
trigger.send_msg(u'Извини, не понял')
return self
if t_num_sys:
return AskNumberState(self.f_num_sys, t_num_sys)
return self
class AskNumberState(State):
numbers_rgx = re.compile(r'[^0-9]')
def __init__(self, f_num_sys, t_num_sys):
self.f_num_sys = f_num_sys
self.t_num_sys = t_num_sys
def on_enter(self, trigger):
trigger.send_msg(u'Введи число')
def on_trigger(self, trigger):
try:
val = int(AskNumberState.numbers_rgx.sub('', trigger.txt))
return NumberCalculation(val, self.f_num_sys, self.t_num_sys)
except ValueError:
return self
class NumberCalculation(State):
def __init__(self, val, f_num_sys, t_num_sys):
self.val = val
self.f_num_sys = f_num_sys
self.t_num_sys = t_num_sys
def on_enter(self, trigger):
if self.f_num_sys and self.t_num_sys:
try:
self.val = str(self.val)
msg = convert_num_sys(self.val, self.f_num_sys, self.t_num_sys)
trigger.send_msg(u'Результат: ')
trigger.send_msg(str(msg))
trigger.send_keys(u'Ещё хочешь? :)',
[[u'Да', u'Хватит']])
except ValueError:
trigger.send_msg('Странное число. Попробуй ввести ещё раз')
return AskNumberState(self.f_num_sys, self.t_num_sys)
except TypeError:
trigger.send_msg('Это не число. Меня обманули')
return AskNumberState(self.f_num_sys, self.t_num_sys)
def on_trigger(self, trigger):
if tx.equals(trigger.txt, u'хватит|нет|н|-'):
return trigger.send_msg('Cya, {}! :3'.format(trigger.name))
if tx.equals(trigger.txt, u'покажи ещё|покажи еще|ещё|еще|да|+|д'):
return RootState()
class FeedbackState(State):
def on_enter(self, trigger):
trigger.send_msg(u'В целях улучшения качества обслуживания '
u'все разговоры записываются ;) Напиши свои мысли')
def on_trigger(self, trigger):
log.warn('feedback: ' + str(trigger.update))
trigger.send_msg(u'Спасибо!')
return RootState()
# -- filters -------------------------------------------------------------
class StartFilter(Filter):
def on_process(self, current_state, trigger):
if tx.is_command(trigger.txt, '/start|/help'):
trigger.send_msg(u'Если у тебя что-то пошло не так '
u'или ты хочешь поделиться с нами своими '
u'мыслями - просто напиши в чате /feedback '
u'и опиши, что случилось. '
u'Мы обязательно что-нибудь придумаем!')
return RootState()
class PoliteFilter(Filter):
def on_process(self, current_state, trigger):
if tx.equals(trigger.txt, u'привет|здравствуй|хай|hello|hallo|hi'):
trigger.send_msg(u'Привет, {}! ^^'.format(trigger.name))
if type(current_state) == BootStrapState:
return RootState()
return current_state
byes = (u'пока|до свидания|бб|66|бай-бай|пока-пока'
u'|goodbye|спокойной ночи')
if tx.equals(trigger.txt, byes):
trigger.send_msg(u'Пока, {}! :3'.format(trigger.name))
return BootStrapState()
class FeedbackFilter(Filter):
def on_process(self, current_state, trigger):
if tx.is_command(trigger.txt, '/feedback'):
return FeedbackState()