Events, Queues & Background Jobs
Decouple your application with event-driven architecture and async processing.
Table of Contents
- Events & Listeners
- Dispatching Events
- Listener Attributes
- Conditional & One-Shot Listeners
- Background Jobs
- Queue Configuration
- Job Chains
- Failed Jobs
- Task Scheduling
Events & Listeners
Events decouple your business logic. When something happens (a post is published, a user registers), you dispatch an event. Listeners react independently.
Service                 EventDispatcher                 Listeners
   │                           │                            │
   │── dispatch(event) ───────▶│                            │
   │                           │── NotifyAdmin ────────────▶│
   │                           │── SendEmail ──────────────▶│
   │                           │── UpdateStats ────────────▶│
   │◀── return ────────────────│                            │
Step 1: Create an Event
Create app/Event/ProductCreated.php:
<?php
declare(strict_types=1);
namespace App\Event;
use App\Entity\Product;
/**
* Dispatched when a new product is created.
*/
final readonly class ProductCreated
{
public function __construct(
public Product $product,
public \DateTimeImmutable $createdAt = new \DateTimeImmutable(),
) {}
}
Events are plain PHP objects — no base class required.
Step 2: Create a Listener
Create app/Listener/NotifyOnProductCreated.php:
<?php
declare(strict_types=1);
namespace App\Listener;
use MonkeysLegion\Events\Attribute\Listener;
use Psr\Log\LoggerInterface;
use App\Event\ProductCreated;
#[Listener(ProductCreated::class)]
final class NotifyOnProductCreated
{
public function __construct(
private readonly LoggerInterface $logger,
) {}
public function __invoke(ProductCreated $event): void
{
$this->logger->info('New product created', [
'id' => $event->product->id,
'name' => $event->product->name,
]);
// Send notification, update search index, etc.
}
}
Step 3: Dispatch from Your Service
use App\Entity\Product;
use App\Event\ProductCreated;
use Psr\EventDispatcher\EventDispatcherInterface;
#[Singleton]
final class ProductService
{
public function __construct(
private readonly ProductRepository $products,
private readonly EventDispatcherInterface $events,
) {}
public function create(CreateProductRequest $dto): Product
{
$product = new Product();
$product->name = $dto->name;
// ... set fields ...
$this->products->persist($product);
// Dispatch the event — all listeners run automatically
$this->events->dispatch(new ProductCreated($product));
return $product;
}
}
That's it. Listeners in app/Listener/ with #[Listener] attributes are auto-discovered.
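To check in a unit test that a service dispatches the expected event, a recording dispatcher can stand in for the real one. The sketch below is self-contained and hypothetical: `RecordingDispatcher` and the simplified `ProductCreated` are illustrative stand-ins, and in a real application the test double would implement `Psr\EventDispatcher\EventDispatcherInterface`.

```php
<?php
declare(strict_types=1);

// Simplified stand-in for App\Event\ProductCreated (illustration only).
final readonly class ProductCreated
{
    public function __construct(public string $productName) {}
}

// Hypothetical test double: records events instead of invoking listeners.
// In a real app it would implement Psr\EventDispatcher\EventDispatcherInterface.
final class RecordingDispatcher
{
    /** @var list<object> */
    public array $dispatched = [];

    public function dispatch(object $event): object
    {
        $this->dispatched[] = $event;
        return $event; // PSR-14 dispatchers return the event they were given
    }
}

$events = new RecordingDispatcher();
$events->dispatch(new ProductCreated('Widget'));
```

Injecting such a double into `ProductService` lets a test assert on `$events->dispatched` without running any listener.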
Dispatching Events
Standard Dispatch
$this->events->dispatch(new ProductCreated($product));
Dispatch with Metrics
$result = $this->events->dispatchWithResult(new ProductCreated($product));
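echo $result->listenersInvoked; // e.g. 3
echo $result->durationMs; // e.g. 12.5
var_dump($result->stopped); // bool(false); echo would print an empty string
var_dump($result->errors); // empty array when no listener failed; echo cannot print arrays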
Batch Dispatch
$results = $this->events->dispatchBatch([
new ProductCreated($product1),
new ProductCreated($product2),
new InventoryUpdated($warehouse),
]);
Listener Attributes
Basic Listener
#[Listener(ProductCreated::class)]
final class IndexProduct
{
public function __invoke(ProductCreated $event): void { ... }
}
Prioritized Listener
Higher priority runs first:
#[Listener(ProductCreated::class, priority: 100)] // Runs first
final class ValidateProduct { ... }
#[Listener(ProductCreated::class, priority: 10)] // Runs second
final class IndexProduct { ... }
#[Listener(ProductCreated::class, priority: 1)] // Runs last
final class NotifyAdmin { ... }
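One way a dispatcher could implement this ordering is to sort listeners by descending priority before invoking them. This is a minimal, self-contained sketch; the `$listeners` array layout is hypothetical and not the framework's internal representation.

```php
<?php
declare(strict_types=1);

// Hypothetical registry: each entry pairs a priority with a callable.
$calls = [];
$listeners = [
    ['priority' => 10,  'handler' => function () use (&$calls): void { $calls[] = 'IndexProduct'; }],
    ['priority' => 1,   'handler' => function () use (&$calls): void { $calls[] = 'NotifyAdmin'; }],
    ['priority' => 100, 'handler' => function () use (&$calls): void { $calls[] = 'ValidateProduct'; }],
];

// Sort so that the highest priority comes first.
usort($listeners, fn (array $a, array $b): int => $b['priority'] <=> $a['priority']);

foreach ($listeners as $listener) {
    $listener['handler']();
}

print_r($calls); // ValidateProduct, IndexProduct, NotifyAdmin
```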
Multiple Events
#[Listener(ProductCreated::class)]
#[Listener(ProductUpdated::class)]
final class UpdateSearchIndex
{
public function __invoke(object $event): void
{
// Handle both events
}
}
Before / After Interceptors
use MonkeysLegion\Events\Attribute\BeforeEvent;
use MonkeysLegion\Events\Attribute\AfterEvent;
#[BeforeEvent(ProductCreated::class)]
final class LogBeforeProductCreate
{
public function __invoke(ProductCreated $event): void
{
// Runs BEFORE regular listeners
}
}
#[AfterEvent(ProductCreated::class)]
final class LogAfterProductCreate
{
public function __invoke(ProductCreated $event): void
{
// Runs AFTER all regular listeners
}
}
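The resulting order is before-interceptors, then regular listeners, then after-interceptors. A minimal sketch of that three-phase dispatch, using plain closures, might look like this; `dispatchInPhases` is a hypothetical helper, not a framework function.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: invokes each group of callables in a fixed phase order.
 *
 * @param array{before: list<callable>, listeners: list<callable>, after: list<callable>} $phases
 */
function dispatchInPhases(object $event, array $phases): void
{
    foreach (['before', 'listeners', 'after'] as $phase) {
        foreach ($phases[$phase] as $handler) {
            $handler($event);
        }
    }
}

$event = new stdClass();
$event->log = [];

dispatchInPhases($event, [
    // Array order here is deliberately shuffled; the helper enforces phase order.
    'after'     => [fn (object $e) => $e->log[] = 'LogAfterProductCreate'],
    'listeners' => [fn (object $e) => $e->log[] = 'IndexProduct'],
    'before'    => [fn (object $e) => $e->log[] = 'LogBeforeProductCreate'],
]);
```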
Conditional & One-Shot Listeners
Conditional Listener
The listener runs only when the method named in #[ListenWhen] returns true:
use MonkeysLegion\Events\Attribute\ListenWhen;
#[Listener(OrderCompleted::class)]
#[ListenWhen(method: 'shouldNotify')]
final class SendOrderNotification
{
public function shouldNotify(OrderCompleted $event): bool
{
return $event->order->total > 100.00;
}
public function __invoke(OrderCompleted $event): void
{
// Only runs for orders > $100
}
}
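Conceptually, the dispatcher calls the guard method first and invokes the listener only when it returns true. The following self-contained sketch shows that check; `invokeIfAllowed`, the simplified `OrderCompleted`, and the way the guard name is passed are illustrative assumptions rather than the framework's mechanism.

```php
<?php
declare(strict_types=1);

// Simplified stand-in event (illustration only).
final readonly class OrderCompleted
{
    public function __construct(public float $total) {}
}

final class SendOrderNotification
{
    public int $sent = 0;

    public function shouldNotify(OrderCompleted $event): bool
    {
        return $event->total > 100.00;
    }

    public function __invoke(OrderCompleted $event): void
    {
        $this->sent++;
    }
}

// Hypothetical helper: runs the listener only if its guard method approves.
function invokeIfAllowed(object $listener, string $guard, object $event): bool
{
    if (!$listener->{$guard}($event)) {
        return false; // condition not met, listener skipped
    }
    $listener($event);
    return true;
}

$listener = new SendOrderNotification();
invokeIfAllowed($listener, 'shouldNotify', new OrderCompleted(49.99));  // skipped
invokeIfAllowed($listener, 'shouldNotify', new OrderCompleted(250.00)); // invoked
```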
Stoppable Events
An event that implements StoppableEventInterface can halt propagation, so that later listeners are skipped:
use Psr\EventDispatcher\StoppableEventInterface;
final class PaymentProcessing implements StoppableEventInterface
{
private bool $stopped = false;
public function stopPropagation(): void
{
$this->stopped = true;
}
public function isPropagationStopped(): bool
{
return $this->stopped;
}
}
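A listener stops propagation by calling `stopPropagation()` on the event. Per PSR-14, the dispatcher checks `isPropagationStopped()` before each listener and skips the rest once it returns true. The sketch below shows that loop with a local stand-in class (no PSR interface is imported, to keep it self-contained); the fraud-check scenario and listener names are hypothetical.

```php
<?php
declare(strict_types=1);

// Local stand-in mirroring the PaymentProcessing event above.
final class PaymentProcessing
{
    private bool $stopped = false;

    public function stopPropagation(): void
    {
        $this->stopped = true;
    }

    public function isPropagationStopped(): bool
    {
        return $this->stopped;
    }
}

$ran = [];
$listeners = [
    'FraudCheck' => function (PaymentProcessing $e) use (&$ran): void {
        $ran[] = 'FraudCheck';
        $e->stopPropagation(); // e.g. payment flagged as suspicious
    },
    'ChargeCard' => function (PaymentProcessing $e) use (&$ran): void {
        $ran[] = 'ChargeCard';
    },
];

$event = new PaymentProcessing();
foreach ($listeners as $listener) {
    // Check before every listener, as a PSR-14 dispatcher must.
    if ($event->isPropagationStopped()) {
        break;
    }
    $listener($event);
}
```

Here `ChargeCard` never runs because `FraudCheck` stopped the event first.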
Background Jobs
Jobs run asynchronously in a worker process — perfect for sending emails, processing images, or anything that shouldn't block the HTTP response.
Create a Job
Create app/Job/SendWelcomeEmailJob.php:
<?php
declare(strict_types=1);
namespace App\Job;
use MonkeysLegion\Queue\Contracts\ShouldQueue;
use Psr\Log\LoggerInterface;
use App\Repository\UserRepository;
final class SendWelcomeEmailJob implements ShouldQueue
{
public function __construct(
private readonly int $userId,
) {}
/**
* Dependencies are injected from the DI container.
*/
public function handle(
UserRepository $users,
LoggerInterface $logger,
): void {
$user = $users->find($this->userId);
if ($user === null) {
$logger->warning('SendWelcomeEmail: user not found', ['user_id' => $this->userId]);
return;
}
// Send the email
$logger->info('Welcome email sent', [
'user_id' => $user->id,
'email' => $user->email,
]);
}
public function failed(\Throwable $e): void
{
// Handle permanent failure — log, alert ops, etc.
}
public function tries(): int
{
return 3; // Retry up to 3 times
}
public function timeout(): int
{
return 30; // 30-second timeout
}
public function retryAfter(): int
{
return 60; // Wait 60 seconds between retries
}
}
Dispatch a Job
use MonkeysLegion\Queue\Contracts\QueueDispatcherInterface;
final class UserService
{
public function __construct(
private readonly QueueDispatcherInterface $queue,
) {}
public function register(CreateUserRequest $dto): User
{
$user = $this->createUser($dto);
// Queue the welcome email (non-blocking)
$this->queue->dispatch(new SendWelcomeEmailJob($user->id));
return $user;
}
}
Dispatch Options
// Normal async dispatch
$this->queue->dispatch($job);
// Execute synchronously (for testing or critical paths)
$this->queue->dispatchSync($job);
// Queue after HTTP response is sent
$this->queue->dispatchAfterResponse($job);
Queue Configuration
config/queue.mlc
queue {
default = ${QUEUE_CONNECTION:sync}
connections {
sync {
driver = "sync"
}
database {
driver = "database"
table = "jobs"
queue = "default"
retry_after = 90
}
redis {
driver = "redis"
connection = "default"
queue = "default"
retry_after = 90
}
}
failed {
driver = "database"
table = "failed_jobs"
}
}
.env
# Development: execute jobs immediately
QUEUE_CONNECTION=sync
# Production: use Redis for async processing (set this instead of sync)
# QUEUE_CONNECTION=redis
Running Workers
# Start a worker (runs forever)
php vendor/bin/ml queue:work
# Process and stop when empty
php vendor/bin/ml queue:work --stop-when-empty
# Specific queue
php vendor/bin/ml queue:work --queue=emails
# With memory limit
php vendor/bin/ml queue:work --memory=128
Job Chains
Execute jobs sequentially — each job starts only after the previous one succeeds:
$this->queue->chain([
new ProcessPayment($orderId),
new SendOrderConfirmation($orderId),
new UpdateInventory($orderId),
new NotifyWarehouse($orderId),
]);
If ProcessPayment fails, the remaining jobs never execute.
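The chain semantics can be pictured as a loop that stops at the first exception. This is a synchronous, self-contained sketch; `runChain` is a hypothetical helper, and a real queue would run each step in a worker rather than inline.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: runs steps in order and stops at the first failure.
 *
 * @param array<string, callable> $steps
 * @return list<string> names of the steps that completed
 */
function runChain(array $steps): array
{
    $completed = [];
    foreach ($steps as $name => $step) {
        try {
            $step();
        } catch (Throwable $e) {
            break; // remaining steps are never started
        }
        $completed[] = $name;
    }
    return $completed;
}

$completed = runChain([
    'ProcessPayment'        => fn () => null,
    'SendOrderConfirmation' => fn () => throw new RuntimeException('SMTP down'),
    'UpdateInventory'       => fn () => null,
]);
```

In this run only `ProcessPayment` completes; `UpdateInventory` is never started because the step before it threw.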
Failed Jobs
View Failed Jobs
php vendor/bin/ml queue:failed
Retry Failed Jobs
# Retry a specific job
php vendor/bin/ml queue:retry 5
# Retry all failed jobs
php vendor/bin/ml queue:retry --all
Flush Failed Jobs
php vendor/bin/ml queue:flush
Handling Failures in Code
final class ImportDataJob implements ShouldQueue
{
public function handle(): void
{
// Processing...
}
public function failed(\Throwable $e): void
{
// Called after all retries are exhausted
// Log, send alert, clean up temp files, etc.
}
}
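The relationship between `tries()` and `failed()` can be sketched as a worker loop: attempt the job up to `tries()` times, and call `failed()` only once every attempt has thrown. This is a simplified, self-contained illustration. `runWithRetries` and `FlakyJob` are hypothetical, the loop treats `tries()` as the total number of attempts (an assumption), and the delay between attempts is omitted.

```php
<?php
declare(strict_types=1);

final class FlakyJob
{
    public int $attempts = 0;
    public ?string $failureMessage = null;

    public function handle(): void
    {
        $this->attempts++;
        throw new RuntimeException("attempt {$this->attempts} failed");
    }

    public function failed(Throwable $e): void
    {
        $this->failureMessage = $e->getMessage();
    }

    public function tries(): int
    {
        return 3;
    }
}

// Hypothetical worker loop; a real worker would also wait retryAfter() seconds.
function runWithRetries(object $job): bool
{
    $last = null;
    for ($attempt = 1; $attempt <= $job->tries(); $attempt++) {
        try {
            $job->handle();
            return true; // success, no further attempts
        } catch (Throwable $e) {
            $last = $e;
        }
    }
    if ($last !== null) {
        $job->failed($last); // all attempts exhausted
    }
    return false;
}

$job = new FlakyJob();
$ok = runWithRetries($job);
```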
Task Scheduling
Define recurring tasks in code and drive them all from a single cron entry:
Define Schedules
In a schedule provider:
use MonkeysLegion\Schedule\Schedule;
final class AppSchedule
{
public function __invoke(Schedule $schedule): void
{
// Prune expired sessions every hour
$schedule->command('session:prune')
->hourly();
// Send digest emails daily at 8 AM
$schedule->call(fn() => $this->sendDigest())
->dailyAt('08:00');
// Process queued jobs every minute
$schedule->command('queue:work --stop-when-empty')
->everyMinute();
// Clean temp files weekly
$schedule->job(new CleanTempFilesJob())
->weekly();
// Custom cron expression
$schedule->command('reports:generate')
->cron('0 2 * * 1'); // 2 AM every Monday
}
}
Frequency Reference
| Method | Schedule |
|---|---|
| `everyMinute()` | Every minute |
| `everyFiveMinutes()` | Every 5 minutes |
| `everyFifteenMinutes()` | Every 15 minutes |
| `everyThirtyMinutes()` | Every 30 minutes |
| `hourly()` | Every hour |
| `daily()` | Daily at midnight |
| `dailyAt('13:00')` | Daily at 1 PM |
| `weekly()` | Weekly on Sunday |
| `monthly()` | First day of each month |
| `cron('* * * * *')` | Custom cron expression |
Server Cron Setup
Add a single cron entry on your server:
* * * * * cd /path/to/app && php vendor/bin/ml schedule:run >> /dev/null 2>&1
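With that entry in place, `schedule:run` is invoked every minute and has to decide which tasks are due. A minimal sketch of that decision for five-field cron expressions follows. It is not the framework's implementation: `cronIsDue` and `fieldMatches` are hypothetical helpers that only understand `*`, `*/n`, and single integers, and they require every field to match (standard cron treats day-of-month and day-of-week specially when both are restricted).

```php
<?php
declare(strict_types=1);

// Matches one cron field against a value. Supports "*", "*/n", and a single integer.
function fieldMatches(string $field, int $value): bool
{
    if ($field === '*') {
        return true;
    }
    if (str_starts_with($field, '*/')) {
        $step = (int) substr($field, 2);
        return $step > 0 && $value % $step === 0;
    }
    return (int) $field === $value;
}

// Fields: minute, hour, day of month, month, day of week (0 = Sunday).
function cronIsDue(string $expression, DateTimeImmutable $now): bool
{
    $fields = preg_split('/\s+/', trim($expression));
    if ($fields === false || count($fields) !== 5) {
        throw new InvalidArgumentException('Expected five cron fields');
    }
    $values = [
        (int) $now->format('i'), // minute
        (int) $now->format('G'), // hour, 0-23
        (int) $now->format('j'), // day of month
        (int) $now->format('n'), // month
        (int) $now->format('w'), // day of week
    ];
    foreach ($fields as $i => $field) {
        if (!fieldMatches($field, $values[$i])) {
            return false;
        }
    }
    return true;
}

$monday2am = new DateTimeImmutable('2024-01-01 02:00:00'); // a Monday
var_dump(cronIsDue('0 2 * * 1', $monday2am)); // bool(true)
```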