Django'da Celery ve Redis kullanarak zamanlanmış görevler oluşturmak

Merhabalar. Başlıkta da belirtildiği üzere amacımız, Django uygulamamız için belirlediğimiz zamanda çalışması için görevler oluşturmak. Şunu da açıklayalım ki zamanlanmış görevlerden kastımız synchronous görevler değil!

Senaryomuzu şu şekilde kurgulayalım: Bir blog sayfamız var. Ve kullanıcılar bloglarını hemen değil de belirledikleri tarihlerde yayınlamak istiyor. Kullanıcı bloğu yazacak ve yayınlanacağı tarihi seçecek. Gayet basit değil mi? Evet gayet basit, ancak konsepti anladıktan sonra bu yapıyı bir çok farklı şekilde kullanabiliriz ve kullanmak durumunda kalacağız.

Celery kullanırken bir message broker'a ihtiyacımız olacak. İsterseniz farklı brokerlar kullanabilirsiniz, ben Redis tercih ettim. Bu yazıda sıfırdan proje oluşturmaktan ziyade olaya daha fazla odaklanmak istedim. Bu yüzden daha önceden Django, Redis, Celery ve diğer gereksinimleri kurduğunuzu ve Django için gerekli konfigürasyonları yaptığınızı varsayıyorum. Her ihtimale karşı buradan veya buradan gerekli adımlara ulaşabileceğiniz linkleri bırakayım. Bunlar haricinde de bir çok yerli yabancı kaynakta gerekli adımları bulabilirsiniz. (Eğer sıfırdan bir proje ile gerekli paketlerin kurulumlarıyla açıklanmasına yönelik bir talep gelirse ileride bu yazıyı güncellerim.)

Başlıca pip gereksinimlerimiz şu şekilde;

  • celery==5.2.3
  • Django==4.0.1
  • flower==1.0.0
  • redis==4.1.1

Sistemimizde ise redis versiyonu şu şekilde;

  • redis_version:5.0.14

Haydi başlayalım!

Modelimizi oluşturalım;

# blogapp/models.py
from django.db import models
from django.contrib.auth.models import User

class Post(models.Model):
    author = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='posts'
    )
    title = models.CharField(max_length=255)
    content = models.TextField()
    expiration_time = models.DateTimeField()
    active = models.BooleanField(default=False)

    def __str__(self):
        return self.title

Modelimiz pek işlevsel görünmüyor olabilir, evet gerçekten de öyle aslında. Ama bize şimdilik yeterli. Sırada oluşturduğumuz postu celery ile kuyruğa eklemek var. Ama öncesinde oluşturduğumuz postun id'si ile expiration_time'i almamız lazım ki bu id ile tekrardan postumuzu bulup active fieldini True yapacağız. (active fieldleri sadece True olanları listelediğimizi düşünelim)

Bu aşamada işleri basit tutmak için django signals kullanarak, oluşturulan Post'u celery taskına ekleyeceğim. Bunu form içinde veya views içinde de yapabilirsiniz. Tamamen sizin iş akış mantığınıza kalmış.

Signals;

# blogapp/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver

from .models import Post
from .tasks import expiration_task

@receiver(post_save, sender=Post)
def check_post(sender, instance, **kwargs):
    expiration_task.apply_async(
        [instance.id],
        eta=instance.expiration_time,
        kwargs={"post-id": instance.id, "post-author": f'{instance.author}'}
    )

Burada receiver decoratörü ile Post modelimiz kayıt edildikten sonra (post_save) aksiyon almasını belirtiyoruz. Burada birazdan oluşturacağımız expiration_task celery fonksiyonumuza verileri gönderiyoruz. instance.id post'umuzun id'si, eta ise task'in çalışacağı zaman, kwargs ise task'ımızı izlerken hangi postun işlendiğini ve author'u görebilmek için ekstradan eklediğimiz arguman, zorunlu değil. Burada dikkat edilmesi gereken celery eta ile birlikte task'in verilen tarihten sonra çalışacağını garanti eder. Yani kesin olarak tam o zamanda çalışacağını garanti etmez. Buna etki eden bir çok parametre olabilir.

Django signallerin çalışabilmesi için apps.py'da belirtilmesi gerekiyor. Şu şekilde apps.py'ı güncelleyelim;

Apps.py

# blogapp/apps.py
from django.apps import AppConfig

class BlogappConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'blogapp'

    def ready(self):
        from .signals import check_post

Şimdi celery fonksiyonumuza geçelim.

Tasks;

# blogapp/tasks.py
from celery import shared_task

from .models import Post

@shared_task
def expiration_task(post_id, **kwargs):
    Post.objects.filter(id=post_id).update(active=True)

Gördüğünüz gibi signals.py'dan gönderdiğimiz post_id ile post'umuzu filtreledik ve active fieldini True olarak güncelledik.

Son olarak sırada kodlarımızın çalışıp çalışmadığını kontrol etmek var. Öncelikle Django shelli açıp yeni bir kullanıcı oluşturup bir post ekliyoruz.

User.objects.create(username="peter", email="peter@pipper.com", password="peterpipper")

user = User.objects.get(username="peter")

Post.objects.create(
    author=user,
    title="peter's blog",
    content="blog content",
    expiration_time=timezone.now() + timezone.timedelta(seconds=10)
)

Sonucu hemen görebilmek için on saniye sonrasına expiration_time verdim. Objemizi yarattığımız anda taskımız flower celery monitöre düştü;

queue.png

Çalışma zamanı geldiğinde de flower succeeded sekmesi altında şu şekilde görünüyor;

succeed.png

Böylelikle amacımıza başarılı bir şekilde ulaşmış olduk. Aferin bize!

Şunu söylemek gerek ki gerçek bir senaryoda temel olarak izlenecek yol bu şekilde olsa da daha fazla kontrol mekanizması gerekmekte. Biz burada her şeyin olması gerektiği gibi gittiğini varsaydık. Ancak bir exception karşısında nasıl bir aksiyon alınması gerekir ve tasklerin loglanması gibi konuları göz ardı ettik. Amacımız da bu değildi zaten.

Celery hakkında daha fazla araştırma yapmak isterseniz (ki Celery ile çalışacaksanız mutlaka detaylı bir araştırma yapmak gerekir, sadece bir kütüphane deyip geçmemek gerek) buradan resmi sayfasına ulaşabilirsiniz.

Bu da benim ilk blog postum olsun, sürçülisan ettiysem affediniz, gerekli düzenlemeleri hemen yaparım. Hoşçakalın!