Python API
The Python API is a concise API for using LiberTEM from Python code. It is suitable both for interactive scripting, for example from Jupyter notebooks, and for usage from within a Python application or script.
Context
The libertem.api.Context object is the entry point for most interaction and processing with LiberTEM. It is used to load datasets and to specify and run analyses. The following snippet initializes a Context ready for use with default parameters and backing.
import libertem.api as lt
ctx = lt.Context()
See the API documentation for the full capabilities exposed by the Context.
Basic example
This basic example imports the API, creates a local cluster, loads a file and runs an analysis. For complete examples of how to use the Python API, please see the Jupyter notebooks in the example directory.
For more details, please see Loading data, Data Set API and format-specific reference. See Sample Datasets for publicly available datasets.
import sys
import logging

# Changed in 0.5.0: The thread count is set dynamically
# on the workers. No need to set environment variables anymore.
import numpy as np
import matplotlib.pyplot as plt

from libertem import api

logging.basicConfig(level=logging.WARNING)

# Protect the entry point.
# LiberTEM uses dask, which uses multiprocessing to
# start worker processes.
# https://docs.python.org/3/library/multiprocessing.html
if __name__ == '__main__':
    # api.Context() starts a new local cluster.
    # The "with" clause makes sure we shut it down in the end.
    with api.Context() as ctx:
        try:
            path = sys.argv[1]
            ds = ctx.load(
                'auto',
                path=path,
            )
        except IndexError:
            path = ('C:/Users/weber/Nextcloud/Projects/'
                    'Open Pixelated STEM framework/Data/EMPAD/'
                    'scan_11_x256_y256.emd')
            ds = ctx.load(
                'hdf5',
                path=path,
                ds_path='experimental/science_data/data',
            )

        (scan_y, scan_x, detector_y, detector_x) = ds.shape

        mask_shape = (detector_y, detector_x)

        # LiberTEM sends the functions that create the masks,
        # rather than the mask data itself, to the workers in
        # order to reduce transfers in the cluster.
        def mask():
            return np.ones(shape=mask_shape)

        analysis = ctx.create_mask_analysis(dataset=ds, factories=[mask])

        result = ctx.run(analysis, progress=True)

        # Do something useful with the result:
        print(result)
        print(result.mask_0.raw_data)

        # For each mask, one channel is present in the result.
        # This may be different for other analyses.
        # You can access the result channels by their key on
        # the result object:
        plt.figure()
        plt.imshow(result.mask_0.raw_data)
        plt.show()

        # Otherwise, results behave like lists.
        # For example, you can iterate over the result channels:
        raw_result_list = [channel.raw_data for channel in result]
Custom processing routines
To go beyond the included capabilities of LiberTEM, you can implement your own using User-defined functions (UDFs).
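As a minimal sketch of what a UDF looks like, the following sums all detector frames. The class name SumAllUDF is illustrative (LiberTEM also ships a ready-made sum UDF), and a Context ctx and dataset ds as in the basic example above are assumed:
import numpy as np
from libertem.udf import UDF

class SumAllUDF(UDF):
    def get_result_buffers(self):
        # One result buffer with the shape of the detector ("sig"):
        return {'intensity': self.buffer(kind='sig', dtype=np.float32)}

    def process_frame(self, frame):
        # Called for each detector frame on the workers:
        self.results.intensity[:] += frame

    def merge(self, dest, src):
        # Combine partial results from different partitions:
        dest.intensity[:] += src.intensity

res = ctx.run_udf(dataset=ds, udf=SumAllUDF())
print(res['intensity'].data.shape)  # (detector_y, detector_x)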
Reference
For a full reference, please see Python API reference.
Executors
New in version 0.9.0: The executor API is internal. Since the choice and parameters of executors are important for integration with Dask and other frameworks, they are now documented. Only the names and creation methods of executors are reasonably stable. The rest of the API is subject to change without notice; for that reason it is documented in the developer section and not in the API reference.
The default executor is DaskJobExecutor, which uses the dask.distributed scheduler. To support all LiberTEM features and achieve optimal performance, the methods provided by LiberTEM to start a dask.distributed cluster should be used. However, LiberTEM can also run on a “vanilla” dask.distributed cluster. Please note that dask.distributed clusters that are not created by LiberTEM might use threading or a mixture of threads and processes, and therefore might behave or perform differently from a LiberTEM-instantiated cluster.
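As a hedged sketch of running on such a vanilla cluster, assuming a dask.distributed scheduler started outside of LiberTEM is already reachable at the given address, an existing Client can be wrapped in a DaskJobExecutor:
from dask import distributed as dd
from libertem import api
from libertem.executor.dask import DaskJobExecutor

# Assumption: a scheduler that was started outside of LiberTEM
# is listening at this address.
client = dd.Client('tcp://localhost:8786')
ctx = api.Context(executor=DaskJobExecutor(client=client))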
The InlineJobExecutor runs all tasks synchronously in the current thread. This is useful for debugging and for special applications, such as running UDFs that perform their own multithreading efficiently, or for other non-standard use that requires tasks to be executed sequentially and in order.
See also Threading for more information on multithreading in UDFs.
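As a short sketch, a Context backed by the inline executor for a debugging session:
from libertem import api
from libertem.executor.inline import InlineJobExecutor

# All tasks run synchronously in the current thread, so
# breakpoints and debuggers work as expected.
ctx = api.Context(executor=InlineJobExecutor())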
New in version 0.9.0.
The ConcurrentJobExecutor runs all tasks using concurrent.futures. Using a concurrent.futures.ThreadPoolExecutor, which is the default behaviour, allows sharing large amounts of data as well as other resources between the main thread and workers efficiently, but is severely slowed down by the Python global interpreter lock under many circumstances. Furthermore, it can create thread safety issues such as https://github.com/LiberTEM/LiberTEM-blobfinder/issues/35.
It is also possible, in principle, to use a concurrent.futures.ProcessPoolExecutor as backing for the ConcurrentJobExecutor, though this is untested and likely to lead to worse performance than the LiberTEM default DaskJobExecutor.
For special applications, the DelayedJobExecutor can use dask.delayed to delay the processing. This is experimental; see Dask integration for more details. It might use threading as well, depending on the Dask scheduler that is used by compute().
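A minimal sketch of this experimental executor; note that results only materialize once compute() is called on them:
from libertem import api
from libertem.executor.delayed import DelayedJobExecutor

ctx = api.Context(executor=DelayedJobExecutor())
# Load a dataset and run a UDF as usual; the result buffers are
# then backed by dask arrays and only computed on demand, e.g.
# (assuming a dataset ds and a UDF udf):
# res = ctx.run_udf(dataset=ds, udf=udf)
# intensity = res['intensity'].raw_data.compute()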
Common executor choices
New in version 0.9.0.
libertem.api.Context.make_with() provides a convenient shortcut to start a Context with common executor choices. See the API documentation for available options!
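For instance (the exact set of recognized names is version-dependent; 'inline', 'threads' and 'dask-make-default' are among the documented choices):
import libertem.api as lt

# Inline executor, e.g. for debugging:
ctx_inline = lt.Context.make_with('inline')

# Thread-based ConcurrentJobExecutor:
ctx_threads = lt.Context.make_with('threads')

# Default Dask executor, also registered as default
# scheduler for other dask.distributed computations:
ctx_dask = lt.Context.make_with('dask-make-default')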
Connect to a cluster
See Starting a custom cluster on how to start a scheduler and workers.
from libertem import api
from libertem.executor.dask import DaskJobExecutor

# Connect to a Dask.Distributed scheduler at 'tcp://localhost:8786'
with DaskJobExecutor.connect('tcp://localhost:8786') as executor:
    ctx = api.Context(executor=executor)
    ...
Customize CPUs and CUDA devices
To control how many CPUs and which CUDA devices are used, you can specify them as follows:
from libertem import api
from libertem.executor.dask import DaskJobExecutor, cluster_spec
from libertem.utils.devices import detect
# Find out what would be used, if you like
# returns dictionary with keys "cpus" and "cudas", each with a list of device ids
devices = detect()
# Example: Deactivate CUDA devices by removing them from the device dictionary
devices['cudas'] = []
# Example: Deactivate CuPy integration
devices['has_cupy'] = False
# Example: Use 3 CPUs. The IDs are ignored at the moment, i.e. no CPU pinning
devices['cpus'] = range(3)
# Generate a spec for a Dask.distributed SpecCluster
# Relevant kwargs match the dictionary entries
spec = cluster_spec(**devices)
# Start a local cluster with the custom spec
with DaskJobExecutor.make_local(spec=spec) as executor:
    ctx = api.Context(executor=executor)
    ...
Please see Dask.Distributed for a reference of the Dask-based executor.
Pipelined executor
New in version 0.10.0.
For live data processing using LiberTEM-live, the PipelinedExecutor provides a multiprocessing executor that routes the live data source in a round-robin fashion to worker processes. This is important to support processing that cannot keep up with the detector speed on a single CPU core. This executor also works for offline data sets in principle, but is not optimized for that use case.
Similar to the Dask-based executor, it is possible to customize the devices used for computation:
from libertem import api
from libertem.executor.pipelined import PipelinedExecutor
from libertem.utils.devices import detect
spec = PipelinedExecutor.make_spec(cpus=[0, 1, 2], cudas=[])
executor = PipelinedExecutor(
    spec=spec,
    # Set to True to keep worker processes pinned to
    # specific CPU cores or CPUs:
    pin_workers=False,
)
ctx = api.Context(executor=executor)
...
executor.close()
# you can also use the `detect` function as above:
spec2 = PipelinedExecutor.make_spec(**detect())
Please see Pipelined executor for a reference of the pipelined executor, and the LiberTEM-live documentation for details on live processing.