uwsgidecorators.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. from functools import partial
  2. import sys
  3. from threading import Thread
  4. try:
  5. import cPickle as pickle
  6. except:
  7. import pickle
  8. import uwsgi
  9. if uwsgi.masterpid() == 0:
  10. raise Exception(
  11. "you have to enable the uWSGI master process to use this module")
  12. spooler_functions = {}
  13. mule_functions = {}
  14. postfork_chain = []
  15. # Python3 compatibility
  16. def _encode1(val):
  17. if sys.version_info >= (3, 0) and isinstance(val, str):
  18. return val.encode('utf-8')
  19. else:
  20. return val
  21. def _decode1(val):
  22. if sys.version_info >= (3, 0) and isinstance(val, bytes):
  23. return val.decode('utf-8')
  24. else:
  25. return val
  26. def _encode_to_spooler(vars):
  27. return dict((_encode1(K), _encode1(V)) for (K, V) in vars.items())
  28. def _decode_from_spooler(vars):
  29. return dict((_decode1(K), _decode1(V)) for (K, V) in vars.items())
  30. def get_free_signal():
  31. for signum in range(0, 256):
  32. if not uwsgi.signal_registered(signum):
  33. return signum
  34. raise Exception("No free uwsgi signal available")
  35. def manage_spool_request(vars):
  36. # To check whether 'args' is in vals or not - decode the keys first,
  37. # because in python3 all keys in 'vals' are have 'byte' types
  38. vars = dict((_decode1(K), V) for (K, V) in vars.items())
  39. if 'args' in vars:
  40. for k in ('args', 'kwargs'):
  41. vars[k] = pickle.loads(vars.pop(k))
  42. vars = _decode_from_spooler(vars)
  43. f = spooler_functions[vars['ud_spool_func']]
  44. if 'args' in vars:
  45. ret = f(*vars['args'], **vars['kwargs'])
  46. else:
  47. ret = f(vars)
  48. return int(vars.get('ud_spool_ret', ret))
  49. def postfork_chain_hook():
  50. for f in postfork_chain:
  51. f()
  52. uwsgi.spooler = manage_spool_request
  53. uwsgi.post_fork_hook = postfork_chain_hook
  54. class postfork(object):
  55. def __init__(self, f):
  56. if callable(f):
  57. self.wid = 0
  58. self.f = f
  59. else:
  60. self.f = None
  61. self.wid = f
  62. postfork_chain.append(self)
  63. def __call__(self, *args, **kwargs):
  64. if self.f:
  65. if self.wid > 0 and self.wid != uwsgi.worker_id():
  66. return
  67. return self.f()
  68. self.f = args[0]
  69. class _spoolraw(object):
  70. def __call__(self, *args, **kwargs):
  71. arguments = self.base_dict.copy()
  72. if not self.pass_arguments:
  73. if len(args) > 0:
  74. arguments.update(args[0])
  75. if kwargs:
  76. arguments.update(kwargs)
  77. else:
  78. spooler_args = {}
  79. for key in ('message_dict', 'spooler', 'priority', 'at', 'body'):
  80. if key in kwargs:
  81. spooler_args.update({key: kwargs.pop(key)})
  82. arguments.update(spooler_args)
  83. arguments.update(
  84. {'args': pickle.dumps(args), 'kwargs': pickle.dumps(kwargs)})
  85. return uwsgi.spool(_encode_to_spooler(arguments))
  86. # For backward compatibility (uWSGI < 1.9.13)
  87. def spool(self, *args, **kwargs):
  88. return self.__class__.__call__(self, *args, **kwargs)
  89. def __init__(self, f, pass_arguments):
  90. if not 'spooler' in uwsgi.opt:
  91. raise Exception(
  92. "you have to enable the uWSGI spooler to use @%s decorator" % self.__class__.__name__)
  93. self.f = f
  94. spooler_functions[self.f.__name__] = self.f
  95. # For backward compatibility (uWSGI < 1.9.13)
  96. self.f.spool = self.__call__
  97. self.pass_arguments = pass_arguments
  98. self.base_dict = {'ud_spool_func': self.f.__name__}
  99. class _spool(_spoolraw):
  100. def __call__(self, *args, **kwargs):
  101. self.base_dict['ud_spool_ret'] = str(uwsgi.SPOOL_OK)
  102. return _spoolraw.__call__(self, *args, **kwargs)
  103. class _spoolforever(_spoolraw):
  104. def __call__(self, *args, **kwargs):
  105. self.base_dict['ud_spool_ret'] = str(uwsgi.SPOOL_RETRY)
  106. return _spoolraw.__call__(self, *args, **kwargs)
  107. def spool_decorate(f=None, pass_arguments=False, _class=_spoolraw):
  108. if not f:
  109. return partial(_class, pass_arguments=pass_arguments)
  110. return _class(f, pass_arguments)
  111. def spoolraw(f=None, pass_arguments=False):
  112. return spool_decorate(f, pass_arguments)
  113. def spool(f=None, pass_arguments=False):
  114. return spool_decorate(f, pass_arguments, _spool)
  115. def spoolforever(f=None, pass_arguments=False):
  116. return spool_decorate(f, pass_arguments, _spoolforever)
  117. class mulefunc(object):
  118. def __init__(self, f):
  119. if callable(f):
  120. self.fname = f.__name__
  121. self.mule = 0
  122. mule_functions[f.__name__] = f
  123. else:
  124. self.mule = f
  125. self.fname = None
  126. def real_call(self, *args, **kwargs):
  127. uwsgi.mule_msg(pickle.dumps(
  128. {
  129. 'service': 'uwsgi_mulefunc',
  130. 'func': self.fname,
  131. 'args': args,
  132. 'kwargs': kwargs
  133. }
  134. ), self.mule)
  135. def __call__(self, *args, **kwargs):
  136. if not self.fname:
  137. self.fname = args[0].__name__
  138. mule_functions[self.fname] = args[0]
  139. return self.real_call
  140. return self.real_call(*args, **kwargs)
  141. def mule_msg_dispatcher(message):
  142. msg = pickle.loads(message)
  143. if msg['service'] == 'uwsgi_mulefunc':
  144. return mule_functions[msg['func']](*msg['args'], **msg['kwargs'])
  145. uwsgi.mule_msg_hook = mule_msg_dispatcher
  146. class rpc(object):
  147. def __init__(self, name):
  148. self.name = name
  149. def __call__(self, f):
  150. uwsgi.register_rpc(self.name, f)
  151. return f
  152. class farm_loop(object):
  153. def __init__(self, f, farm):
  154. self.f = f
  155. self.farm = farm
  156. def __call__(self):
  157. if uwsgi.mule_id() == 0:
  158. return
  159. if not uwsgi.in_farm(self.farm):
  160. return
  161. while True:
  162. message = uwsgi.farm_get_msg()
  163. if message:
  164. self.f(message)
  165. class farm(object):
  166. def __init__(self, name=None, **kwargs):
  167. self.name = name
  168. def __call__(self, f):
  169. postfork_chain.append(farm_loop(f, self.name))
  170. class mule_brain(object):
  171. def __init__(self, f, num):
  172. self.f = f
  173. self.num = num
  174. def __call__(self):
  175. if uwsgi.mule_id() == self.num:
  176. try:
  177. self.f()
  178. except:
  179. exc = sys.exc_info()
  180. sys.excepthook(exc[0], exc[1], exc[2])
  181. sys.exit(1)
  182. class mule_brainloop(mule_brain):
  183. def __call__(self):
  184. if uwsgi.mule_id() == self.num:
  185. while True:
  186. try:
  187. self.f()
  188. except:
  189. exc = sys.exc_info()
  190. sys.excepthook(exc[0], exc[1], exc[2])
  191. sys.exit(1)
  192. class mule(object):
  193. def __init__(self, num):
  194. self.num = num
  195. def __call__(self, f):
  196. postfork_chain.append(mule_brain(f, self.num))
  197. class muleloop(mule):
  198. def __call__(self, f):
  199. postfork_chain.append(mule_brainloop(f, self.num))
  200. class mulemsg_loop(object):
  201. def __init__(self, f, num):
  202. self.f = f
  203. self.num = num
  204. def __call__(self):
  205. if uwsgi.mule_id() == self.num:
  206. while True:
  207. message = uwsgi.mule_get_msg()
  208. if message:
  209. self.f(message)
  210. class mulemsg(object):
  211. def __init__(self, num):
  212. self.num = num
  213. def __call__(self, f):
  214. postfork_chain.append(mulemsg_loop(f, self.num))
  215. class signal(object):
  216. def __init__(self, num, **kwargs):
  217. self.num = num
  218. self.target = kwargs.get('target', '')
  219. def __call__(self, f):
  220. uwsgi.register_signal(self.num, self.target, f)
  221. return f
  222. class timer(object):
  223. def __init__(self, secs, **kwargs):
  224. self.num = kwargs.get('signum', get_free_signal())
  225. self.secs = secs
  226. self.target = kwargs.get('target', '')
  227. def __call__(self, f):
  228. uwsgi.register_signal(self.num, self.target, f)
  229. uwsgi.add_timer(self.num, self.secs)
  230. return f
  231. class cron(object):
  232. def __init__(self, minute, hour, day, month, dayweek, **kwargs):
  233. self.num = kwargs.get('signum', get_free_signal())
  234. self.minute = minute
  235. self.hour = hour
  236. self.day = day
  237. self.month = month
  238. self.dayweek = dayweek
  239. self.target = kwargs.get('target', '')
  240. def __call__(self, f):
  241. uwsgi.register_signal(self.num, self.target, f)
  242. uwsgi.add_cron(self.num, self.minute, self.hour,
  243. self.day, self.month, self.dayweek)
  244. return f
  245. class rbtimer(object):
  246. def __init__(self, secs, **kwargs):
  247. self.num = kwargs.get('signum', get_free_signal())
  248. self.secs = secs
  249. self.target = kwargs.get('target', '')
  250. def __call__(self, f):
  251. uwsgi.register_signal(self.num, self.target, f)
  252. uwsgi.add_rb_timer(self.num, self.secs)
  253. return f
  254. class filemon(object):
  255. def __init__(self, fsobj, **kwargs):
  256. self.num = kwargs.get('signum', get_free_signal())
  257. self.fsobj = fsobj
  258. self.target = kwargs.get('target', '')
  259. def __call__(self, f):
  260. uwsgi.register_signal(self.num, self.target, f)
  261. uwsgi.add_file_monitor(self.num, self.fsobj)
  262. return f
  263. class erlang(object):
  264. def __init__(self, name):
  265. self.name = name
  266. def __call__(self, f):
  267. uwsgi.erlang_register_process(self.name, f)
  268. return f
  269. class lock(object):
  270. def __init__(self, f):
  271. self.f = f
  272. def __call__(self, *args, **kwargs):
  273. # ensure the spooler will not call it
  274. if uwsgi.i_am_the_spooler():
  275. return
  276. uwsgi.lock()
  277. try:
  278. return self.f(*args, **kwargs)
  279. finally:
  280. uwsgi.unlock()
  281. class thread(object):
  282. def __init__(self, f):
  283. self.f = f
  284. def __call__(self, *args):
  285. t = Thread(target=self.f, args=args)
  286. t.daemon = True
  287. t.start()
  288. return self.f
  289. class harakiri(object):
  290. def __init__(self, seconds):
  291. self.s = seconds
  292. def real_call(self, *args, **kwargs):
  293. uwsgi.set_user_harakiri(self.s)
  294. r = self.f(*args, **kwargs)
  295. uwsgi.set_user_harakiri(0)
  296. return r
  297. def __call__(self, f):
  298. self.f = f
  299. return self.real_call