Skip to main content

get_task_status()

Get current task status:
require 'legnext_sdk'

# Point the SDK at the Legnext API; the key is read from LEGNEXT_API_KEY.
Legnext.configure do |sdk|
  sdk.api_key['x-api-key'] = ENV['LEGNEXT_API_KEY']
  sdk.host = 'api.legnext.ai'
  sdk.base_path = '/api'
end

task_api = Legnext::TaskManagementApi.new

# Fetch the task once and report where it stands.
begin
  job_id = 'job-123'
  task = task_api.get_task_status(job_id)

  puts "Status: #{task.status}"
  puts "Progress: #{task.progress}%"
  # The output is only printed once the task reports completion.
  puts "Result: #{task.output}" if task.status == 'completed'
rescue Legnext::ApiError => api_error
  puts "Error: #{api_error.message}"
end

Polling for Completion

Wait for task completion with polling:
# Polls a task until it completes, fails, or the timeout elapses, logging
# each observed status to stdout.
#
# The timeout is checked after each poll, so at least one poll always happens.
#
# @param task_api [#get_task_status] client used to fetch the task
# @param job_id [Object] identifier passed through to get_task_status
# @param timeout_seconds [Numeric] maximum time to keep polling
# @param poll_interval_seconds [Numeric] pause between two polls
# @return [Object] the task whose status is 'completed'
# @raise [RuntimeError] when the task status is 'failed' or the timeout elapses
def wait_for_completion(task_api, job_id, timeout_seconds = 300, poll_interval_seconds = 3)
  # Monotonic clock: unaffected by wall-clock adjustments that would skew Time.now deltas.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    task = task_api.get_task_status(job_id)

    puts "Current status: #{task.status}"

    case task.status
    when 'completed'
      puts "Task completed successfully!"
      puts "Output: #{task.output}"
      return task
    when 'failed'
      raise "Task failed: #{task.error}"
    end

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    raise "Task timeout after #{timeout_seconds} seconds" if elapsed > timeout_seconds

    sleep poll_interval_seconds
  end
end
Usage:
# Poll job-123 every 3 seconds for up to 300 seconds, then print the finished task.
task_api = Legnext::TaskManagementApi.new
job_id = 'job-123'

begin
  result = wait_for_completion(task_api, job_id, 300, 3)
  puts "Final result: #{result}"
rescue StandardError => failure
  # Polling raises on task failure or timeout; any StandardError is reported here.
  puts "Error: #{failure.message}"
end

Complete Example with Error Handling

require 'legnext_sdk'

# Point the SDK at the Legnext API; the key is read from LEGNEXT_API_KEY.
Legnext.configure do |sdk|
  sdk.api_key['x-api-key'] = ENV['LEGNEXT_API_KEY']
  sdk.host = 'api.legnext.ai'
  sdk.base_path = '/api'
end

# Polls a task until it completes, fails, or the timeout elapses.
#
# The timeout is checked after each poll, so at least one poll always happens.
#
# @param task_api [#get_task_status] client used to fetch the task
# @param job_id [Object] identifier passed through to get_task_status
# @param timeout_seconds [Numeric] maximum time to keep polling
# @param poll_interval_seconds [Numeric] pause between two polls
# @return [Object] the task whose status is 'completed'
# @raise [RuntimeError] when the task status is 'failed' or the timeout elapses
def wait_for_completion(task_api, job_id, timeout_seconds = 300, poll_interval_seconds = 3)
  # Monotonic clock: unaffected by wall-clock adjustments that would skew Time.now deltas.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    task = task_api.get_task_status(job_id)

    case task.status
    when 'completed' then return task
    when 'failed' then raise "Task failed: #{task.error}"
    end

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    raise "Task timeout after #{timeout_seconds} seconds" if elapsed > timeout_seconds

    sleep poll_interval_seconds
  end
end

# Starts one image generation job, waits up to 600 seconds for it to finish
# (polling every 3 seconds) and prints the resulting image URLs, if any.
#
# API errors are delegated to handle_api_error; any other StandardError is
# printed as "Error: <message>".
def generate_image_with_polling
  generator = Legnext::ImageGenerationApi.new
  tasks = Legnext::TaskManagementApi.new

  # Step 1: submit the prompt and remember the job it created
  prompt = Legnext::DiffusionRequest.new(text: 'a beautiful sunset over mountains')
  job_id = generator.generate_image(prompt).job_id
  puts "Job started: #{job_id}"

  # Step 2: block until the job reaches a terminal state
  finished = wait_for_completion(tasks, job_id, 600, 3)

  # Step 3: print the URLs only when the task carries an output with URLs
  urls = finished.output&.image_urls
  puts "Image URLs: #{urls}" if urls
