A daemon that receives and sends messages using SleekXMPP.
这是我用于 Orichalcum.X 通讯系统的守护进程。虽然还很不完善,但是可以使用。
This is a daemon that I use for the Orichalcum.X communication system. It’s not complete but works anyway.
限于篇幅,这里只写一些比较重要的代码摘要和解释。如果要获取完整的代码,请访问这个地方 。
Here is only a brief code explanation. Visit here to checkout the complete code.
首先导入一些必要的模块,设定日志。
Import necessary modules, set up logging.
import threading
import time
import copy
import logging
import sleekxmpp
import utils
logger = logging . getLogger ( 'orichalcumX.xmpp' )
Watchdog 是一个没什么用(至今观察到)的机制。我开始是为了试图终止进程而写的,发现并不管用。
Watchdog is a not functioning mechanism. First coded for killing the thread but it never works.
class MyClientXMPP ( sleekxmpp . ClientXMPP ):
def __init__ ( self , jid , password ):
sleekxmpp . ClientXMPP . __init__ ( self , jid , password )
self . feedDog ()
self . watchDog ()
def feedDog ( self ):
self . __last_feed = time . time ()
def watchDog ( self ):
if time . time () - self . __last_feed > 5 :
print "Watchdog barks."
self . abort ()
threading . Timer ( 0.1 , self . watchDog )
下面这个类用于设立一个新的线程。至今为止并没有使用 queue 来交换信息,但是似乎还能工作。
特别注意 def terminate()
下面的写法,终止进程前注意终止ClientXMPP,以免退不出去卡死。
Following class used for setting up a new thread. Up to now no queue module used, but works fine.
Pay attention to the definition of terminate, and kill ClientXMPP before killing self, otherwise it may stuck.
class XMPP ( threading . Thread ):
# XMPP queue stores incoming and outgoing messages.
# Item(s) in the queue are dict(s). Possible keys are: jid, message
outgoing_queue = []
incoming_queue = []
outgoing_lock = threading . Lock ()
incoming_lock = threading . Lock ()
connect_status = 0 # 0-disconnected 1-connecting 2-confirm_connected,
# -1:error
schedule_rec = { 'send_presence' : 0 , 'get_roster' : 0 }
schedule_set = { 'send_presence' : 30 , 'get_roster' : 30 }
def __init__ ( self , jid , password ):
threading . Thread . __init__ ( self )
self . _sig_terminate = threading . Event ()
self . xmpp = MyClientXMPP ( jid , password )
self . xmpp . reconnect_max_attempts = 0
self . xmpp . reconnect_delay = 0
self . jid = jid
self . xmpp . add_event_handler ( "session_start" , self . _onConnected )
self . xmpp . add_event_handler ( "message" , self . _onMessage )
self . xmpp . add_event_handler ( "disconnected" , self . _onDisconnected )
self . xmpp . add_event_handler ( "failed_auth" , self . _onFailedAuth )
self . xmpp . add_event_handler ( "socket_error" , self . _onSocketError )
self . xmpp . add_event_handler ( "connect_failed" , self . _onConnectFailed )
def run ( self ):
logger . info ( "XMPP Client [%s] starting..." % self . jid )
while not self . _sig_terminate . isSet ():
nowtime = time . time ()
if self . connect_status == 0 :
logger . info ( '[%s] client now disconnected.' % self . jid )
try :
self . xmpp . connect ()
self . xmpp . process ( block = False )
self . connect_status = 1
except Exception , e :
logger . error ( "XMPP deliver module: failed attempt to connect: %s" % e )
#self.terminate()
elif self . connect_status == 2 :
# Scheduled to send presence
if ( nowtime - self . schedule_rec [ 'send_presence' ] >
self . schedule_set [ 'send_presence' ]):
logger . info ( "Send a presence." )
self . xmpp . sendPresence ()
self . schedule_rec [ 'send_presence' ] = nowtime
# Scheduled to get roster
if ( nowtime - self . schedule_rec [ 'get_roster' ] >
self . schedule_set [ 'get_roster' ]):
logger . info ( "Try to get roster." )
self . xmpp . getRoster ( block = False )
self . schedule_rec [ 'get_roster' ] = nowtime
# empty send queue
self . outgoing_lock . acquire ()
while self . outgoing_queue :
message = self . outgoing_queue . pop ( 0 )
self . xmpp . sendMessage ( mto = message [ "jid" ],
mbody = message [ "message" ],
mtype = "chat" )
self . outgoing_lock . release ()
self . xmpp . feedDog ()
time . sleep ( 0.1 )
# Exiting
if self . connect_status == 2 :
self . xmpp . disconnect ( wait = True )
self . xmpp . stop . set ()
return
def _onSocketError ( self , event ):
logger . warning ( "Socket Error!" )
self . xmpp . disconnect ( wait = False )
def _onFailedAuth ( self , event ):
logger . warning ( "Authentication failed." )
self . connect_status = - 1
self . terminate ()
def _onConnected ( self , event ):
logger . info ( "Connected." )
self . xmpp . sendPresence ()
if self . connect_status >= 0 :
self . connect_status = 2
def _onDisconnected ( self , event ):
logger . info ( "Disconnected." )
if self . connect_status >= 0 :
self . connect_status = 0
def _onMessage ( self , message ):
logger . info ( "Got Message" )
self . incoming_lock . acquire ()
if message [ "type" ] in ( "chat" , "normal" ):
self . incoming_queue . append ({ "jid" : message [ "from" ],
"message" : message [ "body" ]})
self . incoming_lock . release ()
def _onConnectFailed ( self , event ):
logger . info ( "Connect failed." )
self . connect_status = - 1
def setMessage ( self , jid , message ):
self . outgoing_lock . acquire ()
self . outgoing_queue . append ({ "jid" : jid , "message" : message })
self . outgoing_lock . release ()
def getMessage ( self ):
self . incoming_lock . acquire ()
ret = copy . copy ( self . incoming_queue )
self . incoming_queue = []
self . incoming_lock . release ()
return ret
def terminate ( self ):
self . xmpp . stop . set ()
self . _sig_terminate . set ()
下面是测试代码部分。####
请用您自己的两个XMPP帐号名代替。
Now the testing code. Replace ####
with your own two XMPP account names.
if __name__ == '__main__' :
pwd = raw_input ( 'password:' )
x = XMPP ( '####' , pwd )
x . start ()
while True :
cmd = raw_input ( 'COMMAND: t, new, read, roster' )
if cmd == 't' :
x . terminate ()
x . join ()
del x
exit ()
if cmd == 'new' :
receiver = '####'
message = raw_input ( 'message?' )
x . setMessage ( receiver , message )
if cmd == 'roster' :
try :
print x . xmpp . client_roster
except :
pass
if cmd == 'read' :
print x . getMessage ()