Source code for libertem.io.dataset.base.backend_buffered

import os
import io
from typing import Type

import numpy as np
import numba
from numba.typed import Dict
import contextlib

from libertem.common.math import prod
from libertem.io.dataset.base.backend import IOBackend, IOBackendImpl
from libertem.io.dataset.base.fileset import FileSet
from libertem.common import Shape, Slice
from libertem.common.buffers import BufferPool, ManagedBuffer
from libertem.common.numba import cached_njit
from .tiling import DataTile
from .decode import DtypeConversionDecoder

_r_n_d_cache = {}


def _make_buffered_reader_and_decoder(decode):
    """
    decode: from buffers, in bytes, possibly interpreted as native_dtype, to out_decoded.dtype
    """
    @cached_njit(boundscheck=False, nogil=True)
    def _buffered_tilereader(outer_idx, buffers, sig_dims, tile_read_ranges,
                           out_decoded, native_dtype, do_zero,
                           origin, shape, ds_shape, offsets):
        if do_zero:
            out_decoded[:] = 0
        for rr_idx in range(tile_read_ranges.shape[0]):
            rr = tile_read_ranges[rr_idx]
            if rr[1] == rr[2] == 0:
                break
            buf = buffers[rr[0]]
            offset = offsets[rr[0]]  # per-file offset of start of read buffer
            decode(
                inp=buf[rr[1] - offset:rr[2] - offset],
                out=out_decoded,
                idx=rr_idx,
                native_dtype=native_dtype,
                rr=rr,
                origin=origin,
                shape=shape,
                ds_shape=ds_shape,
            )
        return out_decoded
    return _buffered_tilereader


@numba.njit(cache=True, nogil=True)
def block_get_min_fill_factor(rrs):
    """
    Try to find out how sparse the given read ranges are, per file.

    Returns the smallest fill factor and maximum required buffer size.
    """
    # FIXME: replace with fixed arrays? If this fn stands out on a profile...
    min_per_file = {}
    max_per_file = {}
    payload_bytes = {}
    for tile_idx in range(rrs.shape[0]):
        for part_idx in range(rrs.shape[1]):
            part_rr = rrs[tile_idx, part_idx]
            fileno = part_rr[0]

            if part_rr[2] == part_rr[1] == 0:
                continue

            if fileno not in min_per_file:
                min_per_file[fileno] = part_rr[1]
            else:
                min_per_file[fileno] = min(part_rr[1], min_per_file[fileno])

            if fileno not in max_per_file:
                max_per_file[fileno] = part_rr[2]
            else:
                max_per_file[fileno] = max(part_rr[2], max_per_file[fileno])

            if fileno not in payload_bytes:
                payload_bytes[fileno] = 0
            payload_bytes[fileno] += part_rr[2] - part_rr[1]

    min_fill_factor = -1
    max_buffer_size = 0

    for k in min_per_file:
        read_size = max_per_file[k] - min_per_file[k]
        fill_factor = payload_bytes[k] / (read_size)
        # print(k, fill_factor, max_per_file[k], min_per_file[k], payload_bytes[k])
        if fill_factor < min_fill_factor or min_fill_factor == -1:
            min_fill_factor = fill_factor
        if read_size > max_buffer_size:
            max_buffer_size = read_size
    return min_fill_factor, max_buffer_size, min_per_file, max_per_file


class BufferedFile:
    def __init__(self, path, desc):
        self.path = path
        self.desc = desc
        self._handle = None
        self._arr = None

    def get_blocksize(self):
        """
        Return the block size to which reads should be aligned to. In case of normal
        buffered I/O, this can be 1
        """
        return 1

    def open(self):
        # disable internal I/O buffering, as we want to read directly
        # into our own buffer here:
        self._handle = open(self.path, "rb", buffering=0)
        return self

    def close(self):
        self._handle.close()
        self._handle = None

    @property
    def handle(self):
        if self._handle is None:
            raise RuntimeError("trying to access file handle of closed file")
        return self._handle

    def seek(self, offset):
        self._handle.seek(offset)

    def readinto(self, buf):
        BLOCKSIZE = self.get_blocksize()
        buf_orig = buf
        buf = memoryview(buf)
        to_read = len(buf)
        offset = 0
        last_to_read = to_read
        # `readinto` may return early, so we may need to re-run it:
        while to_read > 0:
            # Make sure reads are aligned to the blocksize,
            # after an early return of `readinto` to allow Direct I/O on Windows.
            # On Linux, a misaligned read which turns out to be at the end of the
            # file doesn't cause an error
            blockcount, remainder = divmod(offset, BLOCKSIZE)
            to_read += remainder
            offset = blockcount * BLOCKSIZE
            if remainder > 0:
                self._handle.seek(-remainder, io.SEEK_CUR)
            bytes_read = self._handle.readinto(buf[offset:])
            if bytes_read == 0:
                break
            to_read -= bytes_read
            offset += bytes_read
            # The block aligning code would otherwise
            # try to read a "tail" in an infinite loop if the file size
            # is not a multiple of BLOCKSIZE
            if to_read == last_to_read:
                break
            last_to_read = to_read
        return buf_orig[:offset]


