
Using Postgres as a task queue for an asynchronous worker architecture

Assume a situation where you have one function producing tasks and multiple asynchronous workers consuming those tasks and working on them.

Concept architecture diagram

It's essentially a Single Producer, Multiple Consumer system. How would you implement a queue to solve this problem?

Would you use an in-memory queue, like the ones we get in programming languages?

Watch out: the application is going to be deployed in replicated mode with containers, so we can't really keep state in our application, because we have no guarantee that a task will finish before its container dies.

And if we use an in-memory data structure (a queue) to store tasks, we lose the point of a replicated deployment, because only one instance would be carrying out the computations.

So, we are not going to use an in-memory queue.

We need something on disk, like a database. I know two ways of implementing these workers:

Having separate applications for the producer and the workers.

Concept architecture diagram 2

This architecture allows us to use a message queue like RabbitMQ. A message queue is more powerful when it comes to delivering messages and data reliably.

RabbitMQ will push the message/data to our worker, so the worker does not need to poll for messages from the queue.

But what if the producer and worker are in the same application?

Running the workers in a single application along with the producers, where both work asynchronously.

Concept architecture diagram 3

In this architecture the producer and the worker live in the same application, which is why using a message queue is not a good idea.

So, finally, we are going to use Postgres to implement the queue. Here the workers need to poll for data from the database, because Postgres will not push it to them the way RabbitMQ does.

The Problem.

We are going to create an application that echoes some text a given number of times (time); both the text and the time are supplied by the user.

So let's go...

I am going to use Go for the example, though using PostgreSQL as a task queue will work with any language.

Let's create a brand new project.

You can get the final source code at k9exp/postgres-task-queue

go mod init postgres-task-queue
touch main.go # create main.go file

Add this code to your main.go. It creates a simple HTTP server that listens on port 4545; the /producer endpoint is used to trigger the producer.

main.go
package main
 
import (
	"log"
	"net/http"
)
 
type RequestPayload struct {
	Text string `json:"text"`
	Time uint32 `json:"time"`
}
 
// POST /producer
func producer(w http.ResponseWriter, r *http.Request) {
	// retrieve text and time
	// insert the text and time in the postgres table
	// return request
}
 
func worker(err chan error) {
	// poll the first task from the queue
	// do what it is required to do
	// repeat
}
 
func app(err chan error) {
	http.HandleFunc("/producer", producer)
 
	PORT := "4545"
	log.Printf("Serving on http://localhost:%s\n", PORT)
	err <- http.ListenAndServe(":"+PORT, nil)
}
 
func main() {
	err := make(chan error, 1)
	go app(err)
	go worker(err)
 
	e := <-err
	log.Printf("Got error: %v\n", e.Error())
}

Let's focus on the producer. We need to get the text and time from the request body and insert the two into the task queue; that's all the producer has to do.

Go ahead and get the text and the time from the request body (which is JSON) using encoding/json from the Go standard library.

main.go
[...]
 
import (
	"encoding/json" // new import
	"log"
	"net/http"
)
 
type RequestPayload struct {
	Text string `json:"text"`
	Time uint32 `json:"time"`
}
 
// POST /producer
func producer(w http.ResponseWriter, r *http.Request) {
	var requestPayload RequestPayload
 
	err := json.NewDecoder(r.Body).Decode(&requestPayload)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
 
	text := requestPayload.Text
	time := requestPayload.Time
 
	// insert the text and time in the postgres table
	// return request
}
[...]

Now we have to insert the data into the database, but we haven't talked about the database yet. So let's start.

Create a new package called data in the project to handle all the database-related stuff.

mkdir data # create data directory
touch data/queue.go # create queue.go file in data directory

The queue.go will look something like this

data/queue.go
package data
 
func SetupQueue() {
	// setup database
	// create queue table
}
 
func InsertTask() {
	// insert task in the queue
}

To use PostgreSQL in Go we need a Postgres driver for the standard library's database/sql package, so we are going to use github.com/lib/pq.

Download it

go get github.com/lib/pq

After it downloads, let's write a SQL query to create the queue table in the database.

data/queue.go
package data
 
// new imports
import (
	"database/sql"
	"log" // needed for log.Println below
	"os"

	_ "github.com/lib/pq"
)
 
var DB *sql.DB // new global variable
 
func SetupQueue() error {
	connString := os.Getenv("DATABASE_URL")
 
	db, err := sql.Open("postgres", connString)
	if err != nil {
		return err
	}
 
	DB = db // Set the global variable
 
	// CREATE QUEUE TABLE
	_, err = DB.Exec(
		`CREATE TABLE IF NOT EXISTS queue (
			task_id SERIAL PRIMARY KEY,
			text TEXT NOT NULL,
			time INT NOT NULL
		);
	`)
 
	if err != nil {
		return err
	}
 
	log.Println("Table \"queue\" is created in the database")
 
	return nil
}

To connect to Postgres we need a running Postgres server. You can run one with Docker or use a managed Postgres service like Neon. Either way you will end up with a database connection string that looks like this: postgres://username:password@host:port/database

We pass this connection string to the app through the DATABASE_URL environment variable.
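For example, a quick local setup with Docker might look something like this (the container name, password, and database name here are just placeholder choices; sslmode=disable is needed because lib/pq defaults to requiring SSL):

docker run -d --name queue-db \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=queue \
  -p 5432:5432 postgres:16

export DATABASE_URL='postgres://postgres:secret@localhost:5432/queue?sslmode=disable'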

Let's use SetupQueue in main.go.

main.go
package main
 
import (
	"encoding/json"
	"log"
	"net/http"
 
	"k9exp/postgres-task-queue/data" // new import
)
 
[...]
 
func main() {
	db_err := data.SetupQueue()
	if db_err != nil {
		log.Fatal(db_err)
	}
 
	err := make(chan error, 1)
	go app(err)
	go worker(err)
 
	e := <-err
	log.Printf("Got error: %v\n", e.Error())
}
 

When the app runs, the table will be created. Let's run the app.

I am using Neon for the managed Postgres database; it's free.

export DATABASE_URL=postgres://kunalsin9h:<password>@<your-neon-host>/queue
 
go run main.go

The output will be

$ go run main.go
2023/11/07 15:26:53 Table "queue" is created in the database
2023/11/07 15:26:53 Serving on http://localhost:4545

The database should now have a new table called queue.
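If you have psql installed, you can verify this directly (an optional check; \d describes a table):

psql "$DATABASE_URL" -c '\d queue'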

Nice, we are making progress...

Now back to the producer. We have extracted the text and time from the request payload; next we need to insert them into the database, and for that we need to write an INSERT SQL query.

So let's complete InsertTask in data/queue.go. It is easy, we just need to insert two values into the table.

data/queue.go
[...]
 
func InsertTask(text string, time uint32) error {
	_, err := DB.Exec(`
		INSERT INTO queue (text, time)
		VALUES ($1, $2)
	`, text, time)
 
	return err
}

Now let's use this new function to insert the text and time extracted in the producer handler.

main.go
package main
 
import (
	"encoding/json"
	"log"
	"net/http"
 
	"k9exp/postgres-task-queue/data" // new import
)
 
// POST /producer
func producer(w http.ResponseWriter, r *http.Request) {
	var requestPayload RequestPayload
 
	err := json.NewDecoder(r.Body).Decode(&requestPayload)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
 
	text := requestPayload.Text
	time := requestPayload.Time
 
	// new
	// inserting the data into database
	err = data.InsertTask(text, time)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
 
	// WriteHeader must be called before Write, otherwise the
	// status code has already been sent as 200 OK
	w.WriteHeader(http.StatusAccepted)
	w.Write([]byte("Task added in the queue\n"))
}
 
[...]

At this point the /producer endpoint works and adds the text and time into the queue table.

To check it, run the app...

export DATABASE_URL=....
 
go run main.go

To add a new entry to the database table, do:

$ curl -X POST -d '{"text": "new_text", "time": 2}' http://localhost:4545/producer
 
Task added in the queue

Awesome, we got the producer working!
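If you want to double-check, query the table directly with psql; you should see a row with the text new_text and time 2:

psql "$DATABASE_URL" -c 'SELECT * FROM queue;'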

Now let's focus on the worker. As of now, the worker function looks like this:

main.go
[...]
 
func worker(err chan error) {
	// poll the first task from the queue
	// do what it is required to do
	// repeat
}
 
[...]

As we have already discussed, the worker has to poll for data from the queue. Polling here means fetching data from the data source at a regular interval.
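Stripped of the database details, a polling worker is just an infinite fetch-and-sleep loop. Here is a minimal, self-contained sketch of the idea (fetchTask is a hypothetical stand-in for the GetTask function we are about to write):

package main

import (
	"fmt"
	"time"
)

// fetchTask is a hypothetical stand-in for the real GetTask;
// it returns nil when the queue is empty.
func fetchTask() *string { return nil }

func main() {
	for {
		task := fetchTask()
		if task == nil {
			// nothing to do yet, back off before polling again
			time.Sleep(10 * time.Second)
			continue
		}
		fmt.Println(*task)
		time.Sleep(1 * time.Second) // pause between tasks
	}
}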

So, to get the first element of the queue, let's write a GetTask function in data/queue.go.

data/queue.go
[...]
 
type TaskData struct {
	// fields are exported so the main package can read them
	Task_id uint32
	Text    string
	Time    uint32
}
 
func GetTask() (*TaskData, error) {
	// todo
	return nil, nil
}

We need to get the first element from the queue and also delete it.

data/queue.go
import (
	"context" // new import
	"database/sql"
	"errors"
	"log"
	"os"
 
	_ "github.com/lib/pq"
)
 
[...]
 
type TaskData struct {
	Task_id uint32
	Text    string
	Time    uint32
}
 
func GetTask() (*TaskData, error) {
	var data TaskData
 
	trx, err := DB.BeginTx(context.Background(), nil)
	if err != nil {
		return nil, err
	}
	defer trx.Rollback() // becomes a no-op once Commit succeeds

	err = trx.QueryRow(`SELECT task_id, text, time FROM queue ORDER BY task_id LIMIT 1 FOR UPDATE SKIP LOCKED;`).Scan(&data.Task_id, &data.Text, &data.Time)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	} else if err != nil {
		return nil, err
	}
 
	_, err = trx.Exec("DELETE FROM queue WHERE task_id = $1;", data.Task_id)
	if err != nil {
		return nil, err
	}
 
	if err := trx.Commit(); err != nil {
		return nil, err
	}
 
	return &data, nil
}

Here, DB.BeginTx() creates a transaction and trx.Commit() commits it; the deferred trx.Rollback() makes sure the transaction is cleaned up if we return early with an error.

The most important part of this blog is here. How will we choose the first element, and what happens when two workers try to grab it at the same time? Won't the SELECT statement always start from the top?

What we are going to use is database row locks. The query contains the FOR UPDATE and SKIP LOCKED clauses (plus ORDER BY task_id so that tasks are picked in FIFO order).

FOR UPDATE: This clause is used in a transaction to indicate that the rows selected by the query are to be locked exclusively by the current transaction. This prevents other transactions from modifying or locking the same rows until the current transaction is committed or rolled back.

SKIP LOCKED: This is an extension to the "FOR UPDATE" clause. When used, it allows the SELECT statement to skip over any rows that are already locked by another transaction. This is useful in scenarios where you want to avoid waiting for locked rows and instead proceed with the next available unlocked row.

This way a worker always gets a row that is not already locked by any other worker. After we read the data from the row, we delete it.
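You can watch this behaviour directly with two psql sessions (a sketch; the task_id values assume the queue holds at least two rows):

-- session 1
BEGIN;
SELECT * FROM queue ORDER BY task_id LIMIT 1 FOR UPDATE SKIP LOCKED;
-- returns and locks the first row, e.g. task_id = 1

-- session 2, while session 1 is still open
BEGIN;
SELECT * FROM queue ORDER BY task_id LIMIT 1 FOR UPDATE SKIP LOCKED;
-- skips the locked row and returns the next one, e.g. task_id = 2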

Let's use this function in the worker function.

main.go
[...]
 
func worker(err chan error) {
	for {
		task, e := data.GetTask() // named task so we don't shadow the data package
		if e != nil {
			err <- e
			continue
		}
		if task == nil {
			fmt.Println("Queue is empty")
			time.Sleep(10 * time.Second)
			continue
		}

		printText(task)

		time.Sleep(1 * time.Second)
	}
}

func printText(task *data.TaskData) {
	for i := 0; i < int(task.Time); i++ {
		fmt.Printf("\t%d  > %s [%d/%d]\n", task.Task_id, task.Text, i+1, task.Time)
		time.Sleep(200 * time.Millisecond)
	}
}
 
[...]

We loop forever with a 1-second pause between tasks; when the queue is empty, we pause for 10 seconds instead.

When we get a task we print it using the printText function. We also sleep for 200 milliseconds (1/5 of a second) between each printed line.

With this, we have finished the application.

What should it do?

It will get the top-most element from the queue (in our case, the first row of the Postgres table), delete the entry (i.e. consume it), and print the text the requested number of times.

We can use the POST /producer endpoint to add an entry to the queue.

Let's run it again. Here is a video of me running it.

The real magic happens when we have more than one worker.

So let's refactor the code to show the worker id in the output. We will pass a worker_id to each worker and print it with the output; for this we need to make changes in a few places.

main.go
[...]
 
func worker(err chan error, worker_id uint16) { // new worker_id parameter
	for {
		task, e := data.GetTask()
		if e != nil {
			err <- e
			continue
		}
		if task == nil {
			fmt.Println("Queue is empty")
			time.Sleep(10 * time.Second)
			continue
		}

		printText(task, worker_id) // passing it here

		time.Sleep(1 * time.Second)
	}
}

func printText(task *data.TaskData, worker_id uint16) {
	for i := 0; i < int(task.Time); i++ {
		// tag the output with the id of the worker that picked the task
		fmt.Printf("\ttask: %d, by worker: %d> %s [%d/%d]\n", task.Task_id, worker_id, task.Text, i+1, task.Time)
		time.Sleep(200 * time.Millisecond)
	}
}
 
func app(err chan error) {
	http.HandleFunc("/producer", producer)
 
	PORT := "4545"
	log.Printf("Serving on http://localhost:%s\n", PORT)
	err <- http.ListenAndServe(":"+PORT, nil)
}
 
func main() {
	db_err := data.SetupQueue()
	if db_err != nil {
		log.Fatal(db_err)
	}
 
	err := make(chan error, 1)
 
	go app(err)
	go worker(err, 1) // passing the worker_id here
 
	e := <-err
	log.Printf("Got error: %v\n", e.Error())
}

Now we can spawn more workers just by calling worker multiple times with different ids, like this:

main.go
[...]
 
func main() {
	db_err := data.SetupQueue()
	if db_err != nil {
		log.Fatal(db_err)
	}
 
	err := make(chan error, 1)
 
	go app(err)
	go worker(err, 1)
	go worker(err, 2)
 
	e := <-err
	log.Printf("Got error: %v\n", e.Error())
}

Let's see the output.

So you can see that both workers are working together to solve the problem.

We can even spawn hundreds of workers. Let's do that.

main.go
[...]
 
func main() {
	db_err := data.SetupQueue()
	if db_err != nil {
		log.Fatal(db_err)
	}
 
	err := make(chan error, 1)
 
	go app(err)
 
	for i := 0; i < 100; i++ {
		go worker(err, uint16(i+1))
	}
 
	e := <-err
	log.Printf("Got error: %v\n", e.Error())
}

Let's see the output. It's going to be wild; I am going to insert a lot more rows.

Wow, how cool. And this is just one instance of the application; imagine the app deployed on Kubernetes with 10 replicas and 100 workers each...

So let's wrap up what we have done.

  1. We used PostgreSQL for our task queue.
    1. Because we need something persistent.
    2. The worker and the producer are in the same application, which is why we did not use a message queue like RabbitMQ.
  2. We add tasks to the queue using a REST API.
  3. The tasks are completed by the async workers.

I hope you enjoyed the blog. If you like the content, don't forget to subscribe to the blog to get notified about future posts.

See you soon :)