-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathchatbot.py
415 lines (339 loc) · 13.2 KB
/
chatbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
"""Python implementation of a Tinode chatbot."""
# For compatibility between python 2 and 3
from __future__ import print_function
import argparse
import base64
from concurrent import futures
from datetime import datetime
import json
import os
import pkg_resources
import platform
try:
import Queue as queue
except ImportError:
import queue
import random
import signal
import sys
import time
import grpc
from google.protobuf.json_format import MessageToDict
# Import generated grpc modules
from tinode_grpc import pb
from tinode_grpc import pbx
# For compatibility with python2
if sys.version_info[0] >= 3:
unicode = str
APP_NAME = "Tino-chatbot"
APP_VERSION = "1.2.2"
LIB_VERSION = pkg_resources.get_distribution("tinode_grpc").version
# Maximum length of string to log. Shorten longer strings.
MAX_LOG_LEN = 64
# User ID of the current user
botUID = None
# Dictionary wich contains lambdas to be executed when server response is received
onCompletion = {}
# This is needed for gRPC ssl to work correctly.
os.environ["GRPC_SSL_CIPHER_SUITES"] = "HIGH+ECDSA"
def log(*args):
print(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], *args)
# Add bundle for future execution
def add_future(tid, bundle):
onCompletion[tid] = bundle
# Shorten long strings for logging.
def clip_long_string(obj):
if isinstance(obj, unicode) or isinstance(obj, str):
if len(obj) > MAX_LOG_LEN:
return '<' + str(len(obj)) + ' bytes: ' + obj[:12] + '...' + obj[-12:] + '>'
return obj
elif isinstance(obj, (list, tuple)):
return [clip_long_string(item) for item in obj]
elif isinstance(obj, dict):
return dict((key, clip_long_string(val)) for key, val in obj.items())
else:
return obj
def to_json(msg):
return json.dumps(clip_long_string(MessageToDict(msg)))
# Resolve or reject the future
def exec_future(tid, code, text, params):
bundle = onCompletion.get(tid)
if bundle != None:
del onCompletion[tid]
try:
if code >= 200 and code < 400:
arg = bundle.get('arg')
bundle.get('onsuccess')(arg, params)
else:
log("Error: {} {} ({})".format(code, text, tid))
onerror = bundle.get('onerror')
if onerror:
onerror(bundle.get('arg'), {'code': code, 'text': text})
except Exception as err:
log("Error handling server response", err)
# List of active subscriptions
subscriptions = {}
def add_subscription(topic):
subscriptions[topic] = True
def del_subscription(topic):
subscriptions.pop(topic, None)
def subscription_failed(topic, errcode):
if topic == 'me':
# Failed 'me' subscription means the bot is disfunctional.
if errcode.get('code') == 502:
# Cluster unreachable. Break the loop and retry in a few seconds.
client_post(None)
else:
exit(1)
def login_error(unused, errcode):
# Check for 409 "already authenticated".
if errcode.get('code') != 409:
exit(1)
def server_version(params):
if params == None:
return
log("Server:", params['build'].decode('ascii'), params['ver'].decode('ascii'))
def next_id():
next_id.tid += 1
return str(next_id.tid)
next_id.tid = 100
# Quotes from the fortune cookie file
quotes = []
def next_quote():
idx = random.randrange(0, len(quotes))
# Make sure quotes are not repeated
while idx == next_quote.idx:
idx = random.randrange(0, len(quotes))
next_quote.idx = idx
return quotes[idx]
next_quote.idx = 0
# This is the class for the server-side gRPC endpoints
class Plugin(pbx.PluginServicer):
def Account(self, acc_event, context):
action = None
if acc_event.action == pb.CREATE:
action = "created"
# TODO: subscribe to the new user.
elif acc_event.action == pb.UPDATE:
action = "updated"
elif acc_event.action == pb.DELETE:
action = "deleted"
else:
action = "unknown"
log("Account", action, ":", acc_event.user_id, acc_event.public)
return pb.Unused()
queue_out = queue.Queue()
def client_generate():
while True:
msg = queue_out.get()
if msg == None:
return
log("out:", to_json(msg))
yield msg
def client_post(msg):
queue_out.put(msg)
def client_reset():
# Drain the queue
try:
while queue_out.get(False) != None:
pass
except queue.Empty:
pass
def hello():
tid = next_id()
add_future(tid, {
'onsuccess': lambda unused, params: server_version(params),
})
return pb.ClientMsg(hi=pb.ClientHi(id=tid, user_agent=APP_NAME + "/" + APP_VERSION + " (" +
platform.system() + "/" + platform.release() + "); gRPC-python/" + LIB_VERSION,
ver=LIB_VERSION, lang="EN"))
def login(cookie_file_name, scheme, secret):
tid = next_id()
add_future(tid, {
'arg': cookie_file_name,
'onsuccess': lambda fname, params: on_login(fname, params),
'onerror': lambda unused, errcode: login_error(unused, errcode),
})
return pb.ClientMsg(login=pb.ClientLogin(id=tid, scheme=scheme, secret=secret))
def subscribe(topic):
tid = next_id()
add_future(tid, {
'arg': topic,
'onsuccess': lambda topicName, unused: add_subscription(topicName),
'onerror': lambda topicName, errcode: subscription_failed(topicName, errcode),
})
return pb.ClientMsg(sub=pb.ClientSub(id=tid, topic=topic))
def leave(topic):
tid = next_id()
add_future(tid, {
'arg': topic,
'onsuccess': lambda topicName, unused: del_subscription(topicName)
})
return pb.ClientMsg(leave=pb.ClientLeave(id=tid, topic=topic))
def publish(topic, text):
tid = next_id()
return pb.ClientMsg(pub=pb.ClientPub(id=tid, topic=topic, no_echo=True,
head={"auto": json.dumps(True).encode('utf-8')}, content=json.dumps(text).encode('utf-8')))
def note_read(topic, seq):
return pb.ClientMsg(note=pb.ClientNote(topic=topic, what=pb.READ, seq_id=seq))
def init_server(listen):
# Launch plugin server: accept connection(s) from the Tinode server.
server = grpc.server(futures.ThreadPoolExecutor(max_workers=16))
pbx.add_PluginServicer_to_server(Plugin(), server)
server.add_insecure_port(listen)
server.start()
log("Plugin server running at '"+listen+"'")
return server
def init_client(addr, schema, secret, cookie_file_name, secure, ssl_host):
log("Connecting to", "secure" if secure else "", "server at", addr,
"SNI="+ssl_host if ssl_host else "")
channel = None
if secure:
opts = (('grpc.ssl_target_name_override', ssl_host),) if ssl_host else None
channel = grpc.secure_channel(addr, grpc.ssl_channel_credentials(), opts)
else:
channel = grpc.insecure_channel(addr)
# Call the server
stream = pbx.NodeStub(channel).MessageLoop(client_generate())
# Session initialization sequence: {hi}, {login}, {sub topic='me'}
client_post(hello())
client_post(login(cookie_file_name, schema, secret))
return stream
def client_message_loop(stream):
try:
# Read server responses
for msg in stream:
log(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], "in:", to_json(msg))
if msg.HasField("ctrl"):
# Run code on command completion
exec_future(msg.ctrl.id, msg.ctrl.code, msg.ctrl.text, msg.ctrl.params)
elif msg.HasField("data"):
# log("message from:", msg.data.from_user_id)
# Protection against the bot talking to self from another session.
if msg.data.from_user_id != botUID:
# Respond to message.
# Mark received message as read
client_post(note_read(msg.data.topic, msg.data.seq_id))
# Insert a small delay to prevent accidental DoS self-attack.
time.sleep(0.1)
# Respond with a witty quote
client_post(publish(msg.data.topic, next_quote()))
elif msg.HasField("pres"):
# log("presence:", msg.pres.topic, msg.pres.what)
# Wait for peers to appear online and subscribe to their topics
if msg.pres.topic == 'me':
if (msg.pres.what == pb.ServerPres.ON or msg.pres.what == pb.ServerPres.MSG) \
and subscriptions.get(msg.pres.src) == None:
client_post(subscribe(msg.pres.src))
elif msg.pres.what == pb.ServerPres.OFF and subscriptions.get(msg.pres.src) != None:
client_post(leave(msg.pres.src))
else:
# Ignore everything else
pass
except grpc._channel._Rendezvous as err:
log("Disconnected:", err)
def read_auth_cookie(cookie_file_name):
"""Read authentication token from a file"""
cookie = open(cookie_file_name, 'r')
params = json.load(cookie)
cookie.close()
schema = params.get("schema")
secret = None
if schema == None:
return None, None
if schema == 'token':
secret = base64.b64decode(params.get('secret').encode('utf-8'))
else:
secret = params.get('secret').encode('utf-8')
return schema, secret
def on_login(cookie_file_name, params):
global botUID
client_post(subscribe('me'))
"""Save authentication token to file"""
if params == None or cookie_file_name == None:
return
if 'user' in params:
botUID = params['user'].decode("ascii")[1:-1]
# Protobuf map 'params' is not a python object or dictionary. Convert it.
nice = {'schema': 'token'}
for key_in in params:
if key_in == 'token':
key_out = 'secret'
else:
key_out = key_in
nice[key_out] = json.loads(params[key_in].decode('utf-8'))
try:
cookie = open(cookie_file_name, 'w')
json.dump(nice, cookie)
cookie.close()
except Exception as err:
log("Failed to save authentication cookie", err)
def load_quotes(file_name):
with open(file_name) as f:
for line in f:
quotes.append(line.strip())
return len(quotes)
def run(args):
schema = None
secret = None
if args.login_token:
"""Use token to login"""
schema = 'token'
secret = args.login_token.encode('ascii')
log("Logging in with token", args.login_token)
elif args.login_basic:
"""Use username:password"""
schema = 'basic'
secret = args.login_basic.encode('utf-8')
log("Logging in with login:password", args.login_basic)
else:
"""Try reading the cookie file"""
try:
schema, secret = read_auth_cookie(args.login_cookie)
log("Logging in with cookie file", args.login_cookie)
except Exception as err:
log("Failed to read authentication cookie", err)
if schema:
# Load random quotes from file
log("Loaded {} quotes".format(load_quotes(args.quotes)))
# Start Plugin server
server = init_server(args.listen)
# Initialize and launch client
client = init_client(args.host, schema, secret, args.login_cookie, args.ssl, args.ssl_host)
# Setup closure for graceful termination
def exit_gracefully(signo, stack_frame):
log("Terminated with signal", signo)
server.stop(0)
client.cancel()
sys.exit(0)
# Add signal handlers
signal.signal(signal.SIGINT, exit_gracefully)
signal.signal(signal.SIGTERM, exit_gracefully)
# Run blocking message loop in a cycle to handle
# server being down.
while True:
client_message_loop(client)
time.sleep(3)
client_reset()
client = init_client(args.host, schema, secret, args.login_cookie, args.ssl, args.ssl_host)
# Close connections gracefully before exiting
server.stop(None)
client.cancel()
else:
log("Error: authentication scheme not defined")
if __name__ == '__main__':
"""Parse command-line arguments. Extract server host name, listen address, authentication scheme"""
random.seed()
purpose = "Tino, Tinode's chatbot."
log(purpose)
parser = argparse.ArgumentParser(description=purpose)
parser.add_argument('--host', default='localhost:16060', help='address of Tinode server gRPC endpoint')
parser.add_argument('--ssl', action='store_true', help='use SSL to connect to the server')
parser.add_argument('--ssl-host', help='SSL host name to use instead of default (useful for connecting to localhost)')
parser.add_argument('--listen', default='0.0.0.0:40051', help='address to listen on for incoming Plugin API calls')
parser.add_argument('--login-basic', help='login using basic authentication username:password')
parser.add_argument('--login-token', help='login using token authentication')
parser.add_argument('--login-cookie', default='.tn-cookie', help='read credentials from the provided cookie file')
parser.add_argument('--quotes', default='quotes.txt', help='file with messages for the chatbot to use, one message per line')
args = parser.parse_args()
run(args)