class DirectBufferedFile(BufferedFile):
    def get_blocksize(self):
        """
        Return the block size to which reads should be aligned to. On Linux,
        this should be a multiple of the logical block size of the file system.
        """
        # NOTE: if required, this can be replaced with the appropriate syscall
        # to determine the block size. On Linux, this would be BLKBSZGET.
        return 4096

    def open(self):
        # disable internal I/O buffering, as we want to read directly
        # into our own buffer here:
        if hasattr(os, 'O_DIRECT'):
            self._fh = os.open(self.path, os.O_RDONLY | os.O_DIRECT)
            self._handle = open(self._fh, "rb", buffering=0)
        elif os.name == 'nt':
            import win32file
            import msvcrt
            # Make sure to keep a reference, otherwise
            # it will be closed
            self._fh = win32file.CreateFile(
                self.path,  # fileName
                win32file.GENERIC_READ,  # desiredAccess
                win32file.FILE_SHARE_READ
                | win32file.FILE_SHARE_WRITE
                | win32file.FILE_SHARE_DELETE,  # shareMode
                None,  # attributes
                win32file.OPEN_EXISTING,  # CreationDisposition
                # O_DIRECT equivalent (?)
                win32file.FILE_FLAG_NO_BUFFERING,  # flagsAndAttributes
                0,  # hTemplateFile
            )
            fd = msvcrt.open_osfhandle(int(self._fh), os.O_RDONLY)
            self._handle = os.fdopen(fd, "rb", buffering=0)
        else:
            raise RuntimeError("Direct I/O not supported on this platform.")
        return self

    def close(self):
        super().close()
        self._fh = None


