Skip to content

Commit c653905

Browse files
main loop: add a "log_tracemalloc" plugin to track memory usage
1 parent bf39edf commit c653905

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
2+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
"""Profile Cylc with tracemalloc.
18+
19+
This takes tracemalloc snapshots periodically.
20+
21+
Snapshots are written into "~/cylc-run/<workflow>/tracemalloc/", to load them
22+
for analysis, run:
23+
24+
tracemalloc.Snapshot.load('.../path/to/x.tracemalloc')
25+
26+
The memory diffs are written to the "log" file in the same directory.
27+
"""
28+
29+
from pathlib import Path
30+
import tracemalloc
31+
32+
from cylc.flow import LOG
33+
from cylc.flow.main_loop import periodic, shutdown, startup
34+
35+
36+
@startup
async def init(scheduler, state):
    """Start tracemalloc and initialise the plugin state on startup.

    Args:
        scheduler:
            Only its "workflow_run_dir" attribute is read, the output
            directory ("tracemalloc") is created beneath it.
        state:
            The plugin state dictionary, the following keys are set:

            out_dir:
                The directory snapshots and the log are written into.
            log:
                The open log file which the memory diffs are written to.
            itt:
                The number of snapshots taken so far (starts at zero).

    """
    tracemalloc.start()
    out_dir = Path(scheduler.workflow_run_dir, 'tracemalloc')
    # the directory may be left over from a previous run (e.g. a restart),
    # a bare mkdir() would raise FileExistsError in that case
    out_dir.mkdir(exist_ok=True)
    logfile = out_dir / 'log'
    state['out_dir'] = out_dir
    state['log'] = logfile.open('w+')
    state['itt'] = 0
    LOG.warning(f'Writing tracemalloc output to {logfile}')
46+
47+
48+
@periodic
async def take_snapshot(scheduler, state, diff_filter='cylc/', max_length=20):
    """Dump a tracemalloc snapshot and log the changes since the last one.

    Args:
        scheduler:
            Unused in this plugin.
        state:
            The state object initialised in "init".
        diff_filter:
            If supplied, only changes whose string representation contains
            this string are reported.
            Used to restrict reporting to items which contain Cylc file paths.
        max_length:
            At most the top "max_length" changes (by size difference) are
            considered for each summary.

    """
    snapshot = tracemalloc.take_snapshot()

    # write the snapshot to disk so it can be loaded for later analysis
    snapshot.dump(state['out_dir'] / f'{state["itt"]}.tracemalloc')

    # there is nothing to compare against on the first iteration
    previous = state.get('prev')
    if previous:
        log = state['log']

        # collect the per-line changes for the libraries of interest
        changes = []
        for stat in snapshot.compare_to(previous, 'lineno'):
            if not diff_filter or diff_filter in str(stat):
                changes.append(stat)

        # overall memory change across the selected items
        total = sum(stat.size_diff for stat in changes)
        print('+/-', total, file=log)

        # individual changes, largest increase first
        changes.sort(key=lambda stat: stat.size_diff, reverse=True)
        for stat in changes[:max_length]:
            if stat.size_diff != 0:
                print(f' {stat}', file=log)
        print('', file=log)

    state['prev'] = snapshot
    state['itt'] += 1
    state['log'].flush()
94+
95+
96+
@shutdown
async def close_log(scheduler, state):
    """Close the log file and stop tracemalloc on shutdown.

    Args:
        scheduler:
            Unused in this plugin.
        state:
            The state object initialised in "init".

    """
    state['log'].close()
    # "init" started tracemalloc, stop it here so that tracing (and its
    # overhead) does not outlive the plugin
    tracemalloc.stop()

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ cylc.main_loop =
198198
log_main_loop = cylc.flow.main_loop.log_main_loop [main_loop-log_main_loop]
199199
log_memory = cylc.flow.main_loop.log_memory [main_loop-log_memory]
200200
reset_bad_hosts = cylc.flow.main_loop.reset_bad_hosts
201+
log_tracemalloc = cylc.flow.main_loop.log_tracemalloc
201202
# NOTE: all entry points should be listed here even if Cylc Flow does not
202203
# provide any implementations, to make entry point scraping easier
203204
cylc.pre_configure =
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
2+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
from types import SimpleNamespace
18+
from cylc.flow.main_loop.log_tracemalloc import take_snapshot, init, close_log
19+
20+
import pytest
21+
22+
23+
# Find the (1-based) number of the first line in this file which ends with
# the marker comment, tracemalloc reports allocations as "<file>:<lineno>"
# so the test needs this number to locate its allocation in the log.
MARKER_LINE = None
with open(__file__, 'r') as this_file:
    for line_number, line in enumerate(this_file, start=1):
        if line.endswith('MARKER\n'):
            MARKER_LINE = line_number
            break
32+
33+
34+
@pytest.fixture
async def state(tmp_path):
    """Provide a freshly initialised state object for this plugin.

    The plugin's "init" function is run against a stand-in scheduler whose
    run directory is the test's temporary directory.
    """
    plugin_state = {}
    fake_scheduler = SimpleNamespace(workflow_run_dir=tmp_path)
    await init(fake_scheduler, plugin_state)
    return plugin_state
40+
41+
42+
async def test_tracemalloc(tmp_path, state):
    """Test the tracemalloc plugin functionality.

    Takes two snapshots with a known allocation made in-between them, then
    checks that the allocation is reported in the plugin's log file.
    """
    out_dir = tmp_path / 'tracemalloc'

    # test the empty state object
    assert state['itt'] == 0
    assert len(list(out_dir.iterdir())) == 1  # only the log file so far
    assert state['log'].closed is False  # the log file is open

    # take a snapshot
    await take_snapshot(None, state, diff_filter=None)
    assert state['itt'] == 1, 'the iteration has been incremented'
    assert len(list(out_dir.iterdir())) == 2, 'dump file has been written'

    # allocate some memory
    # NOTE: the trailing comment on the next line is searched for at import
    # time to determine MARKER_LINE, no earlier line may end with that word
    _memory = [x for x in range(100)]  # MARKER

    # take another snapshot
    await take_snapshot(None, state, diff_filter=None)
    assert state['itt'] == 2, 'the iteration has been incremented'
    assert len(list(out_dir.iterdir())) == 3, 'dump file has been written'

    # close the log file
    await close_log(None, state)
    assert state['log'].closed is True, 'log file has been closed'

    # ensure the allocated memory appears in the log file
    # (diffs are reported per line as "<file>:<lineno>")
    with open(out_dir / 'log', 'r') as tracemalloc_file:
        tracemalloc_log = tracemalloc_file.read()
    assert f'{__file__}:{MARKER_LINE}' in tracemalloc_log

    del _memory  # make linters happy

0 commit comments

Comments
 (0)