plainweb includes a simple, persistent task queue backed by SQLite. The web server and task workers run in a single Node.js process, allowing concurrent tasks to write to the database without locking issues.
Note: If you need to run parallel tasks, you'll need to manage process spawning yourself. Be cautious not to write to the database from parallel processes to avoid conflicts.
Add a tasks
table to your database schema:
// app/config/schema.ts
import { text, integer, sqliteTable, int } from "drizzle-orm/sqlite-core";
export const tasks = sqliteTable("tasks", {
id: text("id").primaryKey(),
name: text("name").notNull(),
data: text("data", { mode: "json" }),
created: int("created").notNull(),
failedLast: int("failed_last"),
failedNr: int("failed_nr"),
failedError: text("failed_error"),
});
export type Task = typeof tasks.$inferSelect;
To start the task worker when running pnpm start
and pnpm dev
, call getWorker()
to get an instance of you worker to start in your app/cli/serve.ts
file:
// app/cli/serve.ts
import { getWorker, log } from "plainweb";
import config from "plainweb.config";
async function serve() {
const worker = getWorker(config);
await worker.start(); // work all tasks in tasks directory
log.info("⚡️ background task worker started");
// rest of your code, starting http server, etc.
}
serve();
Tasks are defined in files within the app/tasks
directory. Any file with a default export is automatically discovered as a task.
Here's an example of a task definition:
// app/tasks/double-opt-in.ts
import { eq } from "drizzle-orm";
import { defineDatabaseTask } from "plainweb";
import { database } from "app/config/database";
import { Contact, contacts } from "app/config/schema";
import { sendDoubleOptInEmail } from "app/services/contacts";
export default defineDatabaseTask<Contact>(database, {
name: __filename,
batchSize: 5,
async process({ data }) {
await sendDoubleOptInEmail(database, data);
},
async success({ data }) {
await database
.update(contacts)
.set({ doubleOptInSent: Date.now() })
.where(eq(contacts.email, data.email));
},
});
In this example:
name
is the name of the task type, this has to be unique. Use __filename
to get the filename of the current file.batchSize: 5
means that 5 tasks are pulled from the database at a time for concurrent processing.process
defines the main task logic.success
defines actions to take after successful task completion.To enqueue a task, use the perform
function:
// app/services/contacts.ts
import type { Database } from "app/config/database";
import doubleOptIn from "app/tasks/double-opt-in";
import { perform } from "plainweb";
export async function createContact(database: Database, email: string) {
const contact = // create contact
if (contact) {
await perform(doubleOptIn, contact); // enqueue task
}
}
Important: Always await
the perform
function. This ensures the task is successfully enqueued before proceeding. It doesn't mean that the task has been processed yet!