[docs]class BufferedBackend(IOBackend, id_="buffered"): """ I/O backend using a buffered reading strategy. Useful for slower media like HDDs, where seeks cause performance drops. Used by default on Windows. This does not perform optimally on SSDs under all circumstances, for better best-case performance, try using :class:`~libertem.io.dataset.base.MMapBackend` instead. Parameters ---------- max_buffer_size : int Maximum buffer size, in bytes. This is passed to the tileshape negotiation to select the right depth. """ def __init__(self, max_buffer_size=16*1024*1024): self._max_buffer_size = max_buffer_size
[docs] @classmethod def from_json(cls, msg): """ Construct an instance from the already-decoded `msg`. """ raise NotImplementedError("TODO! implement me!")
[docs] def get_impl(self): return BufferedBackendImpl( max_buffer_size=self._max_buffer_size, )
class BufferedBackendImpl(IOBackendImpl): def __init__(self, max_buffer_size, direct_io=False): super().__init__() self._max_buffer_size = max_buffer_size self._direct_io = direct_io self._buffer_pool = BufferPool() @contextlib.contextmanager def open_files(self, fileset: FileSet): cls: Type[BufferedFile] if self._direct_io: cls = DirectBufferedFile else: cls = BufferedFile files = [ cls(path=f.path, desc=f).open() for f in fileset ] yield files for f in files: f.close() def need_copy( self, decoder, roi, native_dtype, read_dtype, tiling_scheme=None, fileset=None, sync_offset=0, corrections=None, ): return True # we always copy in this backend def get_read_and_decode(self, decode): key = (decode, "read") if key in _r_n_d_cache: return _r_n_d_cache[key] r_n_d = _make_buffered_reader_and_decoder(decode=decode) _r_n_d_cache[key] = r_n_d return r_n_d def get_max_io_size(self): return self._max_buffer_size def _get_tiles_by_block( self, tiling_scheme, open_files, read_ranges, read_dtype, native_dtype, decoder=None, corrections=None, sync_offset=0, ): if decoder is None: decoder = DtypeConversionDecoder() decode = decoder.get_decode( native_dtype=np.dtype(native_dtype), read_dtype=np.dtype(read_dtype), ) r_n_d = self._r_n_d = self.get_read_and_decode(decode) native_dtype = decoder.get_native_dtype(native_dtype, read_dtype) sig_dims = tiling_scheme.shape.sig.dims ds_shape = np.array(tiling_scheme.dataset_shape) largest_slice = sorted(( (prod(s_.shape), s_) for _, s_ in tiling_scheme.slices ), key=lambda x: x[0], reverse=True)[0][1] buf_shape = (tiling_scheme.depth,) + tuple(largest_slice.shape) need_clear = decoder.do_clear() slices = read_ranges[0] # Use NumPy prod for multidimensional array and axis parameter shape_prods = np.prod(slices[..., 1, :], axis=1, dtype=np.int64) ranges = read_ranges[1] scheme_indices = read_ranges[2] tile_block_size = len(tiling_scheme) with self._buffer_pool.empty(buf_shape, dtype=read_dtype) as out_decoded: out_decoded = out_decoded.reshape((-1,)) for block_idx in range(0, slices.shape[0], tile_block_size): block_ranges = ranges[block_idx:block_idx + tile_block_size] fill_factor, req_buf_size, min_per_file, max_per_file = block_get_min_fill_factor( block_ranges ) # TODO: if it makes sense, implement sparse variant # if req_buf_size > self._max_buffer_size or fill_factor < self._sparse_threshold: yield from self._read_block_dense( block_idx, tile_block_size, min_per_file, max_per_file, open_files, slices, ranges, scheme_indices, shape_prods, out_decoded, r_n_d, sig_dims, ds_shape, need_clear, native_dtype, corrections, ) def _read_block_dense( self, block_idx, tile_block_size, min_per_file, max_per_file, open_files, slices, ranges, scheme_indices, shape_prods, out_decoded, r_n_d, sig_dims, ds_shape, need_clear, native_dtype, corrections, ): """ Reads a block of tiles, starting at `block_idx`, having a size of `tile_block_size` read range entries. """ # phase 1: read buffers = Dict() # this list manages the lifetime of the ManagedBuffer instances; # after `buf_ref` goes out of scope, the buffers are returned to # the buffer pool, so make sure that this matches with the usage # of the buffers! buf_ref = [] for fileno in min_per_file.keys(): fh = open_files[fileno] # add align_to to allow for alignment cut: align_to = fh.get_blocksize() read_size = max_per_file[fileno] - min_per_file[fileno] + align_to # ManagedBuffer gives us memory in 4k blocks, so the size is 4k aligned mb = ManagedBuffer(self._buffer_pool, read_size, alignment=fh.get_blocksize()) arr = np.frombuffer(mb.buf, dtype=np.uint8) buf_ref.append(mb) seek_pos = min_per_file[fileno] alignment = 0 # seek_pos needs to be aligned to 4k block size, too: if seek_pos % align_to != 0: alignment = seek_pos % align_to seek_pos = align_to * (seek_pos // align_to) fh.seek(seek_pos) read_result = fh.readinto(arr) # read may be truncated, if the buffer is larger than the file; we # truncate the buffer, too, to make sure we don't use any # uninitialized values. Also cut off `alignment` bytes at the beginning, # which were read to make O_DIRECT happy: buffers[fileno] = read_result[alignment:] # phase 2: decode tiles from the data that was read for idx in range(block_idx, block_idx + tile_block_size): origin = slices[idx, 0] shape = slices[idx, 1] tile_ranges = ranges[idx] scheme_idx = scheme_indices[idx] out_cut = out_decoded[:shape_prods[idx]].reshape((shape[0], -1)) data = r_n_d( idx, buffers, sig_dims, tile_ranges, out_cut, native_dtype, do_zero=need_clear, origin=origin, shape=shape, ds_shape=ds_shape, offsets=min_per_file, ) tile_slice = Slice( origin=origin, shape=Shape(shape, sig_dims=sig_dims) ) data = data.reshape(shape) self.preprocess(data, tile_slice, corrections) yield DataTile( data, tile_slice=tile_slice, scheme_idx=scheme_idx, ) def get_tiles( self, tiling_scheme, fileset, read_ranges, roi, native_dtype, read_dtype, decoder, sync_offset, corrections, ): with self.open_files(fileset) as open_files: yield from self._get_tiles_by_block( tiling_scheme=tiling_scheme, open_files=open_files, read_ranges=read_ranges, read_dtype=read_dtype, native_dtype=native_dtype, decoder=decoder, corrections=corrections, sync_offset=sync_offset, )