@@ -71,12 +71,20 @@ def buildbot_sess(repo_cfg):
71
71
sess .get (repo_cfg ['buildbot' ]['url' ] + '/logout' , allow_redirects = False )
72
72
73
73
74
- db_query_lock = Lock ()
74
+ class LockingDatabase :
75
+ def __init__ (self , db ):
76
+ self .db = db
77
+ self .query_lock = Lock ()
78
+
79
+ def execute (self , * args ):
80
+ with self .query_lock :
81
+ return self .db .execute (* args )
75
82
83
+ def fetchone (self , * args ):
84
+ return self .db .fetchone (* args )
76
85
77
- def db_query (db , * args ):
78
- with db_query_lock :
79
- db .execute (* args )
86
+ def fetchall (self , * args ):
87
+ return self .db .fetchall (* args )
80
88
81
89
82
90
class Repository :
@@ -90,8 +98,7 @@ def __init__(self, gh, repo_label, db):
90
98
self .gh = gh
91
99
self .repo_label = repo_label
92
100
self .db = db
93
- db_query (
94
- db ,
101
+ db .execute (
95
102
'SELECT treeclosed, treeclosed_src FROM repos WHERE repo = ?' ,
96
103
[repo_label ]
97
104
)
@@ -106,14 +113,12 @@ def __init__(self, gh, repo_label, db):
106
113
def update_treeclosed (self , value , src ):
107
114
self .treeclosed = value
108
115
self .treeclosed_src = src
109
- db_query (
110
- self .db ,
116
+ self .db .execute (
111
117
'DELETE FROM repos where repo = ?' ,
112
118
[self .repo_label ]
113
119
)
114
120
if value > 0 :
115
- db_query (
116
- self .db ,
121
+ self .db .execute (
117
122
'''
118
123
INSERT INTO repos (repo, treeclosed, treeclosed_src)
119
124
VALUES (?, ?, ?)
@@ -228,16 +233,14 @@ def set_status(self, status):
228
233
self .timeout_timer .cancel ()
229
234
self .timeout_timer = None
230
235
231
- db_query (
232
- self .db ,
236
+ self .db .execute (
233
237
'UPDATE pull SET status = ? WHERE repo = ? AND num = ?' ,
234
238
[self .status , self .repo_label , self .num ]
235
239
)
236
240
237
241
# FIXME: self.try_ should also be saved in the database
238
242
if not self .try_ :
239
- db_query (
240
- self .db ,
243
+ self .db .execute (
241
244
'UPDATE pull SET merge_sha = ? WHERE repo = ? AND num = ?' ,
242
245
[self .merge_sha , self .repo_label , self .num ]
243
246
)
@@ -252,8 +255,7 @@ def set_mergeable(self, mergeable, *, cause=None, que=True):
252
255
if mergeable is not None :
253
256
self .mergeable = mergeable
254
257
255
- db_query (
256
- self .db ,
258
+ self .db .execute (
257
259
'INSERT OR REPLACE INTO mergeable (repo, num, mergeable) VALUES (?, ?, ?)' , # noqa
258
260
[self .repo_label , self .num , self .mergeable ]
259
261
)
@@ -263,8 +265,7 @@ def set_mergeable(self, mergeable, *, cause=None, que=True):
263
265
else :
264
266
self .mergeable = None
265
267
266
- db_query (
267
- self .db ,
268
+ self .db .execute (
268
269
'DELETE FROM mergeable WHERE repo = ? AND num = ?' ,
269
270
[self .repo_label , self .num ]
270
271
)
@@ -276,8 +277,7 @@ def init_build_res(self, builders, *, use_db=True):
276
277
} for x in builders }
277
278
278
279
if use_db :
279
- db_query (
280
- self .db ,
280
+ self .db .execute (
281
281
'DELETE FROM build_res WHERE repo = ? AND num = ?' ,
282
282
[self .repo_label , self .num ]
283
283
)
@@ -291,8 +291,7 @@ def set_build_res(self, builder, res, url):
291
291
'url' : url ,
292
292
}
293
293
294
- db_query (
295
- self .db ,
294
+ self .db .execute (
296
295
'INSERT OR REPLACE INTO build_res (repo, num, builder, res, url, merge_sha) VALUES (?, ?, ?, ?, ?, ?)' , # noqa
297
296
[
298
297
self .repo_label ,
@@ -318,8 +317,7 @@ def get_repo(self):
318
317
return repo
319
318
320
319
def save (self ):
321
- db_query (
322
- self .db ,
320
+ self .db .execute (
323
321
'INSERT OR REPLACE INTO pull (repo, num, status, merge_sha, title, body, head_sha, head_ref, base_ref, assignee, approved_by, priority, try_, rollup, delegate) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)' , # noqa
324
322
[
325
323
self .repo_label ,
@@ -401,13 +399,11 @@ def timed_out(self):
401
399
402
400
def record_retry_log (self , src , body ):
403
401
# destroy ancient records
404
- db_query (
405
- self .db ,
402
+ self .db .execute (
406
403
"DELETE FROM retry_log WHERE repo = ? AND time < date('now', ?)" ,
407
404
[self .repo_label , global_cfg .get ('retry_log_expire' , '-42 days' )],
408
405
)
409
- db_query (
410
- self .db ,
406
+ self .db .execute (
411
407
'INSERT INTO retry_log (repo, num, src, msg) VALUES (?, ?, ?, ?)' ,
412
408
[self .repo_label , self .num , src , body ],
413
409
)
@@ -1474,9 +1470,9 @@ def synchronize(repo_label, repo_cfg, logger, gh, states, repos, db, mergeable_q
1474
1470
1475
1471
repo = gh .repository (repo_cfg ['owner' ], repo_cfg ['name' ])
1476
1472
1477
- db_query ( db , 'DELETE FROM pull WHERE repo = ?' , [repo_label ])
1478
- db_query ( db , 'DELETE FROM build_res WHERE repo = ?' , [repo_label ])
1479
- db_query ( db , 'DELETE FROM mergeable WHERE repo = ?' , [repo_label ])
1473
+ db . execute ( 'DELETE FROM pull WHERE repo = ?' , [repo_label ])
1474
+ db . execute ( 'DELETE FROM build_res WHERE repo = ?' , [repo_label ])
1475
+ db . execute ( 'DELETE FROM mergeable WHERE repo = ?' , [repo_label ])
1480
1476
1481
1477
saved_states = {}
1482
1478
for num , state in states [repo_label ].items ():
@@ -1489,8 +1485,7 @@ def synchronize(repo_label, repo_cfg, logger, gh, states, repos, db, mergeable_q
1489
1485
repos [repo_label ] = Repository (repo , repo_label , db )
1490
1486
1491
1487
for pull in repo .iter_pulls (state = 'open' ):
1492
- db_query (
1493
- db ,
1488
+ db .execute (
1494
1489
'SELECT status FROM pull WHERE repo = ? AND num = ?' ,
1495
1490
[repo_label , pull .number ])
1496
1491
row = db .fetchone ()
@@ -1626,9 +1621,10 @@ def main():
1626
1621
db_conn = sqlite3 .connect (db_file ,
1627
1622
check_same_thread = False ,
1628
1623
isolation_level = None )
1629
- db = db_conn .cursor ()
1624
+ inner_db = db_conn .cursor ()
1625
+ db = LockingDatabase (inner_db )
1630
1626
1631
- db_query ( db , '''CREATE TABLE IF NOT EXISTS pull (
1627
+ db . execute ( '''CREATE TABLE IF NOT EXISTS pull (
1632
1628
repo TEXT NOT NULL,
1633
1629
num INTEGER NOT NULL,
1634
1630
status TEXT NOT NULL,
@@ -1647,7 +1643,7 @@ def main():
1647
1643
UNIQUE (repo, num)
1648
1644
)''' )
1649
1645
1650
- db_query ( db , '''CREATE TABLE IF NOT EXISTS build_res (
1646
+ db . execute ( '''CREATE TABLE IF NOT EXISTS build_res (
1651
1647
repo TEXT NOT NULL,
1652
1648
num INTEGER NOT NULL,
1653
1649
builder TEXT NOT NULL,
@@ -1657,36 +1653,36 @@ def main():
1657
1653
UNIQUE (repo, num, builder)
1658
1654
)''' )
1659
1655
1660
- db_query ( db , '''CREATE TABLE IF NOT EXISTS mergeable (
1656
+ db . execute ( '''CREATE TABLE IF NOT EXISTS mergeable (
1661
1657
repo TEXT NOT NULL,
1662
1658
num INTEGER NOT NULL,
1663
1659
mergeable INTEGER NOT NULL,
1664
1660
UNIQUE (repo, num)
1665
1661
)''' )
1666
- db_query ( db , '''CREATE TABLE IF NOT EXISTS repos (
1662
+ db . execute ( '''CREATE TABLE IF NOT EXISTS repos (
1667
1663
repo TEXT NOT NULL,
1668
1664
treeclosed INTEGER NOT NULL,
1669
1665
treeclosed_src TEXT,
1670
1666
UNIQUE (repo)
1671
1667
)''' )
1672
1668
1673
- db_query ( db , '''CREATE TABLE IF NOT EXISTS retry_log (
1669
+ db . execute ( '''CREATE TABLE IF NOT EXISTS retry_log (
1674
1670
repo TEXT NOT NULL,
1675
1671
num INTEGER NOT NULL,
1676
1672
time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
1677
1673
src TEXT NOT NULL,
1678
1674
msg TEXT NOT NULL
1679
1675
)''' )
1680
- db_query ( db , '''
1676
+ db . execute ( '''
1681
1677
CREATE INDEX IF NOT EXISTS retry_log_time_index ON retry_log
1682
1678
(repo, time DESC)
1683
1679
''' )
1684
1680
1685
1681
# manual DB migration :/
1686
1682
try :
1687
- db_query ( db , 'SELECT treeclosed_src FROM repos LIMIT 0' )
1683
+ db . execute ( 'SELECT treeclosed_src FROM repos LIMIT 0' )
1688
1684
except sqlite3 .OperationalError :
1689
- db_query ( db , 'ALTER TABLE repos ADD COLUMN treeclosed_src TEXT' )
1685
+ db . execute ( 'ALTER TABLE repos ADD COLUMN treeclosed_src TEXT' )
1690
1686
1691
1687
for repo_label , repo_cfg in cfg ['repo' ].items ():
1692
1688
repo_cfgs [repo_label ] = repo_cfg
@@ -1695,8 +1691,7 @@ def main():
1695
1691
repo_states = {}
1696
1692
repos [repo_label ] = Repository (None , repo_label , db )
1697
1693
1698
- db_query (
1699
- db ,
1694
+ db .execute (
1700
1695
'SELECT num, head_sha, status, title, body, head_ref, base_ref, assignee, approved_by, priority, try_, rollup, delegate, merge_sha FROM pull WHERE repo = ?' , # noqa
1701
1696
[repo_label ])
1702
1697
for num , head_sha , status , title , body , head_ref , base_ref , assignee , approved_by , priority , try_ , rollup , delegate , merge_sha in db .fetchall (): # noqa
@@ -1738,8 +1733,7 @@ def main():
1738
1733
1739
1734
states [repo_label ] = repo_states
1740
1735
1741
- db_query (
1742
- db ,
1736
+ db .execute (
1743
1737
'SELECT repo, num, builder, res, url, merge_sha FROM build_res' )
1744
1738
for repo_label , num , builder , res , url , merge_sha in db .fetchall ():
1745
1739
try :
@@ -1749,8 +1743,7 @@ def main():
1749
1743
if state .merge_sha != merge_sha :
1750
1744
raise KeyError
1751
1745
except KeyError :
1752
- db_query (
1753
- db ,
1746
+ db .execute (
1754
1747
'DELETE FROM build_res WHERE repo = ? AND num = ? AND builder = ?' , # noqa
1755
1748
[repo_label , num , builder ])
1756
1749
continue
@@ -1760,23 +1753,22 @@ def main():
1760
1753
'url' : url ,
1761
1754
}
1762
1755
1763
- db_query ( db , 'SELECT repo, num, mergeable FROM mergeable' )
1756
+ db . execute ( 'SELECT repo, num, mergeable FROM mergeable' )
1764
1757
for repo_label , num , mergeable in db .fetchall ():
1765
1758
try :
1766
1759
state = states [repo_label ][num ]
1767
1760
except KeyError :
1768
- db_query (
1769
- db ,
1761
+ db .execute (
1770
1762
'DELETE FROM mergeable WHERE repo = ? AND num = ?' ,
1771
1763
[repo_label , num ])
1772
1764
continue
1773
1765
1774
1766
state .mergeable = bool (mergeable ) if mergeable is not None else None
1775
1767
1776
- db_query ( db , 'SELECT repo FROM pull GROUP BY repo' )
1768
+ db . execute ( 'SELECT repo FROM pull GROUP BY repo' )
1777
1769
for repo_label , in db .fetchall ():
1778
1770
if repo_label not in repos :
1779
- db_query ( db , 'DELETE FROM pull WHERE repo = ?' , [repo_label ])
1771
+ db . execute ( 'DELETE FROM pull WHERE repo = ?' , [repo_label ])
1780
1772
1781
1773
queue_handler_lock = Lock ()
1782
1774
0 commit comments