Open
Description
In my application I would like to process specific files once they have been newly created. However, when I react to 'added' events, the processing often starts too early, because the file is still being written slowly (satellite transmissions in my case). Ideally, processing should be triggered when the file is closed, but as far as I know watchfiles does not support this. The interval between successive writes is undefined, so I cannot rely on waiting a fixed time after the last change event. For now I work around it by calling lsof on the file to check whether any handles are still open, but this approach is not desirable. Ideally I would trigger my logic on something like Change.closed.
The code below simulates the problem:
import asyncio
import os
import tempfile

from watchfiles import awatch, Change

FN = 'test.txt'


async def slow_write(directory_name):
    # Wait before opening so the monitor is running and detects the 'added' event.
    await asyncio.sleep(1)
    path = os.path.join(directory_name, FN)
    with open(path, 'w') as fp:
        for _ in range(10):
            fp.write("Hello world.\n")
            await asyncio.sleep(0.5)  # simulate slow writing process.
    await asyncio.sleep(1)  # give monitor time to process.


async def monitor(directory_name):
    async for changes in awatch(directory_name):
        for change, path in changes:
            print(change, path)
            if change == Change.added:
                print(f"Want to process {path}, but file is still being written to.")


async def run():
    with tempfile.TemporaryDirectory() as tmpdirname:
        write_task = asyncio.create_task(slow_write(tmpdirname))
        monitor_task = asyncio.create_task(monitor(tmpdirname))
        fs = {write_task, monitor_task}
        done, pending = await asyncio.wait(fs, return_when=asyncio.FIRST_COMPLETED)
        for _t in pending:
            _t.cancel()
        await asyncio.sleep(0.1)


asyncio.run(run())
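For reference, the lsof workaround mentioned in the description could look roughly like the sketch below. This is not part of watchfiles: `lsof_pids`, `is_file_open` and `wait_until_closed` are hypothetical helper names, and the sketch assumes `lsof` is installed and that `lsof -t <path>` prints one PID per line for each process holding the file open. It also only sees processes that the current user is allowed to inspect, and it is inherently racy.

```python
import asyncio
import subprocess
from typing import Callable


def lsof_pids(path: str) -> str:
    """Return the output of `lsof -t <path>` (assumed: one PID per line)."""
    result = subprocess.run(
        ["lsof", "-t", path], capture_output=True, text=True, check=False
    )
    return result.stdout


def is_file_open(path: str, list_pids: Callable[[str], str] = lsof_pids) -> bool:
    """True if at least one process is reported as holding `path` open."""
    return any(line.strip().isdigit() for line in list_pids(path).splitlines())


async def wait_until_closed(
    path: str,
    poll_interval: float = 0.5,
    is_open: Callable[[str], bool] = is_file_open,
) -> None:
    """Poll until no process is reported as holding `path` open.

    `is_open` is called synchronously, so a slow lsof call briefly
    blocks the event loop between polls.
    """
    while is_open(path):
        await asyncio.sleep(poll_interval)
```

In the `monitor` coroutine, the `Change.added` branch would then `await wait_until_closed(path)` before processing the file. Polling like this is exactly the overhead a close event would avoid.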