The Python API is a concise API for using LiberTEM from Python code. It is suitable both for interactive scripting, for example from Jupyter notebooks, and for usage from within a Python application or script.
The libertem.api.Context object is the entry point for most interaction and processing with LiberTEM. It is used to load datasets and to specify and run analyses.
The following snippet initializes a Context with default parameters and backing:

    import libertem.api as lt

    ctx = lt.Context()
See the API documentation for the full capabilities exposed by the Context.
The following is a basic example that loads the API, creates a local cluster, loads a file and runs an analysis. For complete examples on how to use the Python API, please see the Jupyter notebooks in the example directory.
For more details, please see Loading data, Data Set API and format-specific reference. See Sample Datasets for publicly available datasets.
    import sys
    import logging

    # Changed in 0.5.0: The thread count is set dynamically
    # on the workers. No need for setting environment variables anymore.

    import numpy as np
    import matplotlib.pyplot as plt

    from libertem import api

    logging.basicConfig(level=logging.WARNING)

    # Protect the entry point.
    # LiberTEM uses dask, which uses multiprocessing to
    # start worker processes.
    # https://docs.python.org/3/library/multiprocessing.html
    if __name__ == '__main__':
        # api.Context() starts a new local cluster.
        # The "with" clause makes sure we shut it down in the end.
        with api.Context() as ctx:
            try:
                path = sys.argv[1]
                ds = ctx.load(
                    'auto',
                    path=path,
                )
            except IndexError:
                path = ('C:/Users/weber/Nextcloud/Projects/'
                        'Open Pixelated STEM framework/Data/EMPAD/'
                        'scan_11_x256_y256.emd')
                ds = ctx.load(
                    'hdf5',
                    path=path,
                    ds_path='experimental/science_data/data',
                )

            (scan_y, scan_x, detector_y, detector_x) = ds.shape
            mask_shape = (detector_y, detector_x)

            # LiberTEM sends functions that create the masks
            # rather than mask data to the workers in order
            # to reduce transfers in the cluster.
            def mask():
                return np.ones(shape=mask_shape)

            analysis = ctx.create_mask_analysis(dataset=ds, factories=[mask])
            result = ctx.run(analysis, progress=True)

            # Do something useful with the result:
            print(result)
            print(result.mask_0.raw_data)

            # For each mask, one channel is present in the result.
            # This may be different for other analyses.
            # You can access the result channels by their key on
            # the result object:
            plt.figure()
            plt.imshow(result.mask_0.raw_data)
            plt.show()

            # Otherwise, results behave like lists.
            # For example, you can iterate over the result channels:
            raw_result_list = [channel.raw_data for channel in result]
Custom processing routines
To go beyond the included capabilities of LiberTEM, you can implement your own using User-defined functions (UDFs).
For a full reference, please see Python API reference.
New in version 0.9.0: The executor API is internal. Since the choice and parameters of executors are important for integration with Dask and other frameworks, they are now documented. Only the names and creation methods of executors are reasonably stable; the rest of the API is subject to change without notice. For that reason, it is documented in the developer section and not in the API reference.
The default executor is the DaskJobExecutor, which uses the dask.distributed scheduler. To support all LiberTEM features and achieve optimal performance, the methods provided by LiberTEM to start a dask.distributed cluster should be used. However, LiberTEM can also run on an externally created dask.distributed cluster. Please note that clusters that are not created by LiberTEM might use threading or a mixture of threads and processes, and therefore might behave or perform differently from a cluster started by LiberTEM.
The InlineJobExecutor runs all tasks synchronously in the current thread. This is useful for debugging and for special applications, such as running UDFs that perform their own multithreading efficiently, or other non-standard uses that require tasks to be executed sequentially and in order.
See also Threading for more information on multithreading in UDFs.
New in version 0.9.0.
The ConcurrentJobExecutor runs all tasks on concurrent.futures. Using a concurrent.futures.ThreadPoolExecutor, which is the default behaviour, allows sharing large amounts of data as well as other resources between the main thread and the workers efficiently, but is severely slowed down by the Python global interpreter lock under many circumstances. Furthermore, it can create thread safety issues.
It is also possible in principle to use a concurrent.futures.ProcessPoolExecutor as backing for the ConcurrentJobExecutor, though this is untested and is likely to lead to worse performance than the Dask-based executor.
For special applications, the DelayedJobExecutor can use dask.delayed to delay the processing. This is experimental; see Dask integration for more details. It might use threading as well, depending on the Dask scheduler that is used.
Common executor choices
New in version 0.9.0.
libertem.api.Context.make_with() provides a convenient shortcut to start a Context with common executor choices. See its API documentation for the available options.
Connect to a cluster
See Starting a custom cluster on how to start a scheduler and workers.
    from libertem import api
    from libertem.executor.dask import DaskJobExecutor

    # Connect to a Dask.Distributed scheduler at 'tcp://localhost:8786'
    with DaskJobExecutor.connect('tcp://localhost:8786') as executor:
        ctx = api.Context(executor=executor)
        ...
Customize CPUs and CUDA devices
To control how many CPUs and which CUDA devices are used, you can specify them as follows:
    from libertem import api
    from libertem.executor.dask import DaskJobExecutor, cluster_spec
    from libertem.utils.devices import detect

    # Find out what would be used, if you like:
    # returns a dictionary with keys "cpus" and "cudas",
    # each with a list of device ids
    devices = detect()

    # Example: Deactivate CUDA devices by removing them from the device dictionary
    devices['cudas'] = []

    # Example: Deactivate CuPy integration
    devices['has_cupy'] = False

    # Example: Use 3 CPUs. The IDs are ignored at the moment, i.e. no CPU pinning
    devices['cpus'] = range(3)

    # Generate a spec for a Dask.distributed SpecCluster
    # Relevant kwargs match the dictionary entries
    spec = cluster_spec(**devices)

    # Start a local cluster with the custom spec
    with DaskJobExecutor.make_local(spec=spec) as executor:
        ctx = api.Context(executor=executor)
        ...
Please see Dask.Distributed for a reference of the Dask-based executor.
New in version 0.10.0.
For live data processing using LiberTEM-live, the PipelinedExecutor provides a multiprocessing executor that routes the live data source in a round-robin fashion to worker processes. This is important to support processing that cannot keep up with the detector speed on a single CPU core. This executor also works for offline data sets in principle, but is not optimized for that use case.
Similar to the Dask-based executor, it is possible to customize the devices used for computation:
    from libertem import api
    from libertem.executor.pipelined import PipelinedExecutor
    from libertem.utils.devices import detect

    spec = PipelinedExecutor.make_spec(cpus=[0, 1, 2], cudas=[])
    executor = PipelinedExecutor(
        spec=spec,
        # set to True to keep worker processes pinned
        # to specific CPU cores or CPUs
        pin_workers=False,
    )
    ...
    executor.close()

    # you can also use the `detect` function as above:
    spec2 = PipelinedExecutor.make_spec(**detect())
Please see Pipelined executor for a reference of the pipelined executor, and the LiberTEM-live documentation for details on live processing.