Source code for debusine.db.models.workers

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db workers."""

import copy
import hashlib
from typing import Any, Optional, TYPE_CHECKING

from django.db import IntegrityError, models, transaction
from django.db.models import (
    CheckConstraint,
    Count,
    Exists,
    F,
    JSONField,
    Max,
    OuterRef,
    Q,
    QuerySet,
)
from django.db.models.expressions import Case, When
from django.db.models.functions import Extract, Greatest, Now
from django.utils import timezone
from django.utils.text import slugify

from debusine.db.models import WorkRequest
from debusine.db.models.auth import Token
from debusine.db.models.worker_pools import WorkerPool
from debusine.tasks.models import WorkerType

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
else:
    TypedModelMeta = object


class WorkerManager(models.Manager["Worker"]):
    """Manager for Worker model."""

    def connected(self) -> QuerySet["Worker"]:
        """Return connected workers."""
        return self.filter(connected_at__isnull=False).order_by('connected_at')

    def waiting_for_work_request(self) -> QuerySet["Worker"]:
        """
        Return workers that can be assigned a new work request.

        Workers with fewer pending or running work requests than their
        concurrency level can take more work right now and are therefore
        waiting for a work request.

        Unless the worker is a Celery worker, its token must be enabled.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        running_work_request_count = Count(
            'assigned_work_requests',
            filter=Q(
                assigned_work_requests__status__in=[
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.PENDING,
                ]
            ),
        )
        workers = (
            self.filter(connected_at__isnull=False)
            .order_by(F('worker_pool').asc(nulls_first=True), 'connected_at')
            .annotate(count_running=running_work_request_count)
            .filter(count_running__lt=F("concurrency"))
            .filter(Q(worker_type=WorkerType.CELERY) | Q(token__enabled=True))
        )

        return workers
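    # Illustrative usage sketch (not part of the model; assumes a configured
    # Django environment): a scheduler could pick the first worker with
    # spare concurrency, e.g.:
    #
    #     worker = Worker.objects.waiting_for_work_request().first()
    #     if worker is not None:
    #         ...  # assign a pending WorkRequest to this worker
    #
    # The ordering places workers without a pool first, then the least
    # recently connected ones.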

    def with_idle_time(self) -> QuerySet["Worker"]:
        """
        Annotate workers with `idle_time`.

        `Worker.idle_time` is 0 for workers currently assigned a running
        work request; otherwise it is the number of seconds the worker has
        spent without a running work request assigned (measured from its
        most recently completed work request, or from its registration or
        instance creation time if it has never run one).
        """
        tasks = WorkRequest.objects.filter(worker=OuterRef("pk"))
        return self.annotate(
            idle_time=Case(
                When(
                    Exists(tasks.filter(status=WorkRequest.Statuses.RUNNING)),
                    then=0,
                ),
                When(
                    Exists(tasks),
                    then=Extract(
                        Now() - Max(tasks.values("completed_at")), "epoch"
                    ),
                ),
                default=Extract(
                    Now() - Greatest("registered_at", "instance_created_at"),
                    "epoch",
                ),
            )
        )
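    # Illustrative example (not part of the model): find workers that have
    # been idle for more than an hour, e.g. to decide whether a dynamic
    # pool can be scaled down.  The 3600-second threshold is arbitrary.
    #
    #     idle = Worker.objects.with_idle_time().filter(idle_time__gt=3600)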

    @staticmethod
    def _generate_unique_name(name: str, counter: int) -> str:
        """Return name slugified adding "-counter" if counter != 1."""
        new_name = slugify(name.replace('.', '-'))

        if counter != 1:
            new_name += f'-{counter}'

        return new_name
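    # Illustrative values: the FQDN is slugified and the counter is only
    # appended after the first collision, e.g.:
    #
    #     _generate_unique_name("build-01.example.org", 1)
    #         -> "build-01-example-org"
    #     _generate_unique_name("build-01.example.org", 2)
    #         -> "build-01-example-org-2"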

    def create_with_fqdn(
        self,
        fqdn: str,
        token: Token,
        worker_type: WorkerType = WorkerType.EXTERNAL,
        worker_pool: WorkerPool | None = None,
    ) -> "Worker":
        """Return a new Worker with its name based on fqdn, with token."""
        counter = 1

        while True:
            name = self._generate_unique_name(fqdn, counter)
            try:
                with transaction.atomic():
                    return self.create(
                        name=name,
                        token=token,
                        worker_type=worker_type,
                        registered_at=timezone.now(),
                        worker_pool=worker_pool,
                    )
            except IntegrityError:
                counter += 1
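    # Illustrative usage (``token`` stands for an existing Token obtained
    # elsewhere; how to obtain one is out of scope here):
    #
    #     worker = Worker.objects.create_with_fqdn("host.example.org", token)
    #     worker.name  # "host-example-org", or "host-example-org-2" if the
    #                  # name was already taken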

    def create_pool_members(
        self,
        worker_pool: WorkerPool,
        count: int,
    ) -> None:
        """
        Create count new External Workers in worker_pool.

        Use throw-away activation tokens; when workers start, they'll
        exchange their activation tokens for full worker tokens.
        """
        created = 0
        index = 1
        while created < count:
            with transaction.atomic():
                try:
                    self.create(
                        name=f"{worker_pool.name}-{index:03}",
                        activation_token=(
                            Token.objects.create_worker_activation()
                        ),
                        worker_type=WorkerType.EXTERNAL,
                        worker_pool=worker_pool,
                        registered_at=timezone.now(),
                    )
                    created += 1
                except IntegrityError:
                    pass
            index += 1
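    # Illustrative usage (``pool`` is an existing WorkerPool): provision
    # three members named "<pool.name>-001" .. "<pool.name>-003" (assuming
    # those names are free), each with a fresh activation token.
    #
    #     Worker.objects.create_pool_members(pool, count=3)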

    def get_or_create_celery(self) -> "Worker":
        """Return a new Worker representing the Celery task queue."""
        try:
            return self.get(name="celery", worker_type=WorkerType.CELERY)
        except Worker.DoesNotExist:
            return self.create(
                name="celery",
                worker_type=WorkerType.CELERY,
                registered_at=timezone.now(),
            )
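    # Illustrative usage: a single Worker row, keyed on name="celery",
    # represents the Celery task queue; it is created on first use.
    #
    #     celery_worker = Worker.objects.get_or_create_celery()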

    def get_worker_by_token_key_or_none(
        self, token_key: str
    ) -> Optional["Worker"]:
        """Return a Worker identified by its associated secret token."""
        try:
            token_hash = hashlib.sha256(token_key.encode()).hexdigest()
            return self.get(token__hash=token_hash)
        except Worker.DoesNotExist:
            return None
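    # Illustrative usage (``token_key`` is the secret key presented by a
    # worker, e.g. when authenticating a request): the key is hashed with
    # SHA-256 and looked up against the stored token hash.
    #
    #     worker = Worker.objects.get_worker_by_token_key_or_none(token_key)
    #     if worker is None:
    #         ...  # reject the request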

    def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]:
        """Return the worker with worker_name or None."""
        try:
            return self.get(name=worker_name)
        except Worker.DoesNotExist:
            return None


class Worker(models.Model):
    """Database model of a worker."""

    name = models.SlugField(
        unique=True,
        help_text='Human readable name of the worker based on the FQDN',
    )
    registered_at = models.DateTimeField()
    connected_at = models.DateTimeField(blank=True, null=True)
    instance_created_at = models.DateTimeField(blank=True, null=True)

    # This is the token used by the Worker to authenticate.
    # Users have their own tokens - this is specific to a single worker.
    token = models.OneToOneField(
        Token, null=True, on_delete=models.PROTECT, related_name="worker"
    )

    # This is used to grant workers the ability to bootstrap authentication
    # for themselves, in situations where their provisioning data may be
    # revealed to untrusted code later. The activation token will be
    # deleted once it has been used.
    activation_token = models.OneToOneField(
        Token,
        null=True,
        on_delete=models.SET_NULL,
        related_name="activating_worker",
    )

    static_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)

    worker_type = models.CharField(
        max_length=8,
        choices=WorkerType.choices,
        default=WorkerType.EXTERNAL,
        editable=False,
    )

    # Only Celery workers currently support concurrency levels greater
    # than 1.
    concurrency = models.PositiveIntegerField(
        default=1,
        help_text="Number of tasks this worker can run simultaneously",
    )

    worker_pool = models.ForeignKey(
        WorkerPool, blank=True, null=True, on_delete=models.PROTECT
    )
    worker_pool_data = JSONField(null=True, blank=True)

    class Meta(TypedModelMeta):
        constraints = [
            # Non-Celery workers must have a token.
            CheckConstraint(
                name="%(app_label)s_%(class)s_celery_or_token",
                check=Q(worker_type=WorkerType.CELERY)
                | Q(activation_token__isnull=False)
                | Q(token__isnull=False),
            )
        ]

    def mark_disconnected(self) -> None:
        """Update and save relevant Worker fields after disconnecting."""
        self.connected_at = None
        self.save()

    def running_work_requests(self) -> QuerySet["WorkRequest"]:
        """Return queryset of work requests running on this worker."""
        return self.assigned_work_requests.filter(
            status=WorkRequest.Statuses.RUNNING
        ).order_by("id")

    def mark_connected(self) -> None:
        """Update and save relevant Worker fields after connecting."""
        self.connected_at = timezone.now()
        self.save()

    def connected(self) -> bool:
        """Return True if the Worker is connected."""
        return self.connected_at is not None

    def is_busy(self) -> bool:
        """
        Return True if the Worker is busy with work requests.

        A Worker is busy if it has as many running or pending work requests
        as its concurrency level.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        return (
            WorkRequest.objects.running(worker=self)
            | WorkRequest.objects.pending(worker=self)
        ).count() >= self.concurrency

    def metadata(self) -> dict[str, Any]:
        """
        Return all metadata with static_metadata and dynamic_metadata merged.

        If the same key is in static_metadata and dynamic_metadata:
        static_metadata takes priority.
        """
        return {
            **copy.deepcopy(self.dynamic_metadata),
            **copy.deepcopy(self.static_metadata),
        }

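    # Illustrative values: static metadata takes priority on key clashes.
    #
    #     worker.dynamic_metadata = {"shared": "dynamic", "only_dynamic": 2}
    #     worker.static_metadata = {"shared": "static", "only_static": 1}
    #     worker.metadata()
    #     # -> {"shared": "static", "only_dynamic": 2, "only_static": 1}
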
    def set_dynamic_metadata(self, metadata: dict[str, Any]) -> None:
        """Save metadata and update dynamic_metadata_updated_at."""
        self.dynamic_metadata = metadata
        self.dynamic_metadata_updated_at = timezone.now()
        self.save()

    def __str__(self) -> str:
        """Return the id and name of the Worker."""
        return f"Id: {self.id} Name: {self.name}"

    objects = WorkerManager()