rescue Legnext::ApiError => api_error
  handle_api_error(api_error)
rescue StandardError => other_error
  puts "Error: #{other_error.message}"
end

# Prints a one-line, human-readable explanation for an API error, chosen by
# its HTTP status code.
#
# @param error [#code, #response_body] the error to describe
# @return [nil]
def handle_api_error(error)
  explanation =
    case error.code
    when 400 then "Validation error: #{error.response_body}"
    when 401 then "Authentication error: Invalid API key"
    when 404 then "Resource not found: #{error.response_body}"
    when 429 then "Rate limit exceeded. Please retry later."
    when 500, 502, 503 then "Server error: #{error.response_body}"
    else "API error (#{error.code}): #{error.response_body}"
    end

  puts explanation
end

# Run the complete example: start a generation job, poll it, and print the result or the error.
generate_image_with_polling

Error Handling

HTTP Error Handling:
# Fetch a task and translate any API error into a short hint, followed by the raw response body.
begin
  task = task_api.get_task_status(job_id)
rescue Legnext::ApiError => e
  hint =
    case e.code
    when 400 then "Validation error: Check request parameters"
    when 401 then "Authentication error: Invalid API key"
    when 404 then "Resource not found: Check job ID exists"
    when 429 then "Rate limit exceeded. Please retry later."
    when 500, 502, 503 then "Server error. Please retry with backoff."
    else "API error (#{e.code}): #{e.message}"
    end

  puts hint
  puts "Response body: #{e.response_body}"
end
Common HTTP Status Codes:
| Status Code | Description | Action |
| --- | --- | --- |
| 400 | Invalid request parameters | Check request body format |
| 401 | Invalid API key | Verify API key is correct |
| 404 | Resource not found | Check job ID exists |
| 429 | Rate limit exceeded | Wait and retry with backoff |
| 500/502/503 | Server error | Retry with exponential backoff |
Retry Logic with Exponential Backoff:
# Submits an image generation request, retrying transient API failures.
#
# HTTP 429 waits a fixed 5 seconds between attempts; HTTP 5xx waits 2**n
# seconds before the n-th retry. Any other error is re-raised immediately.
# Both kinds of retry draw from the same max_retries budget.
#
# @param image_api [#generate_image] client that submits the request
# @param request [Object] request passed through to generate_image unchanged
# @param max_retries [Integer] maximum number of retries after the first attempt
# @return [Object] whatever generate_image returns on the first successful attempt
# @raise [Legnext::ApiError] the last error once retries are exhausted, or any
#   non-retryable error
def call_with_retry(image_api, request, max_retries = 3)
  retries = 0

  begin
    image_api.generate_image(request)
  rescue Legnext::ApiError => e
    # to_i turns a nil code into 0 (non-retryable) instead of crashing on `nil >= 500`.
    status = e.code.to_i
    rate_limited = status == 429
    raise unless (rate_limited || status >= 500) && retries < max_retries

    # Every sleep is followed by a real attempt; we never wait just to give up.
    retries += 1
    sleep(rate_limited ? 5 : 2**retries)
    retry
  end
end

# Usage: submit one generation request, allowing up to 3 retries
image_api = Legnext::ImageGenerationApi.new
request = Legnext::DiffusionRequest.new(text: 'a beautiful landscape')

begin
  response = call_with_retry(image_api, request, 3)
  puts "Job ID: #{response.job_id}"
rescue Legnext::ApiError => api_error
  # Reached when the error was not retryable or the retries ran out.
  puts "Failed after retries: #{api_error.message}"
end

Concurrent Operations with Threads

For processing multiple tasks concurrently:
require 'thread'

# Generates one image per built-in prompt, each in its own thread, and prints
# one line per prompt in prompt order.
#
# Every prompt produces exactly one line: the first image URL on success, or
# an error line when the job raised or completed without any image URL.
#
# @return [Array<Hash>] one hash per prompt, holding :prompt plus either :url or :error
def generate_multiple_images
  Legnext.configure do |config|
    config.api_key['x-api-key'] = ENV['LEGNEXT_API_KEY']
    config.host = 'api.legnext.ai'
    config.base_path = '/api'
  end

  image_api = Legnext::ImageGenerationApi.new
  task_api = Legnext::TaskManagementApi.new

  prompts = ['sunset', 'mountains', 'ocean']

  workers = prompts.map do |prompt|
    Thread.new do
      begin
        # Start generation
        request = Legnext::DiffusionRequest.new(text: prompt)
        job_id = image_api.generate_image(request).job_id

        # Wait for completion
        result = wait_for_completion(task_api, job_id, 600, 3)

        url = result.output&.image_urls&.first
        if url
          { prompt: prompt, url: url }
        else
          # Report the prompt instead of silently dropping it.
          { prompt: prompt, error: 'No image URLs returned' }
        end
      rescue StandardError => e
        { prompt: prompt, error: e.message }
      end
    end
  end

  # Thread#value joins each thread and returns its block's value, which keeps
  # the results in prompt order rather than completion order.
  workers.map(&:value).each do |result|
    if result[:error]
      puts "Error for '#{result[:prompt]}': #{result[:error]}"
    else
      puts "Image '#{result[:prompt]}': #{result[:url]}"
    end
  end
