Package bosco :: Module observer
[hide private]
[frames] | no frames]

Source Code for Module bosco.observer

  1  # 
  2  #    Copyright (C) 2008  Gaudenz Steinlin <gaudenz@soziologie.ch> 
  3  # 
  4  #    This program is free software: you can redistribute it and/or modify 
  5  #    it under the terms of the GNU General Public License as published by 
  6  #    the Free Software Foundation, either version 3 of the License, or 
  7  #    (at your option) any later version. 
  8  # 
  9  #    This program is distributed in the hope that it will be useful, 
 10  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 12  #    GNU General Public License for more details. 
 13  # 
 14  #    You should have received a copy of the GNU General Public License 
 15  #    along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 16  """ 
 17  observer.py - Classes to observe the data store 
 18  """ 
 19   
 20  from threading import Timer 
 21  from storm.locals import * 
 22  from datetime import datetime 
 23   
 24  from run import Punch, Run 
 25  from runner import Team 
 26   
27 -class EventObserver(object):
28 """Observes an Event for new data (e.g. new punches). 29 @warn: This will rollback the store every <intervall> seconds! 30 """ 31
32 - def _punch_notify(self, punch):
33 34 self._run_notify(punch.run)
35
36 - def _run_notify(self, run):
37 38 # send notification for run 39 self._notify(run) 40 41 # send for runner 42 if run.sicard.runner is not None: 43 self._runner_notify(run.sicard.runner) 44 45 # send notifications for course 46 if run.course is not None: 47 self._course_notify(run.course)
48 49
50 - def _runner_notify(self, runner):
51 self._notify(runner) 52 53 # send for team 54 if runner.team is not None: 55 self._team_notify(runner.team) 56 57 # send notifications for category 58 if runner.category is not None: 59 self._category_notify(runner.category)
60
61 - def _team_notify(self, team):
62 self._notify(team) 63 64 # send for category 65 if team.category is not None: 66 self._category_notify(team.category)
67
68 - def _category_notify(self, category):
69 self._notify(category)
70
71 - def _course_notify(self, course):
72 self._notify(course)
73 74 _tables = [(Punch, _punch_notify), 75 ] 76
77 - def __init__(self, store, interval = 5, rollback = True):
78 """ 79 @param interval: Check interval 80 @param rollback: Rollback store before checking for new objects? 81 This is necessary to get new objects added by other 82 connections but it resets all uncommited changes. 83 """ 84 85 self._interval = interval 86 87 self._store = store 88 self._rollback = rollback 89 90 self._registry = {} 91 92 self._last = {} 93 for t in EventObserver._tables: 94 self._last[t[0]] = self._get_max(t[0]) 95 96 self._running = False
97
98 - def _start_timer(self):
99 if self._running == True: 100 t = Timer(self._interval, self.observe) 101 t.start()
102
103 - def stop(self):
104 self._running = False
105
106 - def register(self, obj, observable):
107 """Register an object to be notified of changes of type event. 108 109 @param obj: The object to receive to notification. obj must have a 110 update(self, event) method. 111 @param observable: object that should be observed for changes. Currently 112 run, runner and team are supported 113 """ 114 if not observable in self._registry: 115 self._registry[observable] = [] 116 self._registry[observable].append(obj) 117 if self._running == False: 118 self._running = True 119 self._start_timer()
120
121 - def unregister(self, obj, observable):
122 """Unregister an object from this observer for this observable.""" 123 self._registry[observable].remove(obj) 124 if len(self._registry[observable]) == 0: 125 del(self._registry[observable]) 126 if len(self._registry) == 0: 127 self._running = False
128
129 - def observe(self):
130 """Does the actual observation.""" 131 132 # rollback store to end transaction 133 if self._rollback: 134 self._store.rollback() 135 136 for t in EventObserver._tables: 137 last = self._get_max(t[0]) 138 if self._last[t[0]] != last: 139 for i in range(self._last[t[0]]+1, last+1): 140 obj = self._store.get(t[0], i) 141 if not obj is None: 142 t[1](self, obj) 143 self._last[t[0]] = last 144 145 # start new timer 146 self._start_timer()
147
148 - def _get_max(self, t):
149 return self._store.execute(Select(Max(t.id))).get_one()[0] or 0
150
151 - def _notify(self, observable):
152 """Notifies objects of an event.""" 153 try: 154 for obj in self._registry[observable]: 155 obj.update(observable) 156 except KeyError: 157 pass
158 159
160 -class TriggerEventObserver(EventObserver):
161 """Observes an Event for new data (e.g. new punches). 162 This implementation uses triggers and a log table to 163 also get notified of changed values. 164 @warn: This will rollback the store every <intervall> seconds! 165 """ 166 167 _tables = {Punch : EventObserver._punch_notify, 168 Run : EventObserver._run_notify, 169 Team : EventObserver._team_notify, 170 } 171
172 - def __init__(self, store, interval = 5, rollback = True):
173 """ 174 @param interval: Check interval 175 @param rollback: Rollback store before checking for new objects? 176 This is necessary to get new objects added by other 177 connections but it resets all uncommited changes. 178 """ 179 EventObserver.__init__(self, store, interval, rollback) 180 self._last = datetime.utcnow()
181
182 - def observe(self):
183 """Does the actual observation.""" 184 185 try: 186 # rollback store to end transaction 187 if self._rollback: 188 self._store.rollback() 189 190 changed = self._store.execute("""SELECT object_type, change_time, row 191 FROM log 192 WHERE change_time > %s 193 ORDER BY change_time""", 194 params = [self._last]) 195 196 for row in changed: 197 for obj_type in self._tables.keys(): 198 if row[0] == obj_type.__storm_table__: 199 obj = self._store.get(obj_type, row[2]) 200 if obj is not None: 201 self._tables[obj_type](self, obj) 202 203 # obj is now the last object 204 try: 205 self._last = row[1] 206 except UnboundLocalError: 207 pass 208 209 finally: 210 # start new timer, even if an exception occurs 211 self._start_timer()
212