get_task_status()
Get current task status:Copy
require 'legnext_sdk'
# Configure API client
Legnext.configure do |config|
config.api_key['x-api-key'] = ENV['LEGNEXT_API_KEY']
config.host = 'api.legnext.ai'
config.base_path = '/api'
end
task_api = Legnext::TaskManagementApi.new
begin
job_id = 'job-123'
task = task_api.get_task_status(job_id)
puts "Status: #{task.status}"
puts "Progress: #{task.progress}%"
if task.status == 'completed'
puts "Result: #{task.output}"
end
rescue Legnext::ApiError => e
puts "Error: #{e.message}"
end
Polling for Completion
Wait for task completion with polling:Copy
def wait_for_completion(task_api, job_id, timeout_seconds = 300, poll_interval_seconds = 3)
start_time = Time.now
timeout = timeout_seconds
loop do
task = task_api.get_task_status(job_id)
puts "Current status: #{task.status}"
if task.status == 'completed'
puts "Task completed successfully!"
puts "Output: #{task.output}"
return task
elsif task.status == 'failed'
raise "Task failed: #{task.error}"
end
# Check timeout
elapsed = Time.now - start_time
if elapsed > timeout
raise "Task timeout after #{timeout_seconds} seconds"
end
# Wait before next poll
sleep poll_interval_seconds
end
end
Copy
task_api = Legnext::TaskManagementApi.new
job_id = 'job-123'
begin
result = wait_for_completion(task_api, job_id, 300, 3)
puts "Final result: #{result}"
rescue StandardError => e
puts "Error: #{e.message}"
end
Complete Example with Error Handling
Copy
require 'legnext_sdk'
# Configure API client
Legnext.configure do |config|
config.api_key['x-api-key'] = ENV['LEGNEXT_API_KEY']
config.host = 'api.legnext.ai'
config.base_path = '/api'
end
def wait_for_completion(task_api, job_id, timeout_seconds = 300, poll_interval_seconds = 3)
start_time = Time.now
timeout = timeout_seconds
loop do
task = task_api.get_task_status(job_id)
if task.status == 'completed'
return task
elsif task.status == 'failed'
raise "Task failed: #{task.error}"
end
elapsed = Time.now - start_time
if elapsed > timeout
raise "Task timeout after #{timeout_seconds} seconds"
end
sleep poll_interval_seconds
end
end
def generate_image_with_polling
image_api = Legnext::ImageGenerationApi.new
task_api = Legnext::TaskManagementApi.new
# Step 1: Start generation
request = Legnext::DiffusionRequest.new(
text: 'a beautiful sunset over mountains'
)
response = image_api.generate_image(request)
job_id = response.job_id
puts "Job started: #{job_id}"
# Step 2: Poll for completion
result = wait_for_completion(task_api, job_id, 600, 3)
# Step 3: Get final result
if result.output && result.output.image_urls
puts "Image URLs: #{result.output.image_urls}"
end
rescue Legnext::ApiError => e
handle_api_error(e)
rescue StandardError => e
puts "Error: #{e.message}"
end
def handle_api_error(error)
case error.code
when 400
puts "Validation error: #{error.response_body}"
when 401
puts "Authentication error: Invalid API key"
when 404
puts "Resource not found: #{error.response_body}"
when 429
puts "Rate limit exceeded. Please retry later."
when 500, 502, 503
puts "Server error: #{error.response_body}"
else
puts "API error (#{error.code}): #{error.response_body}"
end
end
generate_image_with_polling
Error Handling
HTTP Error Handling:Copy
begin
task = task_api.get_task_status(job_id)
rescue Legnext::ApiError => e
case e.code
when 400
puts "Validation error: Check request parameters"
when 401
puts "Authentication error: Invalid API key"
when 404
puts "Resource not found: Check job ID exists"
when 429
puts "Rate limit exceeded. Please retry later."
when 500, 502, 503
puts "Server error. Please retry with backoff."
else
puts "API error (#{e.code}): #{e.message}"
end
puts "Response body: #{e.response_body}"
end
| Status Code | Description | Action |
|---|---|---|
| 400 | Invalid request parameters | Check request body format |
| 401 | Invalid API key | Verify API key is correct |
| 404 | Resource not found | Check job ID exists |
| 429 | Rate limit exceeded | Wait and retry with backoff |
| 500/502/503 | Server error | Retry with exponential backoff |
Copy
def call_with_retry(image_api, request, max_retries = 3)
retries = 0
begin
image_api.generate_image(request)
rescue Legnext::ApiError => e
if e.code == 429 && retries < max_retries
# Rate limited - wait and retry
retries += 1
sleep 5
retry
elsif e.code >= 500 && retries < max_retries
# Server error - exponential backoff
retries += 1
delay = 2**retries
sleep delay
retry if retries < max_retries
raise
else
# Other errors - don't retry
raise
end
end
end
# Usage
image_api = Legnext::ImageGenerationApi.new
request = Legnext::DiffusionRequest.new(text: 'a beautiful landscape')
begin
response = call_with_retry(image_api, request, 3)
puts "Job ID: #{response.job_id}"
rescue Legnext::ApiError => e
puts "Failed after retries: #{e.message}"
end
Concurrent Operations with Threads
For processing multiple tasks concurrently:Copy
require 'thread'
def generate_multiple_images
Legnext.configure do |config|
config.api_key['x-api-key'] = ENV['LEGNEXT_API_KEY']
config.host = 'api.legnext.ai'
config.base_path = '/api'
end
image_api = Legnext::ImageGenerationApi.new
task_api = Legnext::TaskManagementApi.new
prompts = ['sunset', 'mountains', 'ocean']
threads = []
results = Queue.new
prompts.each do |prompt|
threads << Thread.new do
begin
# Start generation
request = Legnext::DiffusionRequest.new(text: prompt)
response = image_api.generate_image(request)
job_id = response.job_id
# Wait for completion
result = wait_for_completion(task_api, job_id, 600, 3)
if result.output && result.output.image_urls && !result.output.image_urls.empty?
results << { prompt: prompt, url: result.output.image_urls[0] }
end
rescue StandardError => e
results << { prompt: prompt, error: e.message }
end
end
end
# Wait for all threads
threads.each(&:join)
# Collect results
all_results = []
all_results << results.pop until results.empty?
all_results.each do |result|
if result[:error]
puts "Error for '#{result[:prompt]}': #{result[:error]}"
else
puts "Image '#{result[:prompt]}': #{result[:url]}"
end
end
end
generate_multiple_images
Progress Callbacks
Copy
def wait_for_completion_with_progress(task_api, job_id, timeout_seconds = 300, poll_interval_seconds = 3, &block)
start_time = Time.now
timeout = timeout_seconds
loop do
task = task_api.get_task_status(job_id)
# Call progress callback
if block_given?
progress = task.progress || 0
status = task.status || 'unknown'
yield(progress, status)
end
if task.status == 'completed'
return task
elsif task.status == 'failed'
raise "Task failed: #{task.error}"
end
elapsed = Time.now - start_time
if elapsed > timeout
raise "Task timeout after #{timeout_seconds} seconds"
end
sleep poll_interval_seconds
end
end
# Usage
task_api = Legnext::TaskManagementApi.new
job_id = 'job-123'
result = wait_for_completion_with_progress(task_api, job_id, 600, 3) do |progress, status|
puts "Progress: #{progress}% - Status: #{status}"
end
Using Fibers for Cooperative Multitasking
Copy
require 'fiber'
def create_image_fiber(image_api, task_api, prompt)
Fiber.new do
# Start generation
request = Legnext::DiffusionRequest.new(text: prompt)
response = image_api.generate_image(request)
job_id = response.job_id
Fiber.yield job_id
# Poll for completion
loop do
task = task_api.get_task_status(job_id)
if task.status == 'completed'
Fiber.yield task.output
break
elsif task.status == 'failed'
raise "Task failed: #{task.error}"
end
Fiber.yield :pending
sleep 3
end
end
end
# Usage
Legnext.configure do |config|
config.api_key['x-api-key'] = ENV['LEGNEXT_API_KEY']
config.host = 'api.legnext.ai'
config.base_path = '/api'
end
image_api = Legnext::ImageGenerationApi.new
task_api = Legnext::TaskManagementApi.new
fiber = create_image_fiber(image_api, task_api, 'a beautiful landscape')
job_id = fiber.resume
puts "Job started: #{job_id}"
loop do
result = fiber.resume
break unless fiber.alive?
if result == :pending
puts "Still processing..."
elsif result
puts "Result: #{result}"
end
end