Go client for the Disque server
The Redigo Redis client and the YouTube Vitess resource pooling implementation are the only runtime dependencies used by disque-go
. Dependencies are managed with Glide.
go get github.com/zencoder/disque-go/disque
Instantiate the pool as follows:
import (
hosts := []string{""} // array of 1 or more Disque servers
cycle := 1000 // check connection stats every 1000 Fetch's
capacity := 5 // initial capacity of the pool
maxCapacity := 10 // max capacity that the pool can be resized to
idleTimeout := 15 * time.Minute // timeout for idle connections
var p *disque.Pool
p = disque.NewPool(hosts, cycle, capacity, maxCapacity, idleTimeout)
Next, get a handle to a connection from the pool, specifying a context that controls how long to wait for a connection to be retrieved:
var d *disque.Disque
var err error
d, err = p.Get(context.Background()) // get a connection from the pool
defer p.Put(d) // return a connection to the pool
... (use the connection to interact with Disque)...
To shutdown the connection pool, such as when the application is exiting, invoke the Close
p.Close() // close the pool, waits for all connections to be returned
####Single Connection Begin by instantiating and initializing a Disque client connection:
import (
hosts := []string{""} // array of 1 or more Disque servers
cycle := 1000 // check connection stats every 1000 Fetch's
var d *disque.Disque
var err error
d = disque.NewDisque(hosts, cycle)
err = d.Initialize()
This will yield a Disque client instance d
that is configured to use the Disque server at and its cluster members, if any.
Close the Disque client connection when finished:
err = d.Close()
####Disque Operations
You can push a job to a Disque queue by invoking the Push
or PushWithOptions
// Push with default settings
queueName := "queue_name"
jobDetails := "job"
timeout := time.Second // take no long than 1 second to enqueue the message
var jobID string
jobID, err = d.Push(queueName, jobDetails, timeout)
// Push with custom options
options = make(map[string]string)
options["TTL"] = "60" // 60 second TTL on the job message
options["ASYNC"] = "true" // push the message asynchronously
jobID, err = d.PushWithOptions(queueName, jobDetails, timeout, options)
Find the length of a queue using the QueueLength
var queueLength int
queueLength, err = d.QueueLength(queueName)
Fetch a single job using the Fetch
var job *disque.Job
job, err = d.Fetch(queueName, timeout) // retrieve a single job, taking no longer than timeout (1 second) to return
Fetch multiple jobs using the FetchMultiple
count := 5
var jobs []*disque.Job
jobs, err = d.FetchMultiple(queueName, count, timeout) // retrieve up to 5 Jobs, taking no longer than timeout (1 second) to return
Retrieve details for an enqueued job before it has been acknowledged:
var jobDetails *disque.JobDetails
jobDetails, err = d.GetJobDetails(jobID)
Enqueued messages can be deleted using their Job-Id:
err = d.Delete(jobID)
Acknowledge receipt and processing of a message by invoking the Ack
err = d.Ack(job.JobID)
That's it (for now)!
is available under the Apache License, Version 2.0.