pixie16.pipeline module

Task and Pipeline classes to run data acquisition and data handling.

This file provides two base classes: Task and Pipeline.

They can be used to create a data acquisition pipeline that takes data with the pixie16, converts the binary data to event data, and runs calculations on the events.

To use the classes, create custom Tasks that inherit from the base class. Overwrite the init function if needed, and overwrite the do_work function to process the current workload. The cleanup function can be overwritten to close files; it is called once the task is finished or if the task receives a keyboard interrupt.
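The override pattern can be sketched without the pixie16 package. In the sketch below, SketchTask is a hypothetical, single-threaded stand-in for the real Task base class (which is a Process and is driven by the pipeline). Only the names do_work, cleanup, and done are taken from this documentation; everything else is an assumption made for illustration.

```python
class SketchTask:
    """Hypothetical stand-in for pixie16.pipeline.Task (no subprocess, no queues)."""

    def __init__(self):
        self.done = False

    def do_work(self, value):
        raise NotImplementedError("subclasses must overwrite do_work")

    def cleanup(self):
        """Optional hook; the default does nothing."""

    def run(self):
        # Without an input queue, do_work is called repeatedly until done is set.
        try:
            while not self.done:
                self.do_work(None)
        except KeyboardInterrupt:
            pass  # fall through to cleanup, as the text above describes
        finally:
            self.cleanup()


class CountToThree(SketchTask):
    """Example task: records three values, then marks itself as done."""

    def __init__(self):
        super().__init__()  # keep the base-class setup when overwriting __init__
        self.values = []
        self.cleaned_up = False

    def do_work(self, value):
        self.values.append(len(self.values))
        if len(self.values) == 3:
            self.done = True

    def cleanup(self):
        self.cleaned_up = True  # a real task might close an output file here


task = CountToThree()
task.run()
```

With the real base class, the same three overrides would apply, but the task would be handed to a Pipeline rather than run directly.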

Tasks can then be chained together automatically using the Pipeline class. This class can also be used to run the pipeline and print a status bar. An example using three different tasks:

A = TaskA()
B = TaskB()
C = TaskC()
tasks = [A, B, C]

pipeline = Pipeline(tasks)

pipeline.start()  # starts all tasks

pipeline.wait_with_progress(total=14.0)  # reports progress with a bar

pipeline.join()  # waits for all tasks to finish
pipeline.close()  # releases resources

class pixie16.pipeline.Pipeline(tasks: list[Task], name: str = 'Run', verbose: bool = False)

Bases: object

Manage a linear chain of tasks.

Chains the inputs and outputs of the tasks together and provides shortcuts to start, stop, and join all Tasks and to test whether any Task is still running.
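As a rough illustration of what chaining inputs and outputs means, the sketch below gives each neighbouring pair of tasks one shared queue. Stage and chain are hypothetical names, and queue.Queue is used only to keep the example in one process; since the real Tasks are processes, the actual implementation presumably relies on a process-safe queue instead.

```python
import queue


class Stage:
    """Hypothetical stand-in for a Task with the two documented queue attributes."""

    def __init__(self, name):
        self.name = name
        self.input_queue = None
        self.output_queue = None


def chain(tasks):
    """Give each neighbouring pair of tasks one shared queue."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        shared = queue.Queue()
        upstream.output_queue = shared
        downstream.input_queue = shared


a, b, c = Stage("A"), Stage("B"), Stage("C")
chain([a, b, c])

# Whatever the first stage puts on its output arrives at the second stage's input.
a.output_queue.put("raw bytes")
received = b.input_queue.get()
```

In this sketch the first stage has no input queue and the last stage has no output queue, which matches the idea of a linear chain with a source and a sink.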

close()

Release all resources; this is needed for pixie16.control.

execute(progress_bar_time: float | int = 0)

Runs the whole pipeline.

Parameters:

progress_bar_time – if > 0, run with a progress bar, using this number as the total time the run should take. Otherwise, run without a progress bar.

is_alive()

Check if any task is still running.

join()

Join all tasks, i.e. wait until they are all done.

Link two tasks in a pipeline using a queue.

start()

Start all tasks.

stop()

Stop all workers in the pipeline.

update_status()

Look at the status queue and update the pipeline status.

We also parse out the runtime of the pixie16 if present.

wait()

Wait for the pipeline to finish.

The run can be interrupted using Ctrl-C.
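One plausible shape for an interruptible wait is a polling loop that catches KeyboardInterrupt, stops the tasks, and then joins them. The sketch below is an assumption about the general pattern, not the library's code; FakePipeline and the poll_interval parameter are hypothetical, and the fake raises KeyboardInterrupt itself to simulate Ctrl-C.

```python
import time


class FakePipeline:
    """Hypothetical stand-in that simulates Ctrl-C after a few polls."""

    def __init__(self, polls_before_interrupt):
        self.polls_left = polls_before_interrupt
        self.calls = []

    def is_alive(self):
        if self.polls_left == 0:
            raise KeyboardInterrupt  # simulate the user pressing Ctrl-C
        self.polls_left -= 1
        return True

    def stop(self):
        self.calls.append("stop")

    def join(self):
        self.calls.append("join")


def wait(pipeline, poll_interval=0.0):
    """Poll until the pipeline finishes; on Ctrl-C, stop the tasks before joining."""
    try:
        while pipeline.is_alive():
            time.sleep(poll_interval)
    except KeyboardInterrupt:
        pipeline.stop()
    pipeline.join()


pipeline = FakePipeline(polls_before_interrupt=3)
wait(pipeline)
```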

wait_with_progress(total)

Create a progress bar while the pipeline is running and show the status updates.

The run can be interrupted using Ctrl-C.

class pixie16.pipeline.Task(event=None)

Bases: Process

Custom Process that can be stopped and joined into a pipeline.

Queues are used to get data from the previous Task or to send data to the next Task. We provide functions that should be overwritten when customizing a Task:

do_work(value): called continuously in a while loop if there is no input queue, or called whenever a value arrives in the input queue. When the task is done, do_work should set self.done to True.

cleanup(): called at the end of a task; use it to close any files, etc.

If you start a data acquisition in the pixie, make sure that you call init_and_boot inside the do_work function. This is necessary because, for example, the __init__ of a Task is still executed in the main process rather than in the task's own subprocess.

For examples of Tasks, see task.py.

Since Tasks are meant to be daisy-chained together, each has an input_queue and an output_queue. Furthermore, there can be a status_queue that is used to talk to the main thread and print information to the screen.

Any print output that is generated in the do_work function will be captured and sent as a message (using the status_queue) to the main pipeline thread, where it will be printed using rich.
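Capturing print output of this kind can be done with the standard library alone. The sketch below redirects stdout into a StringIO buffer while a worker function runs and then forwards the text under the message key. The function run_captured and the module-level status_queue are hypothetical, and the real Task may capture and batch output differently.

```python
import contextlib
import io
import queue

status_queue = queue.Queue()  # stand-in for the task's status_queue


def do_work(value):
    print(f"processed {value}")  # ordinary print inside the worker


def run_captured(value):
    """Run do_work with stdout redirected, then forward the text as a status message."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        do_work(value)
    text = buffer.getvalue().strip()
    if text:
        status_queue.put({"message": text})


run_captured(42)
captured = status_queue.get_nowait()
```

Routing the text through a queue instead of printing it directly lets a single consumer decide when and how to write to the terminal, so the output does not collide with a progress bar.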

cleanup()

Can be overwritten to close any open files, etc.

do_work(value)

Needs to be overwritten to do the work.

value will be the current element in the queue.

handle_print(stringIO: StringIO, force=False)

Handle print statements that got captured.

join(*args, **kwargs)

Stop all workers and wait until they are done.

run()

Method to be run in sub-process; can be overridden in sub-class

send_status(value_dict, force=False)

Send a status update to the main pipeline.

Status updates are sent at least 0.5 seconds apart so that they don’t spam the status queue (unless force is given).

Since sending of the value_dict is not guaranteed, do not rely on this feature; use it for informational purposes only.

Currently, there are two keys in the value_dict that have special meaning:

- runtime: should be the elapsed time during data acquisition
- message: any message that should be printed to the screen (we use this so that the progress bar works well)
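The throttling behaviour described for send_status can be sketched as follows. StatusSender is a hypothetical helper, not part of pixie16: it drops any update that arrives less than 0.5 seconds after the previous one unless force is set, and it takes an injectable clock (an assumption made here) so that the behaviour can be checked without sleeping.

```python
import time


class StatusSender:
    """Hypothetical helper that drops updates sent less than min_interval apart."""

    def __init__(self, min_interval=0.5, clock=time.monotonic):
        self.min_interval = min_interval
        self.clock = clock  # injectable so the behaviour can be checked without sleeping
        self.last_sent = None
        self.sent = []  # stand-in for the status queue

    def send_status(self, value_dict, force=False):
        now = self.clock()
        too_soon = (
            self.last_sent is not None
            and now - self.last_sent < self.min_interval
        )
        if too_soon and not force:
            return False  # the update is silently dropped
        self.sent.append(value_dict)
        self.last_sent = now
        return True


fake_time = [0.0]
sender = StatusSender(clock=lambda: fake_time[0])

first = sender.send_status({"runtime": 0.0})                   # sent
fake_time[0] = 0.2
second = sender.send_status({"runtime": 0.2})                  # dropped, too soon
forced = sender.send_status({"message": "done"}, force=True)   # sent despite the interval
fake_time[0] = 0.8
fourth = sender.send_status({"runtime": 0.8})                  # sent, interval has passed
```

Because the second update is dropped without any error, a caller cannot assume that a particular value_dict reached the pipeline, which is why the text above advises against relying on it.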

stop()

Set signal that will stop all workers.