StreamFinalizer
类私有上下文源码
with StreamFinalizer(
fobj: IO[Any]@StreamFinalizer,
truncate: bool = False,
close: bool = False,
callback_on_exit: Callable[[_IO], None] | None = None
) as _fobj:
_fobj: IO[Any]@StreamFinalizer
当流成功推送时,针对流IO调用的终结器。
ServiceData
透过调用该类、以在提供文件流服务时安全维护文件对象。
参数
参数 | 类型 | 必选 | |
---|---|---|---|
fobj | IO[Any] | 退出终结器环境时,要管理的类文件对象。 | |
truncate | bool | 二值量。启用时,会在离开上下文的时候将文件截断为空。IO流为只读时,不应启用该值。 | |
close | bool | 二值量。启用时,会在离开上下文时关闭文件。 | |
callback_on_exit | Callable[[_IO], None] | None | 离开上下文时调用的回调函数。该函数会在文件被截断或关闭之前调用。 |
范例
离开上下文时,自动关闭文件
- 代码
- 结果
import os
from dash_file_cache.utilities import StreamFinalizer, TempDir
temp_dir = TempDir()
fobj = open(os.path.join(temp_dir.path, "test.txt"), "w")
with StreamFinalizer(fobj, close=True) as _fobj:
_fobj.write("test")
print(fobj.closed)
True
离开上下文时,自动截断文件为空
- 代码
- 结果
import os
from dash_file_cache.utilities import StreamFinalizer, TempDir
temp_dir = TempDir()
fobj = open(os.path.join(temp_dir.path, "test.txt"), "w")
with StreamFinalizer(fobj, truncate=True) as _fobj:
_fobj.write("test")
print(fobj.closed)
if not fobj.closed:
fobj.seek(0, os.SEEK_END)
print(fobj.tell())
fobj.close()
False
0