跳到主要内容

StreamFinalizer

私有上下文源码

with StreamFinalizer(
fobj: IO[Any]@StreamFinalizer,
truncate: bool = False,
close: bool = False,
callback_on_exit: Callable[[_IO], None] | None = None
) as _fobj:
_fobj: IO[Any]@StreamFinalizer

当流成功推送时,针对流IO调用的终结器。

ServiceData透过调用该类、以在提供文件流服务时安全维护文件对象。

参数

参数类型必选
说明
fobjIO[Any]退出终结器环境时,要管理的类文件对象。
truncatebool二值量。启用时,会在离开上下文的时候将文件截断为空。IO流为只读时,不应启用该值。
closebool二值量。启用时,会在离开上下文时关闭文件。
callback_on_exitCallable[[_IO], None] | None离开上下文时调用的回调函数。该函数会在文件被截断或关闭之前调用。

范例

离开上下文时,自动关闭文件

import os
from dash_file_cache.utilities import StreamFinalizer, TempDir


temp_dir = TempDir()
fobj = open(os.path.join(temp_dir.path, "test.txt"), "w")
with StreamFinalizer(fobj, close=True) as _fobj:
_fobj.write("test")

print(fobj.closed)

离开上下文时,自动截断文件为空

import os
from dash_file_cache.utilities import StreamFinalizer, TempDir


temp_dir = TempDir()
fobj = open(os.path.join(temp_dir.path, "test.txt"), "w")
with StreamFinalizer(fobj, truncate=True) as _fobj:
_fobj.write("test")

print(fobj.closed)
if not fobj.closed:
fobj.seek(0, os.SEEK_END)
print(fobj.tell())
fobj.close()