end

# Run the threaded example for the built-in prompt list and print one line per collected result.
generate_multiple_images

Progress Callbacks

# Polls a task until it completes, fails, or the timeout elapses, reporting
# every poll to an optional block.
#
# The timeout is checked after each poll, so at least one poll always happens.
#
# @param task_api [#get_task_status] client used to fetch the task
# @param job_id [Object] identifier passed through to get_task_status
# @param timeout_seconds [Numeric] maximum time to keep polling
# @param poll_interval_seconds [Numeric] pause between two polls
# @yieldparam progress [Object] task.progress, or 0 when it is nil
# @yieldparam status [Object] task.status, or 'unknown' when it is nil
# @return [Object] the task whose status is 'completed'
# @raise [RuntimeError] when the task status is 'failed' or the timeout elapses
def wait_for_completion_with_progress(task_api, job_id, timeout_seconds = 300, poll_interval_seconds = 3, &block)
  # Monotonic clock: unaffected by wall-clock adjustments that would skew Time.now deltas.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    task = task_api.get_task_status(job_id)

    # Report before the terminal-state checks so the final status reaches the callback too.
    block&.call(task.progress || 0, task.status || 'unknown')

    case task.status
    when 'completed' then return task
    when 'failed' then raise "Task failed: #{task.error}"
    end

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    raise "Task timeout after #{timeout_seconds} seconds" if elapsed > timeout_seconds

    sleep poll_interval_seconds
  end
end

# Usage: print a progress line for every poll of job-123, waiting up to 600 seconds
task_api = Legnext::TaskManagementApi.new
job_id = 'job-123'

report_progress = proc do |progress, status|
  puts "Progress: #{progress}% - Status: #{status}"
end

result = wait_for_completion_with_progress(task_api, job_id, 600, 3, &report_progress)

Using Fibers for Cooperative Multitasking

require 'fiber'

# Builds a fiber that drives one image generation job step by step.
#
# Nothing runs until the fiber is first resumed. Successive resumes yield:
# the job id, then :pending once per unfinished poll, then the task output
# once the status is 'completed'. The resume after that finishes the fiber.
# After each :pending the fiber sleeps 3 seconds before polling again.
#
# @param image_api [#generate_image] client that starts the job
# @param task_api [#get_task_status] client that polls the job
# @param prompt [Object] value passed as the request's text
# @return [Fiber]
# @raise [RuntimeError] from Fiber#resume when the task status is 'failed'
def create_image_fiber(image_api, task_api, prompt)
  Fiber.new do
    # Start generation and hand the job id back to the caller
    submission = image_api.generate_image(Legnext::DiffusionRequest.new(text: prompt))
    job_id = submission.job_id
    Fiber.yield job_id

    # Poll for completion, giving control back after every check
    loop do
      task = task_api.get_task_status(job_id)

      case task.status
      when 'completed'
        Fiber.yield task.output
        break
      when 'failed'
        raise "Task failed: #{task.error}"
      end

      Fiber.yield :pending
      sleep 3
    end
  end
end

# Usage: drive a single generation fiber to completion from the main script
Legnext.configure do |sdk|
  sdk.api_key['x-api-key'] = ENV['LEGNEXT_API_KEY']
  sdk.host = 'api.legnext.ai'
  sdk.base_path = '/api'
end

image_api = Legnext::ImageGenerationApi.new
task_api = Legnext::TaskManagementApi.new

fiber = create_image_fiber(image_api, task_api, 'a beautiful landscape')

# The first resume starts the job and hands back its id.
job_id = fiber.resume
puts "Job started: #{job_id}"

# Keep resuming until the fiber has finished.
loop do
  result = fiber.resume
  break unless fiber.alive?

  if result == :pending
    puts "Still processing..."
    next
  end

  puts "Result: #{result}" if result
end

Learn More