pixie16.pipeline module
- exception pixie16.pipeline.Error(msg: str)
Bases: Exception
- class pixie16.pipeline.Message
Bases: object
Stores pipeline messages.
- class CONFIGURE(*args)
Bases: _Message[dict]
- class CONNECT(*args)
Bases: _Message[str]
- class DATA(*args)
Bases: _Message[Any]
- class ERROR(*args)
Bases: _Message[dict]
- FINISHED = '🏁'
- class PRINT(*args)
Bases: _Message[dict]
- QUIT = '⏏️'
- class REPORT(*args)
Bases: _Message[dict]
- START = '▶️'
- STOP = '⏹️'
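The listing above shows two kinds of message: classes that carry a payload (CONFIGURE, CONNECT, DATA, ERROR, PRINT, REPORT), parameterised by the payload type, and plain string constants (FINISHED, QUIT, START, STOP). The sketch below illustrates one way such a scheme could be modelled and dispatched. The `_Message` body, the `payload` attribute and the `describe` helper are assumptions for illustration, not pixie16's actual implementation.

```python
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class _Message(Generic[T]):
    """Hypothetical payload wrapper; pixie16's real _Message may differ."""

    def __init__(self, *args: Any) -> None:
        # Assumption: the first positional argument is the payload.
        self.payload = args[0] if args else None


class Message:
    # Payload-carrying message types, mirroring the documented bases.
    class CONFIGURE(_Message[dict]): ...
    class DATA(_Message[Any]): ...
    class REPORT(_Message[dict]): ...

    # Control messages are plain string constants.
    START = "▶️"
    STOP = "⏹️"
    QUIT = "⏏️"
    FINISHED = "🏁"


def describe(msg: Any) -> str:
    """Hypothetical dispatcher: classes are matched by type, constants by value."""
    if isinstance(msg, Message.CONFIGURE):
        return f"configure {sorted(msg.payload)}"
    if isinstance(msg, Message.DATA):
        return f"data {msg.payload!r}"
    if isinstance(msg, Message.REPORT):
        return f"report {msg.payload}"
    if msg in (Message.START, Message.STOP, Message.QUIT, Message.FINISHED):
        return f"control {msg}"
    raise ValueError(f"unknown message: {msg!r}")
```

Keeping control messages as bare constants makes them cheap to compare, while wrapping payloads in typed classes lets a receiver pick a handler with `isinstance`.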
- class pixie16.pipeline.Pipeline(task_kwargs=None)
Bases: object
Main orchestrator for multi-process data-analysis workflows.
The Pipeline class manages a collection of Task processes connected via ZeroMQ messaging. It provides a framework for building complex data processing workflows with real-time monitoring and control.
The pipeline operates as a state machine with four main states:
⏸️ WAITING: Pipeline idle, ready to accept tasks
▶️ RUNNING: Pipeline actively processing data
⏏️ DYING: Pipeline shutting down gracefully
🆘 PANICKING: Pipeline in error state
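The state machine described above can be sketched as a transition table. The enum values are copied from the Status class documented further down. Which message triggers which transition is an assumption inferred from the message descriptions (START begins processing, STOP returns to waiting, QUIT shuts down, ERROR signals an error), not a copy of pixie16's code, and `next_status` is a hypothetical helper.

```python
from enum import Enum


class Status(Enum):
    # Values as listed for pixie16.pipeline.Status.
    WAITING = "⏸️"
    RUNNING = "▶️"
    FINISHED = "🏁"
    DYING = "⏏️"
    PANICKING = "🆘"


# Assumed transitions: (current state, message name) -> next state.
TRANSITIONS = {
    (Status.WAITING, "START"): Status.RUNNING,
    (Status.RUNNING, "STOP"): Status.WAITING,
}


def next_status(current: Status, message: str) -> Status:
    """Return the state reached after handling `message` (hypothetical rules)."""
    if message == "ERROR":
        return Status.PANICKING  # assumed: any state can fail
    if message == "QUIT":
        return Status.DYING  # assumed: any state can be shut down
    # Messages with no defined transition leave the state unchanged.
    return TRANSITIONS.get((current, message), current)
```

A table keeps the legal transitions in one place, so an unexpected message (for example STOP while already waiting) is simply ignored rather than corrupting the state.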
Message Types
ℹ️ CONFIGURE: Update user-configurable parameters
🔁 CONNECT: Update system/network parameters
▶️ START: Begin data processing
⏹️ STOP: Pause processing, return to waiting
⏏️ QUIT: Shutdown pipeline and all tasks
📶 DATA: Inter-task data transmission
🆗 REPORT: Status and monitoring information
🔣 PRINT: Debug/logging output
🆘 ERROR: Error conditions and exceptions
Examples
>>> pipeline = Pipeline()
>>> pipeline.create_task("GatherData")
>>> pipeline.create_task("ConvertToEvents")
>>> pipeline.start()
- configure(config: dict[str, Any]) → None
Configures all tasks by sending the config dictionary to each of them.
- Parameters:
config – Parameters to send.
- connect(tasks_active: list[str]) → None
Connects the listed tasks into a processing chain.
- Parameters:
tasks_active – Names of the tasks to connect.
- Raises:
RuntimeError – If not all tasks respond.
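connect() receives a list of task names, and send_connect() gives each task a pull address plus is_start/is_final flags. One plausible way to derive those per-task arguments from the ordered list is sketched below. The `tcp://127.0.0.1` address scheme, `base_port`, `ConnectArgs` and `plan_connections` are hypothetical; the documentation does not say how pixie16 chooses addresses or whether a task binds or connects its pull socket.

```python
from typing import NamedTuple


class ConnectArgs(NamedTuple):
    """Arguments for one send_connect() call (field order follows its signature)."""
    name: str
    address_pull: str
    is_start: bool
    is_final: bool


def plan_connections(tasks_active: list[str], base_port: int = 5600) -> list[ConnectArgs]:
    """Assign each task a pull address and its position flags.

    Hypothetical: the address scheme and base_port are illustrative only.
    """
    if not tasks_active:
        raise ValueError("need at least one task to connect")
    last = len(tasks_active) - 1
    return [
        ConnectArgs(
            name=name,
            address_pull=f"tcp://127.0.0.1:{base_port + i}",
            is_start=(i == 0),   # only the first task has no upstream
            is_final=(i == last),  # only the last task has no downstream
        )
        for i, name in enumerate(tasks_active)
    ]
```

Because the fields follow the send_connect() signature, each entry could be unpacked directly, e.g. `for args in plan_connections(names): pipeline.send_connect(*args)`. A single-task pipeline gets both flags set.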
- create_task(name: str)
Create and register a new task process.
Instantiates a task class by name from the global task registry, configures it with pipeline parameters, and starts the process.
- Parameters:
name (str) – Task class name as registered in Task.class_registry.
- Raises:
ValueError – If the task name is not found in Task.class_registry.
Notes
Tasks are automatically connected to the pipeline’s ZeroMQ router and inherit any task_kwargs passed to the pipeline constructor.
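create_task() looks a class up by name in Task.class_registry, which (as listed under Task below) maps names such as 'GatherData' to task classes. The sketch below shows one common way such a registry can be filled automatically, via `__init_subclass__`, and how a lookup can raise ValueError for unknown names while forwarding shared keyword arguments. Whether pixie16 populates its registry this way is an assumption; `BaseTask` and `make_task` are hypothetical names, and no process or socket is started.

```python
from typing import Any, Optional


class BaseTask:
    """Hypothetical stand-in for pixie16.pipeline.Task (no process or sockets)."""

    class_registry: dict[str, type] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Register every subclass under its class name.
        BaseTask.class_registry[cls.__name__] = cls

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


def make_task(name: str, task_kwargs: Optional[dict[str, Any]] = None) -> BaseTask:
    """Instantiate a registered task by name, forwarding shared keyword arguments."""
    try:
        cls = BaseTask.class_registry[name]
    except KeyError:
        raise ValueError(f"unknown task {name!r}") from None
    return cls(**(task_kwargs or {}))


class GatherData(BaseTask):
    pass
```

Registering in `__init_subclass__` means defining a subclass is enough to make it available by name; there is no separate list to keep in sync.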
- handle_error(name: str, err: Error) → None
Handles Message.ERROR.
- Parameters:
name – Task name.
err – Received error.
- handle_print(name: str, string: str) → None
Handles Message.PRINT.
- Parameters:
name – Task name.
string – Received string.
- handle_report(name: str, report: dict[str, Any]) → None
Handles Message.REPORT.
- Parameters:
name – Task name.
report – Received report.
- kill() → None
Kills all tasks.
- poll() → bool
Listens for incoming messages. Returns True if a message was received, False otherwise.
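poll() reports whether a message arrived. Using only the standard library, a non-blocking check with that True/False contract can be sketched with `selectors` over a socket pair that stands in for the pipeline's ZeroMQ socket. pyzmq offers a comparable `zmq.Poller`, but the sketch does not use it, and `poll_once` is a hypothetical name.

```python
import selectors
import socket


def poll_once(sel: selectors.BaseSelector, timeout: float = 0.0) -> bool:
    """Return True if any registered socket had data to read, False otherwise."""
    received = False
    for key, _events in sel.select(timeout):
        data = key.fileobj.recv(4096)
        if data:
            received = True  # a real handler would dispatch `data` here
    return received


# Usage: one end of a socket pair plays the task, the other the pipeline.
task_end, pipeline_end = socket.socketpair()
sel = selectors.DefaultSelector()
sel.register(pipeline_end, selectors.EVENT_READ)
```

A zero timeout makes the call non-blocking, so a control loop can interleave polling with other work; a positive timeout trades latency for fewer wake-ups.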
- quit() → None
- send_connect(name: str, address_pull: str, is_start: bool, is_final: bool) → None
Sends Message.CONNECT.
- Parameters:
name – Task name.
address_pull – ZMQ pull socket address.
is_start – First task flag.
is_final – Last task flag.
- send_kill(name: str)
Kills the given task.
- Parameters:
name – Task to kill.
- start() → None
- stop() → None
- class pixie16.pipeline.Status(value)
Bases: Enum
Represents Task states.
- DYING = '⏏️'
- FINISHED = '🏁'
- PANICKING = '🆘'
- RUNNING = '▶️'
- WAITING = '⏸️'
- class pixie16.pipeline.Task(**kwargs)
Bases: Process
Base class for pipeline processing tasks.
Task represents a single processing unit in the data analysis pipeline. Each task runs as an independent process and communicates via ZeroMQ messages. Tasks implement specific data processing algorithms and can be chained together to create complex workflows.
Architecture
Tasks follow a message-driven architecture with four method categories:
Handlers: Process incoming messages from other tasks and the pipeline
Senders: Dispatch outgoing messages and data
Actions: User-interface methods for external control
Callbacks: Extension points for subclass customization
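The four method categories can be illustrated with a single-process skeleton: an action (configure) queues a message, a handler takes it off the queue and invokes the matching do_* callback, and a sender (send_report) emits output. The do_configure/do_work names mirror the documented callbacks; `SketchTask`, the inbox/outbox lists and `handle_message` are assumptions, and the real Task runs in its own process with ZeroMQ sockets instead of lists.

```python
from typing import Any


class SketchTask:
    """Hypothetical, single-process illustration of the Task method categories."""

    def __init__(self) -> None:
        self.inbox: list[tuple[str, Any]] = []   # stands in for incoming sockets
        self.outbox: list[tuple[str, Any]] = []  # stands in for the outgoing socket
        self.config: dict = {}

    # Action: called from outside to request a change.
    def configure(self, config: dict) -> None:
        self.inbox.append(("CONFIGURE", config))

    # Handler: consumes one incoming message and routes it to a callback.
    def handle_message(self) -> None:
        kind, payload = self.inbox.pop(0)
        if kind == "CONFIGURE":
            self.do_configure(payload)
        elif kind == "DATA":
            self.do_work(payload)

    # Sender: dispatches an outgoing message.
    def send_report(self, report: dict) -> None:
        self.outbox.append(("REPORT", report))

    # Callbacks: extension points that subclasses override.
    def do_configure(self, config: dict) -> None:
        self.config.update(config)

    def do_work(self, datum: Any) -> None:
        pass


class Doubler(SketchTask):
    def do_work(self, datum: Any) -> None:
        self.send_report({"result": datum * 2})
```

Separating actions from handlers means an external caller never runs task logic directly; it only enqueues a request that the task processes in its own loop.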
Task States
⏸️ WAITING: Task idle, ready to process
▶️ RUNNING: Task actively processing data
🏁 FINISHED: Task completed successfully
⏏️ DYING: Task shutting down
🆘 PANICKING: Task in error state
Message Protocol
ℹ️ CONFIGURE: Update user parameters
🔁 CONNECT: Update system parameters
▶️ START: Begin processing
⏹️ STOP: Pause processing
⏏️ QUIT: Shutdown task
📶 DATA: Inter-task data flow
🆗 REPORT: Status/monitoring info
🔣 PRINT: Debug output
🆘 ERROR: Error conditions
Examples
>>> class MyTask(Task):
...     def do_work(self, datum):
...         # process_data is a placeholder for your own analysis
...         self.send_data(process_data(datum))
>>> task = MyTask()
>>> task.start()
- begin()
Starts the task.
- class_registry = {
    'ConvertToEvents': <class 'pixie16.pipeline.tasks.ConvertToEvents.ConvertToEvents'>,
    'DummyData': <class 'pixie16.pipeline.tasks.DummyData.DummyData'>,
    'GatherData': <class 'pixie16.pipeline.tasks.GatherData.GatherData'>,
    'GatherEvents': <class 'pixie16.pipeline.tasks.GatherEvents.GatherEvents'>,
    'LoadFiles': <class 'pixie16.pipeline.tasks.LoadFiles.LoadFiles'>,
    'PickSingleModule': <class 'pixie16.pipeline.tasks.PickSingleModule.PickSingleModule'>,
    'PrintData': <class 'pixie16.pipeline.tasks.PrintData.PrintData'>,
    'SortEvents': <class 'pixie16.pipeline.tasks.SortEvents.SortEvents'>,
    'TakeData': <class 'pixie16.pipeline.tasks.TakeData.TakeData'>
  }
- configure(config: dict)
Configures the task.
- Parameters:
config – Configuration to apply.
- connect(address_pull: str, is_start: bool, is_final: bool)
Connects the task.
- Parameters:
address_pull – ZMQ pull socket address.
is_start – First task flag.
is_final – Last task flag.
- do_configure(config: dict) → None
Callback for configure.
- Parameters:
config – Configuration to apply.
- do_connect(address_pull: str, is_start: bool, is_final: bool) → None
Callback for connect.
- Parameters:
address_pull – ZMQ pull socket address.
is_start – First task flag.
is_final – Last task flag.
- do_init() → None
Callback for init.
- do_quit() → None
Callback for quit.
- do_start() → None
Callback for start.
- do_stop() → None
Callback for stop.
- do_wait() → None
Callback for wait.
- do_work(datum: Any) → None
Callback for work.
- Parameters:
datum – Data item to process.
- flush_report() → None
Flushes any pending reports to the pipeline.
- init()
- poll()
Listens to the task's sockets.
- print(string: str) → None
Prints a string on behalf of the task.
- Parameters:
string – String to print.
- quit()
Quits the task.
- report(report: dict) → None
Reports on behalf of the task.
- Parameters:
report – Report contents.
- run()
Entry point executed in the task's child process (overrides Process.run).
- send(report: Any) → None
Sends data on behalf of the task.
- Parameters:
report – Data to send.
- send_data(data: Any) → None
Sends Message.DATA.
- Parameters:
data – Data to send.
- send_finished() → None
Sends Message.FINISHED.
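send_finished() emits Message.FINISHED, which tells downstream tasks that no more data will arrive. The sketch below shows end-of-stream propagation through a chain of stages: each stage forwards its results and, on seeing the FINISHED marker, forwards it too and stops. Threads and `queue.Queue` objects stand in for pixie16's processes and ZeroMQ sockets, and the forwarding rule is an assumption about how FINISHED is used; `stage` and `run_chain` are hypothetical names.

```python
import queue
import threading
from typing import Any, Callable

FINISHED = "🏁"  # same marker value as Message.FINISHED


def stage(inbox: queue.Queue, outbox: queue.Queue, work: Callable[[Any], Any]) -> None:
    """Apply `work` to each item until FINISHED arrives, then pass FINISHED on."""
    while True:
        item = inbox.get()
        if item == FINISHED:
            outbox.put(FINISHED)
            return
        outbox.put(work(item))


def run_chain(data: list[Any], works: list[Callable[[Any], Any]]) -> list[Any]:
    """Feed `data` through one thread per work function and collect the output."""
    queues = [queue.Queue() for _ in range(len(works) + 1)]
    threads = [
        threading.Thread(target=stage, args=(queues[i], queues[i + 1], w))
        for i, w in enumerate(works)
    ]
    for t in threads:
        t.start()
    for item in data:
        queues[0].put(item)
    queues[0].put(FINISHED)  # the start task signals end of data
    results = []
    while (item := queues[-1].get()) != FINISHED:
        results.append(item)
    for t in threads:
        t.join()
    return results
```

Because the marker travels behind the data on the same channel, the final stage only sees FINISHED after every earlier item has been processed, so no explicit count of items is needed.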
- send_heart_beat()
Sends a heartbeat every second.
- send_report(report: dict, force: bool = False) → None
Sends Message.REPORT.
- Parameters:
report – Report to send.
force – Force immediate dispatch, bypassing the rate limiter.
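send_report() is rate-limited unless force=True, and flush_report() pushes out anything still pending. A minimal sketch of that behaviour follows, assuming reports are merged into one pending dictionary and sent at most once per interval. The `ReportLimiter` class, the one-second default interval and the injectable clock are hypothetical, chosen so the logic can be tested without waiting.

```python
import time
from typing import Any, Callable, Optional


class ReportLimiter:
    """Hypothetical rate limiter for REPORT messages."""

    def __init__(
        self,
        send: Callable[[dict], None],
        interval: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._send = send  # e.g. a function that puts a REPORT on a socket
        self._interval = interval
        self._clock = clock
        self._pending: dict[str, Any] = {}
        self._last_sent: Optional[float] = None

    def send_report(self, report: dict, force: bool = False) -> None:
        # Newer values overwrite older ones for the same key.
        self._pending.update(report)
        now = self._clock()
        due = self._last_sent is None or now - self._last_sent >= self._interval
        if force or due:
            self.flush_report()

    def flush_report(self) -> None:
        # Send whatever has accumulated, then restart the interval.
        if self._pending:
            self._send(dict(self._pending))
            self._pending.clear()
            self._last_sent = self._clock()
```

Merging rather than queuing reports means a burst of updates within one interval collapses into a single message carrying the latest values, which keeps monitoring traffic bounded.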
- stop()
Stops the task.
- wait()
Waits for the task.
- work() → None
Works for the task.