root/lang/python/pySchwartz/Schwartz.py @ 18629

Revision 18629, 2.8 kB (checked in by mattn, 6 years ago)

・Schwartz.py: sleep短すぎ
・Worker.py: ゴミ取り
・example/worker.py: jobからargへ変数名変更

Line 
1#!/usr/bin/python
2
3import sys
4import time
5import simplejson
6from Job import Job
7from Worker import Worker
8
class Schwartz(object):
  """Polling job dispatcher backed by one or more DB-API connections.

  Each connection is expected to expose a ``funcmap`` table (columns
  ``funcid``, ``funcname``) and a ``job`` table (columns ``funcid``,
  ``arg``); those are the only tables and columns the queries below use.

  Instance attributes:
    conns     -- list of open connections, in the order they were configured.
    funcmap   -- {str(conn): {funcname: funcid}} cache of resolved ids.
    workermap -- {funcname: [Worker, ...]} registered via can_do().
  """

  def __init__(self, databases):
    """Open one connection per entry of *databases*.

    Each entry is a dict whose ``'driver'`` key names the module to import;
    every other key is passed as a keyword argument to that module's
    ``connect()``. The caller's dicts are not modified.

    Import or connect errors propagate to the caller.
    """
    # Per-instance containers: as class attributes these were shared by
    # every Schwartz object, so instances leaked connections and workers
    # into each other.
    self.conns = []
    self.funcmap = {}
    self.workermap = {}
    for database in databases:
      # Work on a copy so the caller can reuse its configuration dict.
      params = dict(database)
      driver = params.pop('driver')
      # Non-empty fromlist makes __import__ return the leaf module for
      # dotted driver names instead of the top-level package.
      module = __import__(driver, globals(), locals(), [''])
      conn = module.connect(**params)
      self.conns.append(conn)
      self.funcmap.setdefault(str(conn), {})

  @staticmethod
  def _select_funcid(cursor, funcname):
    """Scan the funcmap table via *cursor*; return the id for *funcname* or ''."""
    cursor.execute('select funcid, funcname from funcmap')
    for row in cursor.fetchall():
      if row[1] == funcname:
        return row[0]
    return ''

  @staticmethod
  def funcname_to_id(conns, funcmap, funcname):
    """Resolve *funcname* to its funcid, registering it if necessary.

    Connections are tried in order. For each one the cache
    ``funcmap[str(conn)]`` is consulted first, then the ``funcmap`` table;
    if the name is still unknown a row is inserted, committed and read
    back. The first truthy id wins and is cached under that connection.

    A connection that raises is reported with ``print`` and skipped, so a
    broken database does not prevent fail-over to the next one.

    Returns the funcid, or '' when no connection could provide one.
    """
    funcid = ''
    for conn in conns:
      # Use a separate name for the per-connection cache: rebinding the
      # ``funcmap`` parameter here made every later iteration index the
      # wrong dict and raise KeyError.
      known = funcmap.setdefault(str(conn), {})
      funcid = known.get(funcname, '')
      if not funcid:
        cursor = None
        try:
          cursor = conn.cursor()
          funcid = Schwartz._select_funcid(cursor, funcname)
          if not funcid:
            # DB-API parameters must be a sequence, hence the 1-tuple.
            cursor.execute("insert into funcmap (funcname) values (%s)",
                           (funcname,))
            # Commit now so a later rollback in work() cannot discard the
            # row while its id is still cached here.
            conn.commit()
            funcid = Schwartz._select_funcid(cursor, funcname)
          if funcid:
            known[funcname] = funcid
        except Exception as e:
          # Best effort: report and fall through to the next connection.
          print(e)
        finally:
          if cursor is not None:
            cursor.close()
      if funcid:
        break
    return funcid

  def can_do(self, funcname, worker):
    """Register *worker* as a handler for *funcname*.

    Objects that are not Worker instances are silently ignored.
    """
    if isinstance(worker, Worker):
      self.workermap.setdefault(funcname, []).append(worker)

  def _run_jobs(self, conn, funcid, worker):
    """Run every queued job for *funcid* on *conn* through *worker*.

    All matching rows are fetched, each ``arg`` is JSON-decoded and handed
    to ``worker.work()``, then the rows are deleted and the transaction is
    committed. Any failure is printed and the transaction rolled back.

    NOTE(review): the delete is keyed on funcid only, so a job inserted
    between the select and the delete may be removed without being run,
    depending on the database's isolation level -- confirm against the
    real schema before relying on this.
    """
    cursor = None
    try:
      cursor = conn.cursor()
      cursor.execute('select arg from job where funcid = %s', (funcid,))
      for row in cursor.fetchall():
        worker.work(simplejson.loads(row[0]))
      cursor.execute('delete from job where funcid = %s', (funcid,))
      conn.commit()
    except Exception as e:
      print(e)
      try:
        conn.rollback()
      except Exception as rollback_error:
        # A failed rollback must not terminate the polling loop.
        print(rollback_error)
    finally:
      if cursor is not None:
        cursor.close()

  def work(self):
    """Poll forever, dispatching queued jobs to the registered workers.

    Every cycle resolves each registered funcname to an id, runs the
    pending jobs on every connection for every worker of that name, then
    sleeps three seconds. This method never returns.

    NOTE(review): the id is resolved from the first connection that
    answers and then used against all connections; this presumably
    assumes funcmap ids agree across databases -- verify.
    """
    while True:
      # Snapshot so a worker that calls can_do() cannot break iteration.
      for funcname, workers in list(self.workermap.items()):
        funcid = Schwartz.funcname_to_id(self.conns, self.funcmap, funcname)
        if not funcid:
          # Nothing sensible to query or delete without a resolved id.
          continue
        for worker in workers:
          for conn in self.conns:
            self._run_jobs(conn, funcid, worker)
      time.sleep(3)
      print("do working...")

  def __del__(self):
    """Close every connection, ignoring errors raised while closing."""
    # getattr guards against instances whose __init__ never ran.
    for conn in getattr(self, 'conns', ()):
      try:
        conn.close()
      except Exception:
        # Deliberately best effort: nothing useful can be done here.
        pass
97
Note: See TracBrowser for help on using the browser.