pixie16.pipeline module

exception pixie16.pipeline.Error(msg: str)

Bases: Exception

class pixie16.pipeline.Message

Bases: object

Stores pipeline messages.

class CONFIGURE(*args)

Bases: _Message[dict]

class CONNECT(*args)

Bases: _Message[str]

class DATA(*args)

Bases: _Message[Any]

class ERROR(*args)

Bases: _Message[dict]

FINISHED = '🏁'

class PRINT(*args)

Bases: _Message[dict]

QUIT = '⏏️'

class REPORT(*args)

Bases: _Message[dict]

START = '▶️'
STOP = '⏹️'

class pixie16.pipeline.Pipeline(task_kwargs=None)

Bases: object

Main orchestrator for multi-process data analysis workflows.

The Pipeline class manages a collection of Task processes connected via ZeroMQ messaging. It provides a framework for building complex data processing workflows with real-time monitoring and control.

The pipeline operates as a state machine with four main states:

  • ⏸️ WAITING: Pipeline idle, ready to accept tasks

  • ▶️ RUNNING: Pipeline actively processing data

  • ⏏️ DYING: Pipeline shutting down gracefully

  • 🆘 PANICKING: Pipeline in error state

Message Types

  • ℹ️ CONFIGURE: Update user-configurable parameters

  • 🔁 CONNECT: Update system/network parameters

  • ▶️ START: Begin data processing

  • ⏹️ STOP: Pause processing, return to waiting

  • ⏏️ QUIT: Shutdown pipeline and all tasks

  • 📶 DATA: Inter-task data transmission

  • 🆗 REPORT: Status and monitoring information

  • 🔣 PRINT: Debug/logging output

  • 🆘 ERROR: Error conditions and exceptions
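The state transitions implied by these message types can be sketched in plain Python. This is an illustration, not the library's implementation. PipelineState, TRANSITIONS and next_state are hypothetical names, and message types are represented as plain strings. The START, STOP and QUIT transitions follow the descriptions above. The mapping from ERROR to 🆘 PANICKING is an assumption.

```python
from enum import Enum


class PipelineState(Enum):
    # Mirrors the four pipeline states listed above (hypothetical name).
    WAITING = "⏸️"
    RUNNING = "▶️"
    DYING = "⏏️"
    PANICKING = "🆘"


# Hypothetical transition table: (current state, message type) -> next state.
# START/STOP/QUIT follow the message descriptions; ERROR -> PANICKING is assumed.
TRANSITIONS = {
    (PipelineState.WAITING, "START"): PipelineState.RUNNING,
    (PipelineState.RUNNING, "STOP"): PipelineState.WAITING,
    (PipelineState.WAITING, "QUIT"): PipelineState.DYING,
    (PipelineState.RUNNING, "QUIT"): PipelineState.DYING,
    (PipelineState.WAITING, "ERROR"): PipelineState.PANICKING,
    (PipelineState.RUNNING, "ERROR"): PipelineState.PANICKING,
}


def next_state(state: PipelineState, message: str) -> PipelineState:
    """Return the state after handling `message`; unknown pairs leave it unchanged."""
    return TRANSITIONS.get((state, message), state)
```

Keeping the transitions in a table turns requests that make no sense in the current state (for example START while ⏏️ DYING) into no-ops rather than errors. The real pipeline may handle such requests differently.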

Examples

>>> pipeline = Pipeline()
>>> pipeline.create_task("GatherData")
>>> pipeline.create_task("ConvertToEvents")
>>> pipeline.start()

configure(config: dict[str, Any]) → None

Configures all tasks by sending the config dictionary to each of them.

Parameters:

config – Parameters to send.

connect(tasks_active: list[str]) → None

Connects the named tasks into a processing chain.

Parameters:

tasks_active – Names of tasks to connect.

Raises:

RuntimeError – If not all tasks respond.

create_task(name: str)

Create and register a new task process.

Instantiates a task class by name from the global task registry, configures it with pipeline parameters, and starts the process.

Parameters:

name (str) – Task class name as registered in Task.class_registry.

Raises:

ValueError – If task name not found in Task.class_registry.

Notes

Tasks are automatically connected to the pipeline’s ZeroMQ router and inherit any task_kwargs passed to the pipeline constructor.
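The name lookup that create_task() performs can be sketched as follows. The sketch leaves out process start-up and the ZeroMQ wiring. Task, GatherData, create_task and the self-registration through __init_subclass__ are hypothetical stand-ins. How the real Task.class_registry gets populated is not described here.

```python
from typing import Any, Optional


class Task:
    """Hypothetical stand-in for pixie16.pipeline.Task (no process, no ZeroMQ)."""

    class_registry: dict[str, type["Task"]] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        # Assumed: every subclass registers itself under its class name.
        super().__init_subclass__(**kwargs)
        Task.class_registry[cls.__name__] = cls

    def __init__(self, **kwargs: Any) -> None:
        # Keep the pipeline-wide task_kwargs on the instance.
        self.kwargs = kwargs


class GatherData(Task):
    pass


def create_task(name: str, task_kwargs: Optional[dict[str, Any]] = None) -> Task:
    """Look up `name` in the registry and instantiate it with the shared kwargs."""
    try:
        cls = Task.class_registry[name]
    except KeyError:
        raise ValueError(f"Unknown task {name!r}") from None
    return cls(**(task_kwargs or {}))
```

Raising ValueError for an unknown name matches the documented behavior. The real method would additionally start the task process.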

handle_error(name: str, err: Error) → None

Handles Message.ERROR.

Parameters:
  • name – Task name.

  • err – Received error.

handle_print(name: str, string: str) → None

Handles Message.PRINT.

Parameters:
  • name – Task name.

  • string – Received string.

handle_report(name: str, report: dict[str, Any]) → None

Handles Message.REPORT.

Parameters:
  • name – Task name.

  • report – Received report.

kill() → None

Kills all tasks.

poll() → bool

Listens for incoming messages from the tasks. Returns True if a message was received, False otherwise.
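Below is a minimal sketch of what a single poll() step might do. It assumes each incoming message carries the sending task's name, a message type and a payload. It also assumes the pipeline dispatches them to handle_error(), handle_print() and handle_report(). MiniPipeline is hypothetical, and a queue.Queue stands in for the ZeroMQ router socket so the example runs without pyzmq.

```python
import queue
from typing import Any


class MiniPipeline:
    """Hypothetical sketch: a queue.Queue replaces the ZeroMQ router socket."""

    def __init__(self) -> None:
        # Each item is (task name, message type, payload).
        self.inbox: "queue.Queue[tuple[str, str, Any]]" = queue.Queue()
        self.log: list[str] = []

    def handle_error(self, name: str, err: Any) -> None:
        self.log.append(f"{name} ERROR {err}")

    def handle_print(self, name: str, string: str) -> None:
        self.log.append(f"{name}: {string}")

    def handle_report(self, name: str, report: dict[str, Any]) -> None:
        self.log.append(f"{name} REPORT {report}")

    def poll(self, timeout: float = 0.1) -> bool:
        """Handle at most one message; return True if one was received."""
        try:
            name, msg_type, payload = self.inbox.get(timeout=timeout)
        except queue.Empty:
            return False
        handler = {
            "ERROR": self.handle_error,
            "PRINT": self.handle_print,
            "REPORT": self.handle_report,
        }.get(msg_type)
        if handler is not None:
            handler(name, payload)
        return True
```

Returning a bool lets a caller loop with `while pipeline.poll(): ...` to drain everything currently queued.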

quit() → None

send_connect(name: str, address_pull: str, is_start: bool, is_final: bool) → None

Sends Message.CONNECT.

Parameters:
  • name – Task name.

  • address_pull – ZMQ pull socket address.

  • is_start – First task flag.

  • is_final – Last task flag.
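The is_start and is_final flags for a linear chain of tasks can be derived from each task's position. The sketch below assumes, hypothetically, that each task's address_pull is the push address of the task before it, and that the first task gets an empty address. ConnectArgs and plan_connections are illustrative names, not part of pixie16.

```python
from typing import NamedTuple


class ConnectArgs(NamedTuple):
    # The four arguments of send_connect().
    name: str
    address_pull: str
    is_start: bool
    is_final: bool


def plan_connections(tasks_active: list[str], push_addresses: dict[str, str]) -> list[ConnectArgs]:
    """Compute send_connect() arguments for a linear chain of tasks.

    Assumption: each task pulls from the push address of the task before it;
    the first task has nothing upstream, so its pull address is empty.
    """
    plan = []
    for i, name in enumerate(tasks_active):
        upstream = push_addresses[tasks_active[i - 1]] if i > 0 else ""
        plan.append(ConnectArgs(name, upstream, i == 0, i == len(tasks_active) - 1))
    return plan
```

For a single-task pipeline, that task gets both flags set to True.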

send_kill(name: str)

Kills the given task.

Parameters:

name – Task to kill.

start() → None

stop() → None

update_active_tasks_status(msg_type: Message)

class pixie16.pipeline.Status(value)

Bases: Enum

Represents Task states.

DYING = '⏏️'
FINISHED = '🏁'
PANICKING = '🆘'
RUNNING = '▶️'
WAITING = '⏸️'
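Because Status derives from Enum, a status can be recovered from its emoji value or from its name with the standard Enum lookups. The class below restates the members listed above so that the example runs on its own. Whether tasks exchange these values directly is not stated here.

```python
from enum import Enum


class Status(Enum):
    # Same members and values as pixie16.pipeline.Status above.
    DYING = "⏏️"
    FINISHED = "🏁"
    PANICKING = "🆘"
    RUNNING = "▶️"
    WAITING = "⏸️"


# Lookup by value and by name (standard Enum behavior).
running = Status("▶️")
waiting = Status["WAITING"]
```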
class pixie16.pipeline.Task(**kwargs)

Bases: Process

Base class for pipeline processing tasks.

Task represents a single processing unit in the data analysis pipeline. Each task runs as an independent process and communicates via ZeroMQ messages. Tasks implement specific data processing algorithms and can be chained together to create complex workflows.

Architecture

Tasks follow a message-driven architecture with four method categories:

  • Handlers: Process incoming messages from other tasks or the pipeline

  • Senders: Dispatch outgoing messages and data

  • Actions: User interface methods for external control

  • Callbacks: Extension points for subclass customization
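Here is a toy, process-free version of the four categories. It assumes that DATA messages arrive through a handler that forwards them to do_work() only while the task is running. ToyTask, handle_data and Doubler are hypothetical names. The real Task runs as a separate process and exchanges ZeroMQ messages.

```python
from typing import Any


class ToyTask:
    """Hypothetical, process-free sketch of the four method categories."""

    def __init__(self) -> None:
        self.outbox: list[Any] = []
        self.running = False

    # Handler: reacts to an incoming DATA message (assumed to be ignored unless running).
    def handle_data(self, datum: Any) -> None:
        if self.running:
            self.do_work(datum)

    # Sender: dispatches outgoing data; here it is only collected in a list.
    def send_data(self, data: Any) -> None:
        self.outbox.append(data)

    # Actions: external control of the task.
    def begin(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False

    # Callback: extension point that subclasses override.
    def do_work(self, datum: Any) -> None:
        pass


class Doubler(ToyTask):
    def do_work(self, datum: Any) -> None:
        self.send_data(2 * datum)
```

A subclass only overrides the callback. Handlers, senders and actions stay in the base class.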

Task States

  • ⏸️ WAITING: Task idle, ready to process

  • ▶️ RUNNING: Task actively processing data

  • 🏁 FINISHED: Task completed successfully

  • ⏏️ DYING: Task shutting down

  • 🆘 PANICKING: Task in error state

Message Protocol

  • ℹ️ CONFIGURE: Update user parameters

  • 🔁 CONNECT: Update system parameters

  • ▶️ START: Begin processing

  • ⏹️ STOP: Pause processing

  • ⏏️ QUIT: Shutdown task

  • 📶 DATA: Inter-task data flow

  • 🆗 REPORT: Status/monitoring info

  • 🔣 PRINT: Debug output

  • 🆘 ERROR: Error conditions

Examples

>>> class MyTask(Task):
...     def do_work(self, datum):
...         # process_data is a placeholder for your own analysis code
...         self.send_data(process_data(datum))
>>> task = MyTask()
>>> task.start()

begin()

Starts the task.

class_registry = {
    'ConvertToEvents': <class 'pixie16.pipeline.tasks.ConvertToEvents.ConvertToEvents'>,
    'DummyData': <class 'pixie16.pipeline.tasks.DummyData.DummyData'>,
    'GatherData': <class 'pixie16.pipeline.tasks.GatherData.GatherData'>,
    'GatherEvents': <class 'pixie16.pipeline.tasks.GatherEvents.GatherEvents'>,
    'LoadFiles': <class 'pixie16.pipeline.tasks.LoadFiles.LoadFiles'>,
    'PickSingleModule': <class 'pixie16.pipeline.tasks.PickSingleModule.PickSingleModule'>,
    'PrintData': <class 'pixie16.pipeline.tasks.PrintData.PrintData'>,
    'SortEvents': <class 'pixie16.pipeline.tasks.SortEvents.SortEvents'>,
    'TakeData': <class 'pixie16.pipeline.tasks.TakeData.TakeData'>
}

configure(config: dict)

Configures the task.

Parameters:

config – Configuration to apply.

connect(address_pull: str, is_start: bool, is_final: bool)

Connects the task.

Parameters:
  • address_pull – ZMQ pull socket address.

  • is_start – First task flag.

  • is_final – Last task flag.

do_configure(config: dict) → None

Callback for configure(). Override it in a subclass to apply custom configuration.

Parameters:

config – Configuration to apply.

do_connect(address_pull: str, is_start: bool, is_final: bool) → None

Callback for connect(). Override it in a subclass to customize connection handling.

Parameters:
  • address_pull – ZMQ pull socket address.

  • is_start – First task flag.

  • is_final – Last task flag.

do_init() → None

Callback for init().

do_quit() → None

Callback for quit().

do_start() → None

Callback for starting the task.

do_stop() → None

Callback for stop().

do_wait() → None

Callback for wait().

do_work(datum: Any) → None

Callback for work(). Override it in a subclass to implement the task's processing.

Parameters:

datum – Data item to process.

flush_report() → None

Flushes any pending reports to the pipeline.

init()

panic(error: Error)

Handles a fatal error by putting the task into the 🆘 PANICKING state.

Parameters:

error – Error that caused the panic.

poll()

Listens on the task's sockets for incoming messages.

print(string: str) → None

Prints a string on behalf of the task.

Parameters:

string – String to print.

quit()

Quits the task.

report(report: dict) → None

Reports a status dictionary for the task.

Parameters:

report – Report to send.

run()

Method representing the task process's activity (overrides Process.run()).

send(report: Any) → None

Sends data for the task.

Parameters:

report – Data to send.

send_data(data: Any) → None

Sends Message.DATA.

Parameters:

data – Data to send.

send_finished() → None

Sends Message.FINISHED.

send_heart_beat()

Sends a heartbeat every second.

send_report(report: dict, force: bool = False) → None

Sends Message.REPORT.

Parameters:
  • report – Report to send.

  • force – Force immediate dispatch bypassing rate limiter.
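One way to implement a rate limiter that force bypasses is sketched below, together with a matching flush_report(). This is an assumption, not pixie16's code. ReportLimiter is hypothetical. Reports received within interval seconds of the last dispatch are merged into one pending dictionary, with later keys overwriting earlier ones. The clock is injectable so the behavior can be tested deterministically.

```python
import time
from typing import Any, Callable, Optional


class ReportLimiter:
    """Hypothetical sketch of rate-limited reporting with a force override.

    Reports arriving within `interval` seconds of the last dispatch are merged
    into a pending dict; flush_report() or force=True dispatches them at once.
    """

    def __init__(self, dispatch: Callable[[dict[str, Any]], None],
                 interval: float = 1.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.dispatch = dispatch
        self.interval = interval
        self.clock = clock
        self.pending: dict[str, Any] = {}
        self.last_sent: Optional[float] = None

    def send_report(self, report: dict[str, Any], force: bool = False) -> None:
        # Merge into the pending report; later keys overwrite earlier ones.
        self.pending.update(report)
        now = self.clock()
        due = self.last_sent is None or now - self.last_sent >= self.interval
        if force or due:
            self.flush_report()

    def flush_report(self) -> None:
        # Dispatch whatever is pending and restart the rate-limit window.
        if self.pending:
            self.dispatch(self.pending)
            self.pending = {}
            self.last_sent = self.clock()
```

Merging pending reports means a burst of updates costs one message, at the price of dropping intermediate values for repeated keys.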

stop()

Stops the task.

wait()

Waits for the task.

work() → None

Performs the task's processing step.