
Events, Queues & Background Jobs

Decouple your application with event-driven architecture and async processing.




Events & Listeners

Events decouple your business logic. When something happens (a post is published, a user registers), you dispatch an event. Listeners react independently.

Service                  EventDispatcher              Listeners
   │                          │                           │
   │── dispatch(event) ──────▶│                           │
   │                          │── NotifyAdmin ───────────▶│
   │                          │── SendEmail ─────────────▶│
   │                          │── UpdateStats ───────────▶│
   │◀── return ──────────────│                           │

Step 1: Create an Event

Create app/Event/ProductCreated.php:

<?php
declare(strict_types=1);

namespace App\Event;

use App\Entity\Product;

/**
 * Dispatched when a new product is created.
 */
final readonly class ProductCreated
{
    public function __construct(
        public Product $product,
        public \DateTimeImmutable $createdAt = new \DateTimeImmutable(),
    ) {}
}

Events are plain PHP objects — no base class required.

Step 2: Create a Listener

Create app/Listener/NotifyOnProductCreated.php:

<?php
declare(strict_types=1);

namespace App\Listener;

use MonkeysLegion\Events\Attribute\Listener;
use Psr\Log\LoggerInterface;
use App\Event\ProductCreated;

#[Listener(ProductCreated::class)]
final class NotifyOnProductCreated
{
    public function __construct(
        private readonly LoggerInterface $logger,
    ) {}

    public function __invoke(ProductCreated $event): void
    {
        $this->logger->info('New product created', [
            'id'   => $event->product->id,
            'name' => $event->product->name,
        ]);

        // Send notification, update search index, etc.
    }
}

Step 3: Dispatch from Your Service

use App\Entity\Product;
use App\Event\ProductCreated;
use Psr\EventDispatcher\EventDispatcherInterface;

#[Singleton]
final class ProductService
{
    public function __construct(
        private readonly ProductRepository $products,
        private readonly EventDispatcherInterface $events,
    ) {}

    public function create(CreateProductRequest $dto): Product
    {
        $product = new Product();
        $product->name = $dto->name;
        // ... set fields ...

        $this->products->persist($product);

        // Dispatch the event — all listeners run automatically
        $this->events->dispatch(new ProductCreated($product));

        return $product;
    }
}

That's it. Listeners in app/Listener/ with #[Listener] attributes are auto-discovered.
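To make auto-discovery less of a black box, here is a minimal sketch of how attribute-based discovery can work using PHP's reflection API. It is not the framework's actual implementation: `ListenerSketch`, `ProductCreatedSketch`, `LogProductSketch`, and `discoverListeners()` are hypothetical names introduced only for this example.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the framework's #[Listener] attribute.
#[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)]
final class ListenerSketch
{
    public function __construct(
        public readonly string $event,
        public readonly int $priority = 0,
    ) {}
}

// Hypothetical event and listener used only by this sketch.
final class ProductCreatedSketch {}

#[ListenerSketch(ProductCreatedSketch::class)]
final class LogProductSketch
{
    public function __invoke(ProductCreatedSketch $event): void {}
}

/**
 * Build an "event class => listener classes" map by reading attributes.
 * Classes without the attribute are ignored.
 *
 * @param list<class-string> $classes
 * @return array<string, list<class-string>>
 */
function discoverListeners(array $classes): array
{
    $map = [];
    foreach ($classes as $class) {
        $attributes = (new ReflectionClass($class))->getAttributes(ListenerSketch::class);
        foreach ($attributes as $attribute) {
            // newInstance() constructs the attribute object with its arguments.
            $map[$attribute->newInstance()->event][] = $class;
        }
    }
    return $map;
}

$map = discoverListeners([LogProductSketch::class, ProductCreatedSketch::class]);
```

A real implementation would also have to find the candidate classes (for example by scanning app/Listener/) and resolve each listener through the DI container; both steps are left out here.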


Dispatching Events

Standard Dispatch

$this->events->dispatch(new ProductCreated($product));

Dispatch with Metrics

$result = $this->events->dispatchWithResult(new ProductCreated($product));

var_dump($result->listenersInvoked); // int(3)
var_dump($result->durationMs);       // float(12.5)
var_dump($result->stopped);          // bool(false)
var_dump($result->errors);           // empty array: no listener errors

Batch Dispatch

$results = $this->events->dispatchBatch([
    new ProductCreated($product1),
    new ProductCreated($product2),
    new InventoryUpdated($warehouse),
]);

Listener Attributes

Basic Listener

#[Listener(ProductCreated::class)]
final class IndexProduct
{
    public function __invoke(ProductCreated $event): void { ... }
}

Prioritized Listener

Higher priority runs first:

#[Listener(ProductCreated::class, priority: 100)]  // Runs first
final class ValidateProduct { ... }

#[Listener(ProductCreated::class, priority: 10)]   // Runs second
final class IndexProduct { ... }

#[Listener(ProductCreated::class, priority: 1)]    // Runs last
final class NotifyAdmin { ... }
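The ordering rule above ("higher priority runs first") amounts to a descending sort on the priority value. The sketch below shows only that rule; `orderByPriority()` is a hypothetical helper, and how the dispatcher orders listeners with equal priority is not specified here.

```php
<?php
declare(strict_types=1);

/**
 * Order listener names so that higher priority values come first.
 *
 * @param array<string, int> $priorities listener name => priority
 * @return list<string>
 */
function orderByPriority(array $priorities): array
{
    // arsort() sorts by value in descending order and keeps the keys.
    arsort($priorities);
    return array_keys($priorities);
}

$order = orderByPriority([
    'NotifyAdmin'     => 1,
    'ValidateProduct' => 100,
    'IndexProduct'    => 10,
]);
```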

Multiple Events

#[Listener(ProductCreated::class)]
#[Listener(ProductUpdated::class)]
final class UpdateSearchIndex
{
    public function __invoke(object $event): void
    {
        // Handle both events
    }
}

Before / After Interceptors

use MonkeysLegion\Events\Attribute\BeforeEvent;
use MonkeysLegion\Events\Attribute\AfterEvent;

#[BeforeEvent(ProductCreated::class)]
final class LogBeforeProductCreate
{
    public function __invoke(ProductCreated $event): void
    {
        // Runs BEFORE regular listeners
    }
}

#[AfterEvent(ProductCreated::class)]
final class LogAfterProductCreate
{
    public function __invoke(ProductCreated $event): void
    {
        // Runs AFTER all regular listeners
    }
}
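Conceptually, interceptors split a dispatch into three phases. The following is a simplified sketch of that ordering with plain callables; `dispatchInPhases()` is a hypothetical function, not part of the framework.

```php
<?php
declare(strict_types=1);

/**
 * Call every "before" interceptor, then every regular listener,
 * then every "after" interceptor, passing the same event to each.
 *
 * @param list<callable> $before
 * @param list<callable> $listeners
 * @param list<callable> $after
 */
function dispatchInPhases(object $event, array $before, array $listeners, array $after): void
{
    foreach ([$before, $listeners, $after] as $phase) {
        foreach ($phase as $callable) {
            $callable($event);
        }
    }
}

// Record the call order so the phases are visible.
$trace = new ArrayObject();

dispatchInPhases(
    new stdClass(),
    before:    [fn (object $e) => $trace->append('before')],
    listeners: [
        fn (object $e) => $trace->append('listener A'),
        fn (object $e) => $trace->append('listener B'),
    ],
    after:     [fn (object $e) => $trace->append('after')],
);
```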

Conditional & One-Shot Listeners

Conditional Listener

Only runs when a condition is met:

use MonkeysLegion\Events\Attribute\ListenWhen;

#[Listener(OrderCompleted::class)]
#[ListenWhen(method: 'shouldNotify')]
final class SendOrderNotification
{
    public function shouldNotify(OrderCompleted $event): bool
    {
        return $event->order->total > 100.00;
    }

    public function __invoke(OrderCompleted $event): void
    {
        // Only runs for orders > $100
    }
}

Stoppable Events

Events can stop propagation:

use Psr\EventDispatcher\StoppableEventInterface;

final class PaymentProcessing implements StoppableEventInterface
{
    private bool $stopped = false;

    public function stopPropagation(): void
    {
        $this->stopped = true;
    }

    public function isPropagationStopped(): bool
    {
        return $this->stopped;
    }
}
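PSR-14 requires a dispatcher to check `isPropagationStopped()` before calling each listener. The sketch below illustrates that loop. To stay runnable without Composer packages it declares a local `StoppableSketch` interface with the same method as `StoppableEventInterface`; that interface, `PaymentProcessingSketch`, and `dispatchStoppable()` are all hypothetical stand-ins.

```php
<?php
declare(strict_types=1);

// Local stand-in mirroring PSR-14's StoppableEventInterface.
interface StoppableSketch
{
    public function isPropagationStopped(): bool;
}

final class PaymentProcessingSketch implements StoppableSketch
{
    private bool $stopped = false;

    /** @var list<string> names of listeners that handled the event */
    public array $handledBy = [];

    public function stopPropagation(): void
    {
        $this->stopped = true;
    }

    public function isPropagationStopped(): bool
    {
        return $this->stopped;
    }
}

/** @param list<callable> $listeners */
function dispatchStoppable(object $event, array $listeners): object
{
    foreach ($listeners as $listener) {
        // Skip the remaining listeners once propagation has been stopped.
        if ($event instanceof StoppableSketch && $event->isPropagationStopped()) {
            break;
        }
        $listener($event);
    }
    return $event;
}

$event = dispatchStoppable(new PaymentProcessingSketch(), [
    function (PaymentProcessingSketch $e): void {
        $e->handledBy[] = 'FraudCheck';
        $e->stopPropagation(); // e.g. the payment was rejected
    },
    function (PaymentProcessingSketch $e): void {
        $e->handledBy[] = 'ChargeCard'; // not reached in this run
    },
]);
```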

Background Jobs

Jobs run asynchronously in a worker process — perfect for sending emails, processing images, or anything that shouldn't block the HTTP response.

Create a Job

Create app/Job/SendWelcomeEmailJob.php:

<?php
declare(strict_types=1);

namespace App\Job;

use MonkeysLegion\Queue\Contracts\ShouldQueue;
use Psr\Log\LoggerInterface;
use App\Repository\UserRepository;

final class SendWelcomeEmailJob implements ShouldQueue
{
    public function __construct(
        private readonly int $userId,
    ) {}

    /**
     * Dependencies are injected from the DI container.
     */
    public function handle(
        UserRepository $users,
        LoggerInterface $logger,
    ): void {
        $user = $users->find($this->userId);

        if ($user === null) {
            $logger->warning('SendWelcomeEmail: user not found', ['user_id' => $this->userId]);
            return;
        }

        // Send the email
        $logger->info('Welcome email sent', [
            'user_id' => $user->id,
            'email'   => $user->email,
        ]);
    }

    public function failed(\Throwable $e): void
    {
        // Handle permanent failure — log, alert ops, etc.
    }

    public function tries(): int
    {
        return 3; // Retry up to 3 times
    }

    public function timeout(): int
    {
        return 30; // 30-second timeout
    }

    public function retryAfter(): int
    {
        return 60; // Wait 60 seconds between retries
    }
}

Dispatch a Job

use MonkeysLegion\Queue\Contracts\QueueDispatcherInterface;

final class UserService
{
    public function __construct(
        private readonly QueueDispatcherInterface $queue,
    ) {}

    public function register(CreateUserRequest $dto): User
    {
        $user = $this->createUser($dto);

        // Queue the welcome email (non-blocking)
        $this->queue->dispatch(new SendWelcomeEmailJob($user->id));

        return $user;
    }
}

Dispatch Options

// Normal async dispatch
$this->queue->dispatch($job);

// Execute synchronously (for testing or critical paths)
$this->queue->dispatchSync($job);

// Queue after HTTP response is sent
$this->queue->dispatchAfterResponse($job);

Queue Configuration

config/queue.mlc

queue {
    default = ${QUEUE_CONNECTION:sync}

    connections {
        sync {
            driver = "sync"
        }
        database {
            driver      = "database"
            table       = "jobs"
            queue       = "default"
            retry_after = 90
        }
        redis {
            driver      = "redis"
            connection  = "default"
            queue       = "default"
            retry_after = 90
        }
    }

    failed {
        driver = "database"
        table  = "failed_jobs"
    }
}

.env

# Set one value per environment.

# Development: execute jobs immediately
QUEUE_CONNECTION=sync

# Production: use Redis for async processing
# QUEUE_CONNECTION=redis

Running Workers

# Start a worker (runs forever)
php vendor/bin/ml queue:work

# Process and stop when empty
php vendor/bin/ml queue:work --stop-when-empty

# Specific queue
php vendor/bin/ml queue:work --queue=emails

# With memory limit
php vendor/bin/ml queue:work --memory=128

Job Chains

Execute jobs sequentially — each job starts only after the previous one succeeds:

$this->queue->chain([
    new ProcessPayment($orderId),
    new SendOrderConfirmation($orderId),
    new UpdateInventory($orderId),
    new NotifyWarehouse($orderId),
]);

If ProcessPayment fails, the remaining jobs never execute.
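The stop-on-first-failure behavior can be pictured with a small in-process sketch. It runs plain closures in order rather than real queued jobs, and `runChain()` is a hypothetical helper; an actual queue would enqueue each next job only after the previous one completes in a worker.

```php
<?php
declare(strict_types=1);

/**
 * Run jobs in order and stop at the first one that throws.
 *
 * @param array<string, callable> $jobs job name => job
 * @return list<string> names of the jobs that completed
 */
function runChain(array $jobs): array
{
    $completed = [];
    foreach ($jobs as $name => $job) {
        try {
            $job();
        } catch (Throwable $e) {
            break; // every later job in the chain is skipped
        }
        $completed[] = $name;
    }
    return $completed;
}

$completed = runChain([
    'ProcessPayment'        => static function (): void {},
    'SendOrderConfirmation' => static function (): void {
        throw new RuntimeException('SMTP down');
    },
    'UpdateInventory'       => static function (): void {},
]);
```

Here `ProcessPayment` completes, `SendOrderConfirmation` fails, and `UpdateInventory` never runs.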


Failed Jobs

View Failed Jobs

php vendor/bin/ml queue:failed

Retry Failed Jobs

# Retry a specific job
php vendor/bin/ml queue:retry 5

# Retry all failed jobs
php vendor/bin/ml queue:retry --all

Flush Failed Jobs

php vendor/bin/ml queue:flush

Handling Failures in Code

final class ImportDataJob implements ShouldQueue
{
    public function handle(): void
    {
        // Processing...
    }

    public function failed(\Throwable $e): void
    {
        // Called after all retries are exhausted
        // Log, send alert, clean up temp files, etc.
    }
}
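How `tries()` and `failed()` fit together can be sketched as a retry loop. This is an illustration under assumptions, not the worker's real code: it treats `tries()` as the total number of attempts, skips the `retryAfter()` delay, and uses the hypothetical names `FlakyJobSketch` and `runWithRetries()`.

```php
<?php
declare(strict_types=1);

// Hypothetical job that always fails, so the retry path is exercised.
final class FlakyJobSketch
{
    public int $attempts = 0;
    public ?string $failure = null;

    public function handle(): void
    {
        $this->attempts++;
        throw new RuntimeException('upstream unavailable');
    }

    public function failed(Throwable $e): void
    {
        $this->failure = $e->getMessage();
    }

    public function tries(): int
    {
        return 3;
    }
}

/** Returns true on success, false once every attempt has failed. */
function runWithRetries(FlakyJobSketch $job): bool
{
    $last = null;
    for ($attempt = 1; $attempt <= $job->tries(); $attempt++) {
        try {
            $job->handle();
            return true;
        } catch (Throwable $e) {
            $last = $e;
            // A real worker would wait retryAfter() seconds before the next attempt.
        }
    }
    if ($last !== null) {
        $job->failed($last); // called once, after the final attempt
    }
    return false;
}

$job = new FlakyJobSketch();
$ok  = runWithRetries($job);
```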

Task Scheduling

Define recurring tasks in code. The server needs only a single cron entry to trigger them.

Define Schedules

In a schedule provider:

use MonkeysLegion\Schedule\Schedule;

final class AppSchedule
{
    public function __invoke(Schedule $schedule): void
    {
        // Prune expired sessions every hour
        $schedule->command('session:prune')
            ->hourly();

        // Send digest emails daily at 8 AM
        // (sendDigest() is a method you define on this class)
        $schedule->call(fn() => $this->sendDigest())
            ->dailyAt('08:00');

        // Process queued jobs every minute
        $schedule->command('queue:work --stop-when-empty')
            ->everyMinute();

        // Clean temp files weekly
        $schedule->job(new CleanTempFilesJob())
            ->weekly();

        // Custom cron expression
        $schedule->command('reports:generate')
            ->cron('0 2 * * 1');  // 2 AM every Monday
    }
}

Frequency Reference

Method                   Schedule
everyMinute()            Every minute
everyFiveMinutes()       Every 5 minutes
everyFifteenMinutes()    Every 15 minutes
everyThirtyMinutes()     Every 30 minutes
hourly()                 Every hour
daily()                  Daily at midnight
dailyAt('13:00')         Daily at 1 PM
weekly()                 Weekly on Sunday
monthly()                First day of each month
cron('* * * * *')        Custom cron expression
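For readers who think in cron syntax, the frequencies above correspond to conventional cron expressions. The mapping below is an assumed equivalence based on standard cron semantics, not taken from the framework's source; in particular, midnight is assumed for `weekly()` and `monthly()`. `FREQUENCY_TO_CRON` and `dailyAtToCron()` are hypothetical names.

```php
<?php
declare(strict_types=1);

// Fields: minute hour day-of-month month day-of-week (0 = Sunday).
const FREQUENCY_TO_CRON = [
    'everyMinute'         => '* * * * *',
    'everyFiveMinutes'    => '*/5 * * * *',
    'everyFifteenMinutes' => '*/15 * * * *',
    'everyThirtyMinutes'  => '*/30 * * * *',
    'hourly'              => '0 * * * *',
    'daily'               => '0 0 * * *',
    'weekly'              => '0 0 * * 0', // assumes midnight on Sunday
    'monthly'             => '0 0 1 * *', // assumes midnight on the 1st
];

/** Convert an "HH:MM" time into a daily cron expression. */
function dailyAtToCron(string $time): string
{
    [$hour, $minute] = array_map('intval', explode(':', $time));
    return sprintf('%d %d * * *', $minute, $hour);
}
```

For example, `dailyAtToCron('13:00')` yields `0 13 * * *`.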

Server Cron Setup

Add a single cron entry on your server:

* * * * * cd /path/to/app && php vendor/bin/ml schedule:run >> /dev/null 2>&1
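Because this entry fires every minute, the scheduler has to decide on each run which tasks are due. A minimal sketch of such a check follows. It is not the framework's implementation: `fieldMatches()` and `isDue()` are hypothetical, and only `*`, `*/n`, and single numbers are supported (no lists or ranges, and all five fields must match).

```php
<?php
declare(strict_types=1);

/** Match one cron field against the current value. */
function fieldMatches(string $field, int $value): bool
{
    if ($field === '*') {
        return true;
    }
    if (str_starts_with($field, '*/')) {
        // Step syntax: "*/5" matches 0, 5, 10, ...
        return $value % (int) substr($field, 2) === 0;
    }
    return (int) $field === $value;
}

/** Check whether a five-field cron expression is due at the given time. */
function isDue(string $expression, DateTimeImmutable $now): bool
{
    [$minute, $hour, $day, $month, $weekday] = explode(' ', $expression);

    return fieldMatches($minute, (int) $now->format('i'))
        && fieldMatches($hour, (int) $now->format('G'))
        && fieldMatches($day, (int) $now->format('j'))
        && fieldMatches($month, (int) $now->format('n'))
        && fieldMatches($weekday, (int) $now->format('w')); // 0 = Sunday
}

// 2024-01-01 was a Monday, so the "2 AM every Monday" task is due.
$mondayDue = isDue('0 2 * * 1', new DateTimeImmutable('2024-01-01 02:00:00'));
```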