Package turbomail :: Package managers :: Module demand
[hide private]
[frames | no frames]

Source Code for Module turbomail.managers.demand

  1  # encoding: utf-8 
  2   
  3  """On-demand threaded queue manager. 
  4   
  5  Worker threads are spawned based on demand at the time a message is added to the queue.""" 
  6   
  7   
  8  import copy 
  9  import logging 
 10  import math 
 11   
 12  from Queue import Queue, Empty 
 13  from threading import Event, Thread 
 14   
 15  from turbomail.api import Manager 
 16  from turbomail.exceptions import TransportExhaustedException 
 17  from turbomail.control import interface 
 18   
 19   
 20  __all__ = ['load'] 
 21   
 22  log = logging.getLogger("turbomail.manager") 
 23   
 24   
 25   
def load():
    """Build and return a new DemandManager instance."""
    manager = DemandManager()
    return manager
28 29
class DemandManager(Manager):
    """Queue manager that starts delivery worker threads on demand.

    Messages passed to deliver() are copied onto an internal queue.  Worker
    threads are started at that moment, sized from the current queue length,
    and each worker exits once a blocking queue read times out.
    """

    name = "demand"

    def __init__(self):
        """Create the queue and read the tuning options from the configuration."""
        log.info("Demand manager starting up.")
        super(DemandManager, self).__init__()

        # Count of worker threads currently accounted for.
        # NOTE(review): this counter is updated from the calling thread
        # (spawn via deliver) and from worker threads (wrapper, worker)
        # without a lock -- confirm whether that is acceptable.
        self.pool = 0
        self.queue = Queue()
        # Set by stop(); no method of this class waits on or checks it.
        self.finished = Event()

        # Maximum number of threads to create.
        self.threads = interface.config.get("mail.demand.threads", 4)
        # Estimate the number of required threads by dividing the queue size by this.
        self.divisor = interface.config.get("mail.demand.divisor", 10)
        # Timeout handed to the blocking queue read in worker(); a worker
        # whose read times out exits.
        self.timeout = interface.config.get("mail.demand.timeout", 60)

        log.info("Demand manager ready.")

    def stop(self):
        """Signal shutdown by setting the ``finished`` event.

        Running workers are not interrupted here; as written they exit via
        the queue timeout, transport exhaustion or a delivery failure.
        """
        log.info("Demand manager shutting down.")
        self.finished.set()

    def spawn(self):
        """Start one worker thread running wrapper() and count it in the pool."""
        thread = Thread(target=self.wrapper)
        thread.start()
        # Counted only after a successful start so a failed start() does not
        # leave a phantom worker in the pool.
        self.pool += 1

    def deliver(self, message):
        """Queue a deep copy of *message* and start workers if more are needed.

        The original message is flagged as processed and dirty.  Always
        returns True.
        """
        log.info("Adding message %s to the queue for background delivery.", message.id)
        self.queue.put(copy.deepcopy(message))
        message._processed = True
        message._dirty = True

        # Read the optimum exactly once: it is derived from the live queue
        # size, so two reads could disagree while workers drain the queue.
        tospawn = int(self.optimum - self.pool)
        if tospawn > 0:
            log.debug("Spawning %d thread%s.", tospawn, "" if tospawn == 1 else "s")
            for _ in range(tospawn):
                self.spawn()

        return True

    def wrapper(self):
        """Thread entry point: run worker() and always release the pool slot."""
        log.debug("Mail queue worker starting up.")

        try:
            self.worker()
        except Exception:
            log.exception("Internal error in worker thread.")
        finally:
            # Must run on every exit path; a leaked slot would permanently
            # reduce the number of workers deliver() is willing to start.
            self.pool -= 1
            log.debug("Mail queue worker finished.")

    def worker(self):
        """Deliver queued messages through one transport until the worker ends.

        The loop ends when the queue read times out, when the transport
        raises TransportExhaustedException (the message is re-queued and a
        replacement worker is spawned), or when a delivery raises any other
        exception.  The transport is stopped on every exit path.
        """
        log.debug("Requesting new transport instance from.")
        # get_new_transport() is not defined in this class; presumably it is
        # provided by the Manager base class.
        transport = self.get_new_transport()

        try:
            while True:
                # Fetch separately from delivery so the handlers below only
                # ever see a message that was actually dequeued.
                try:
                    message = self.queue.get(True, self.timeout)
                except Empty:
                    log.debug("Worker death from starvation.")
                    break

                try:
                    transport.deliver(message)
                except TransportExhaustedException:
                    log.debug("Worker death from transport exhaustion - spawning child.")
                    self.deliver(message)
                    self.spawn()
                    break
                except Exception:
                    log.exception("Delivery of message %s failed.", message.id)
                    break
                else:
                    log.info("Delivery of message %s successful or deferred.", message.id)
        finally:
            transport.stop()

    @property
    def optimum(self):
        """Desired worker count: queue size / divisor, rounded up, capped at ``threads``.

        The value is returned as computed and is not coerced to int here;
        deliver() applies int() to the difference it derives from it.
        """
        estimate = math.ceil(self.queue.qsize() / float(self.divisor))
        return min(self.threads, estimate)
112