pixie16.tasks module

A collection of tasks that can be used in a pipeline.

These tasks can also easily be sub-classed to create custom ones or used as examples of how to write your own.
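
Since each task only needs a do_work hook (and optionally cleanup), writing a custom one is short. The sketch below is self-contained for illustration: the Task base class here is a minimal stand-in, and ScaleEnergies is a hypothetical example task; real code would subclass pixie16.tasks.Task instead.

```python
class Task:
    """Minimal stand-in for pixie16.tasks.Task (illustration only)."""

    def do_work(self, value):
        # value will be the current element in the queue
        raise NotImplementedError("Needs to be overwritten to do the work.")

    def cleanup(self):
        # can be overwritten to close any open files, etc.
        pass


class ScaleEnergies(Task):
    """Hypothetical task: multiply each event energy by a calibration factor."""

    def __init__(self, factor):
        self.factor = factor

    def do_work(self, value):
        # here value is assumed to be a list of energies
        return [e * self.factor for e in value]


task = ScaleEnergies(2.0)
print(task.do_work([1.0, 2.5]))  # → [2.0, 5.0]
```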

class pixie16.tasks.ConvertToEvents

Bases: Task

Task to convert data stream to events.

do_work(value_dict)

Needs to be overwritten to do the work.

value will be the current element in the queue.

class pixie16.tasks.DummyData(runtime, filename=None)

Bases: Task

A class that will send dummy data into a pipeline.

This can be used as a first task in a pipeline during testing and when the pixie16 is not online/available.

do_work(value)

Needs to be overwritten to do the work.

value will be the current element in the queue.
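
For offline testing, the first task in a pipeline only needs to emit blobs that look like FPGA output. The sketch below is a hedged illustration of that idea; the blob size and the use of os.urandom are assumptions, and the real DummyData class emits whatever format the downstream tasks expect.

```python
import os


def make_dummy_blob(n_bytes=1024):
    """Return a fake FPGA data blob for offline pipeline tests (hypothetical helper)."""
    # random bytes stand in for the binary data the pixie16 would produce
    return os.urandom(n_bytes)


blob = make_dummy_blob(64)
print(len(blob))  # → 64
```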

class pixie16.tasks.GatherData(maxsize=50000000.0, save_size=None, path=None)

Bases: Task

Task to create larger data buckets out of the data directly from the FPGA.

It has two different buckets: one for sending data to the next queue and one for saving data to disk.
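
The bucketing idea can be sketched as follows: append incoming chunks to a bucket and flush once a size threshold is reached. The Bucket class and its flush convention are assumptions for illustration; the real task keeps two such buckets, one for forwarding and one for saving to disk.

```python
class Bucket:
    """Hypothetical accumulator that flushes once maxsize bytes are collected."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.chunks = []
        self.size = 0

    def add(self, data: bytes):
        """Add a chunk; return the joined bucket when full, otherwise None."""
        self.chunks.append(data)
        self.size += len(data)
        if self.size >= self.maxsize:
            out = b"".join(self.chunks)
            self.chunks, self.size = [], 0  # start a fresh bucket
            return out
        return None


b = Bucket(maxsize=8)
print(b.add(b"1234"))  # → None (bucket not full yet)
print(b.add(b"5678"))  # → b'12345678' (threshold reached, flushed)
```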

cleanup()

Can be overwritten to close any open files, etc.

do_work(value)

Needs to be overwritten to do the work.

value will be the current element in the queue.

get_size(data)

save_data(out)

class pixie16.tasks.GatherEvents(size=1000000)

Bases: Task

Gather Events into larger chunks.

cleanup()

Can be overwritten to close any open files, etc.

do_work(value)

Needs to be overwritten to do the work.

value will be the current element in the queue.

class pixie16.tasks.LoadFiles(file_list, batch_size=1000)

Bases: Task

Load events from a list of files.

cleanup()

Can be overwritten to close any open files, etc.

do_work(value)

Needs to be overwritten to do the work.

value will be the current element in the queue.

class pixie16.tasks.PickSingleModule(module: int = 0)

Bases: Task

Task to pick events from a single module.

Takes output from, e.g., ConvertToEvents and outputs only the data for a single module.

do_work(value)

Needs to be overwritten to do the work.

value will be the current element in the queue.
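
Per-module filtering can be sketched in a few lines, assuming events carry a module field (the real field name and event layout are assumptions):

```python
from collections import namedtuple

# simplified stand-in for the event namedtuples used in the pipeline
Event = namedtuple("Event", ["module", "timestamp"])


def pick_module(events, module=0):
    """Keep only the events belonging to one module (hypothetical helper)."""
    return [e for e in events if e.module == module]


evts = [Event(0, 1.0), Event(1, 2.0), Event(0, 3.0)]
print(pick_module(evts, module=0))
# → [Event(module=0, timestamp=1.0), Event(module=0, timestamp=3.0)]
```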

class pixie16.tasks.SetDefaults(modules: list)

Bases: Task

Set defaults for parameters in the pixie.

do_work(value)

Needs to be overwritten to do the work.

value will be the current element in the queue.

class pixie16.tasks.SortEvents(start_time: float, maxsize: int = 10000, number_to_sort: int = 8000, all_start_times: list[float] | float | None = None)

Bases: Task

Task to sort events by timestamp.

This task takes as input a list of events that have already been converted from the binary format to namedtuples. It then populates an additional field, chunk time, in the namedtuple, which will roughly be the unix timestamp of the event. For a run that wasn’t paused, the chunk timestamp will just be the normal timestamp plus the unix timestamp from the beginning of the run.

The task will add incoming events to an internal list, and once a certain number, M, of events has been reached, it will sort the events and send the first N off. Normally N < M, so that some events are held back, since they might need to be sorted together with new events that come in next.

In case the event input stream comes from a run that has been paused and restarted, the timestamps in the event stream might have reset to start counting again at zero. However, we want the chunk time to be roughly the real time, so we have to add special code to handle this.

For this to work, we detect whenever the timestamps go back to zero (our proxy for this is to test if the current event’s timestamp is more than 20s before the last event, which should only happen if the counter did reset).

We supply several options through the init variable all_start_times to handle the chunk time:

  1. all_start_times is None: In this case, we just use the system time.time() for this event and calculate the following events’ chunk times by adding the delta t calculated from the pixie timestamps. This mode should be used during data acquisition. If one starts/stops the data acquisition, one should also write down all the start times, so that one can rerun the binary conversion later using the next mode.

  2. all_start_times is a list of unix timestamps: In this case, we use a value from the list whenever we notice a reset in the timestamps and otherwise handle delta t’s as above. This mode can be used when binary data has to be converted again and the original timestamps should be recreated.

  3. all_start_times is a float > 0: This mode can be used to estimate times when binary data needs to be converted again, but the start times for option 2 are not available. In this case, we just ignore the negative dt at the point of the reset, keep adding positive dt to the chunk time, and also add the value of all_start_times at the time of the reset. The value should be an estimate of how long it took between stopping and restarting the data acquisition.
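
The sort-and-hold-back logic and the reset detection described above can be sketched as follows. Events are buffered; once maxsize is reached they are sorted by timestamp and the first number_to_sort are emitted, holding the rest back to merge with the next batch. The 20 s reset threshold mirrors the text; representing events as plain floats (their timestamps) is a simplification for illustration.

```python
RESET_THRESHOLD = 20.0  # seconds; a timestamp jumping back this far means a counter reset


def is_reset(prev_ts, cur_ts):
    """True if the timestamp counter apparently restarted from zero."""
    return cur_ts < prev_ts - RESET_THRESHOLD


def sort_and_emit(buffer, new_events, maxsize, number_to_sort):
    """Return (emitted, held_back) after adding new_events to the buffer."""
    buffer = buffer + list(new_events)
    if len(buffer) < maxsize:
        # not enough events yet; keep everything for the next batch
        return [], buffer
    buffer.sort()
    # emit the earliest events, hold the rest back for the next merge
    return buffer[:number_to_sort], buffer[number_to_sort:]


emitted, held = sort_and_emit([], [3.0, 1.0, 2.0, 5.0, 4.0],
                              maxsize=4, number_to_sort=3)
print(emitted)              # → [1.0, 2.0, 3.0]
print(held)                 # → [4.0, 5.0]
print(is_reset(100.0, 5.0)) # → True
```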

cleanup()

Can be overwritten to close any open files, etc.

do_work(events_lst)

Does the sorting, time reset recognition and handling.

Whenever we reach self.maxsize events, sort them by time and pass the first self.N_to_sort events to the next Task. We keep a few events behind, since they might need to be sorted with the next incoming batch.

class pixie16.tasks.TakeData(runtime)

Bases: Task

Example Task to acquire data; each binary blob from the FPGA will be put in the queue.

A second task can then convert the binary blob to events and from there perhaps another task can convert to pandas.

One tricky part is that we also need to initialize and boot the pixie, since this will run in another process. However, we cannot do this in the init, since the init will still be executed in the main thread, so this has to happen in do_work.

We also check the runtime in do_work and then stop the process and end the run during the cleanup phase. We also read out the remaining events from the buffer and send them on during cleanup.
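
The lazy-initialization pattern described above can be sketched like this: because __init__ still runs in the main process while do_work runs in the worker process, hardware setup is deferred to the first do_work call. The class name, the boot method, and the end-of-run convention here are all assumptions for illustration.

```python
import time


class LazyInitTask:
    """Hypothetical sketch of deferring hardware setup to the worker process."""

    def __init__(self, runtime):
        self.runtime = runtime
        self.booted = False   # hardware is not touched in the main process
        self.start = None

    def boot(self):
        """Stand-in for initializing and booting the pixie16."""
        self.booted = True
        self.start = time.monotonic()

    def do_work(self, value):
        if not self.booted:
            # first call: we are now inside the worker process
            self.boot()
        if time.monotonic() - self.start >= self.runtime:
            return None       # signal end of run (convention assumed)
        return value


t = LazyInitTask(runtime=60)
t.do_work(b"blob")
print(t.booted)  # → True
```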

cleanup()

Can be overwritten to close any open files, etc.

do_work(value)

Needs to be overwritten to do the work.

value will be the current element in the queue.