1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 observer.py - Classes to observe the data store
18 """
19
20 from threading import Timer
21 from storm.locals import *
22 from datetime import datetime
23
24 from run import Punch, Run
25 from runner import Team
26
28 """Observes an Event for new data (e.g. new punches).
29 @warn: This will rollback the store every <intervall> seconds!
30 """
31
35
48
49
60
67
70
73
74 _tables = [(Punch, _punch_notify),
75 ]
76
77 - def __init__(self, store, interval = 5, rollback = True):
78 """
79 @param interval: Check interval
80 @param rollback: Rollback store before checking for new objects?
81 This is necessary to get new objects added by other
82 connections but it resets all uncommited changes.
83 """
84
85 self._interval = interval
86
87 self._store = store
88 self._rollback = rollback
89
90 self._registry = {}
91
92 self._last = {}
93 for t in EventObserver._tables:
94 self._last[t[0]] = self._get_max(t[0])
95
96 self._running = False
97
99 if self._running == True:
100 t = Timer(self._interval, self.observe)
101 t.start()
102
104 self._running = False
105
107 """Register an object to be notified of changes of type event.
108
109 @param obj: The object to receive to notification. obj must have a
110 update(self, event) method.
111 @param observable: object that should be observed for changes. Currently
112 run, runner and team are supported
113 """
114 if not observable in self._registry:
115 self._registry[observable] = []
116 self._registry[observable].append(obj)
117 if self._running == False:
118 self._running = True
119 self._start_timer()
120
122 """Unregister an object from this observer for this observable."""
123 self._registry[observable].remove(obj)
124 if len(self._registry[observable]) == 0:
125 del(self._registry[observable])
126 if len(self._registry) == 0:
127 self._running = False
128
130 """Does the actual observation."""
131
132
133 if self._rollback:
134 self._store.rollback()
135
136 for t in EventObserver._tables:
137 last = self._get_max(t[0])
138 if self._last[t[0]] != last:
139 for i in range(self._last[t[0]]+1, last+1):
140 obj = self._store.get(t[0], i)
141 if not obj is None:
142 t[1](self, obj)
143 self._last[t[0]] = last
144
145
146 self._start_timer()
147
149 return self._store.execute(Select(Max(t.id))).get_one()[0] or 0
150
152 """Notifies objects of an event."""
153 try:
154 for obj in self._registry[observable]:
155 obj.update(observable)
156 except KeyError:
157 pass
158
159
161 """Observes an Event for new data (e.g. new punches).
162 This implementation uses triggers and a log table to
163 also get notified of changed values.
164 @warn: This will rollback the store every <intervall> seconds!
165 """
166
167 _tables = {Punch : EventObserver._punch_notify,
168 Run : EventObserver._run_notify,
169 Team : EventObserver._team_notify,
170 }
171
172 - def __init__(self, store, interval = 5, rollback = True):
173 """
174 @param interval: Check interval
175 @param rollback: Rollback store before checking for new objects?
176 This is necessary to get new objects added by other
177 connections but it resets all uncommited changes.
178 """
179 EventObserver.__init__(self, store, interval, rollback)
180 self._last = datetime.utcnow()
181
183 """Does the actual observation."""
184
185 try:
186
187 if self._rollback:
188 self._store.rollback()
189
190 changed = self._store.execute("""SELECT object_type, change_time, row
191 FROM log
192 WHERE change_time > %s
193 ORDER BY change_time""",
194 params = [self._last])
195
196 for row in changed:
197 for obj_type in self._tables.keys():
198 if row[0] == obj_type.__storm_table__:
199 obj = self._store.get(obj_type, row[2])
200 if obj is not None:
201 self._tables[obj_type](self, obj)
202
203
204 try:
205 self._last = row[1]
206 except UnboundLocalError:
207 pass
208
209 finally:
210
211 self._start_timer()
212