1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
import unittest
import sys
import gc
import time
import weakref
import threading
import greenlet
class TestLeaks(unittest.TestCase):
    """Regression tests checking that greenlet switching and thread
    teardown do not leave stray references behind."""

    def test_arg_refs(self):
        # Passing positional arguments through a switch (into the child
        # and back to the parent) must not change the refcount of the
        # argument tuple.
        args = ('a', 'b', 'c')
        refcount_before = sys.getrefcount(args)
        # pylint:disable=unnecessary-lambda
        g = greenlet.greenlet(
            lambda *args: greenlet.getcurrent().parent.switch(*args))
        for _ in range(100):
            g.switch(*args)
        self.assertEqual(sys.getrefcount(args), refcount_before)

    def test_kwarg_refs(self):
        # Same check as test_arg_refs, but for keyword arguments.
        kwargs = {}
        # Measure the baseline instead of hard-coding it: the absolute
        # value reported by sys.getrefcount() is an interpreter
        # implementation detail, only the before/after delta matters.
        refcount_before = sys.getrefcount(kwargs)
        # pylint:disable=unnecessary-lambda
        g = greenlet.greenlet(
            lambda **kwargs: greenlet.getcurrent().parent.switch(**kwargs))
        for _ in range(100):
            g.switch(**kwargs)
        self.assertEqual(sys.getrefcount(kwargs), refcount_before)

    assert greenlet.GREENLET_USE_GC # Option to disable this was removed in 1.0

    def recycle_threads(self):
        """Start and join a short-lived sleeping thread (helper, not a test)."""
        # By introducing a thread that does sleep we allow other threads,
        # that have triggered their __block condition, but did not have a
        # chance to deallocate their thread state yet, to finally do so.
        # The way it works is by requiring a GIL switch (different thread),
        # which does a GIL release (sleep), which might do a GIL switch
        # to finished threads and allow them to clean up.
        def worker():
            time.sleep(0.001)
        t = threading.Thread(target=worker)
        t.start()
        time.sleep(0.001)
        t.join()

    def test_threaded_leak(self):
        # Weak references to the main greenlet of each finished thread;
        # all of them must be dead by the end of the test.
        gg = []
        def worker():
            # only main greenlet present
            gg.append(weakref.ref(greenlet.getcurrent()))
        for _ in range(2):
            t = threading.Thread(target=worker)
            t.start()
            t.join()
            del t
        greenlet.getcurrent() # update ts_current
        self.recycle_threads()
        greenlet.getcurrent() # update ts_current
        gc.collect()
        greenlet.getcurrent() # update ts_current
        for g in gg:
            self.assertIsNone(g())

    def test_threaded_adv_leak(self):
        # Like test_threaded_leak, but each thread's main greenlet also
        # carries a list (attribute ``ll``) that the finished child
        # greenlets appended themselves to.
        gg = []
        def worker():
            # main and additional *finished* greenlets
            ll = greenlet.getcurrent().ll = []
            def additional():
                ll.append(greenlet.getcurrent())
            for _ in range(2):
                greenlet.greenlet(additional).switch()
            gg.append(weakref.ref(greenlet.getcurrent()))
        for _ in range(2):
            t = threading.Thread(target=worker)
            t.start()
            t.join()
            del t
        greenlet.getcurrent() # update ts_current
        self.recycle_threads()
        greenlet.getcurrent() # update ts_current
        gc.collect()
        greenlet.getcurrent() # update ts_current
        for g in gg:
            self.assertIsNone(g())

    def test_issue251_killing_cross_thread_leaks_list(self, manually_collect_background=True):
        # See https://github.com/python-greenlet/greenlet/issues/251
        # Killing a greenlet (probably not the main one)
        # in one thread from another thread would
        # result in leaking a list (the ts_delkey list).
        #
        # ``manually_collect_background`` controls whether the background
        # thread calls a greenlet API right before it exits.

        # For the test to be valid, even empty lists have to be tracked by the
        # GC. Use a unittest assertion so the precondition is still checked
        # when Python runs with -O.
        self.assertTrue(gc.is_tracked([]))

        def count_objects(kind=list):
            # Return how many GC-tracked objects are exactly of type
            # ``kind`` (subclasses are deliberately not counted).
            # pylint:disable=unidiomatic-typecheck
            # Collect the garbage.
            for _ in range(3):
                gc.collect()
            gc.collect()
            return sum(
                1
                for x in gc.get_objects()
                if type(x) is kind
            )

        # XXX: The main greenlet of a dead thread is only released
        # when one of the proper greenlet APIs is used from a different
        # running thread. See #252 (https://github.com/python-greenlet/greenlet/issues/252)
        greenlet.getcurrent()
        greenlets_before = count_objects(greenlet.greenlet)

        background_glet_running = threading.Event()
        background_glet_killed = threading.Event()
        background_greenlets = []

        def background_greenlet():
            # Throw control back to the main greenlet.
            greenlet.getcurrent().parent.switch()

        def background_thread():
            glet = greenlet.greenlet(background_greenlet)
            background_greenlets.append(glet)
            glet.switch() # Be sure it's active.
            # Control is ours again.
            del glet # Delete one reference from the thread it runs in.
            background_glet_running.set()
            background_glet_killed.wait()
            # To trigger the background collection of the dead
            # greenlet, thus clearing out the contents of the list, we
            # need to run some APIs. See issue 252.
            if manually_collect_background:
                greenlet.getcurrent()

        t = threading.Thread(target=background_thread)
        t.start()
        background_glet_running.wait()

        lists_before = count_objects()

        self.assertEqual(len(background_greenlets), 1)
        self.assertFalse(background_greenlets[0].dead)
        # Delete the last reference to the background greenlet
        # from a different thread. This puts it in the background thread's
        # ts_delkey list.
        del background_greenlets[:]
        background_glet_killed.set()

        # Now wait for the background thread to die.
        t.join(10)
        # join() with a timeout returns silently when the timeout expires,
        # so verify the thread really finished before counting objects.
        self.assertFalse(t.is_alive(), "background thread did not exit")
        del t

        # Free the background main greenlet by forcing greenlet to notice a difference.
        greenlet.getcurrent()
        greenlets_after = count_objects(greenlet.greenlet)
        lists_after = count_objects()

        # On 2.7, we observe that lists_after is smaller than
        # lists_before. No idea what lists got cleaned up. All the
        # Python 3 versions match exactly.
        self.assertLessEqual(lists_after, lists_before)
        self.assertEqual(greenlets_before, greenlets_after)

    @unittest.expectedFailure
    def test_issue251_issue252_need_to_collect_in_background(self):
        # This still fails because the leak of the list
        # still exists when we don't call a greenlet API before exiting the
        # thread. The proximate cause is that neither of the two greenlets
        # from the background thread are actually being destroyed, even though
        # the GC is in fact visiting both objects.
        # It's not clear where that leak is? For some reason the thread-local dict
        # holding it isn't being cleaned up.
        self.test_issue251_killing_cross_thread_leaks_list(manually_collect_background=False)
|