Skip to content

Ranking Flows#

Overview#

Ranking Flows provide a lightweight way to continuously rank items using human comparisons without the overhead of creating full jobs. They are ideal for ongoing evaluation where new items are added over time and ranked against each other in a specified time frame (ttl). Each ranking uses the configuration of the flow but is fully independent of the other rankings.

Note

Can be used with Images, Videos, Audio, and Text.

How to use Ranking Flows#

1. Create a Flow#

Start by creating a ranking flow with an instruction that will be shown to evaluators for each comparison:

from rapidata import RapidataClient

client = RapidataClient()

flow = client.flow.create_ranking_flow(
    name="Image Quality Ranking",
    instruction="Which image looks better?",
)

You can optionally configure a response threshold range to control how many pairwise comparison responses are collected per flow item:

  • max_response_threshold (default 100): The target number of responses. The system will try to collect up to this many responses for each flow item.
  • min_response_threshold (default = max_response_threshold): The minimum number of responses you are willing to accept. If the time_to_live expires and fewer than min_response_threshold responses have been collected, the flow item is marked as Incomplete. If at least min_response_threshold responses have been collected, it is marked as Completed.
flow = client.flow.create_ranking_flow(
    name="Image Quality Ranking",
    instruction="Which image looks better?",
    max_response_threshold=200, # (1)!
    min_response_threshold=50, # (2)!
)
  1. The target number of responses. The system will try to collect up to this many responses for each flow item.
  2. The minimum acceptable response count. If time_to_live expires with fewer than this, the flow item is marked as Incomplete; otherwise it's Completed.

2. Add a Flow Batch#

Submit datapoints to the flow by creating a batch. Each batch uploads a set of items that will be compared and ranked:

flow_item = flow.create_new_flow_batch(
    datapoints=[
        "https://example.com/image_a.jpg",
        "https://example.com/image_b.jpg",
        "https://example.com/image_c.jpg",
    ],
)

You can optionally provide a context and a time_to_live:

flow_item = flow.create_new_flow_batch(
    datapoints=[
        "https://example.com/image_a.jpg",
        "https://example.com/image_b.jpg",
        "https://example.com/image_c.jpg",
    ],
    context="These images were generated by model X", # (1)!
    time_to_live=300, # (2)!
)
  1. Shown alongside the instruction for each comparison.
  2. Automatically stops the flow item after this many seconds (minimum 60) and returns partial results.

3. Get Results#

Call get_results() on a flow item to retrieve the ranking results. If the flow item is still processing, this will automatically wait until it completes (or becomes incomplete due to time_to_live):

results = flow_item.get_results()

You can also check the status without blocking:

status = flow_item.get_status()  # Pending, Running, Completed, Failed, Stopped, or Incomplete

Note

A flow item enters the Incomplete state when its time_to_live expires before all responses are collected. You can still retrieve partial results from incomplete flow items.

To get the win/loss matrix per flow item and see what datapoints were preferred over each other:

matrix = flow_item.get_win_loss_matrix()

This returns a pandas DataFrame where matrix.loc[a, b] is the number of times item a was preferred over item b.

To get the total number of pairwise comparison responses collected for a flow item:

response_count = flow_item.get_response_count()

To query all flow items for a flow:

all_items = flow.get_flow_items()

4. Update Flow Configuration#

You can update the flow configuration at any time:

flow.update_config(
    instruction="Which image has higher visual quality?",
)

Note

This config will only affect new flow items and not modify existing ones.

Preheating#

If you need low-latency responses for upcoming flow items, you can preheat the system beforehand:

client.flow.preheat()

This warms up internal resources so that subsequent flow batches are processed faster. Call it around 5 minutes before submitting time-sensitive batches.

Retrieving Existing Flows#

You can retrieve flows by ID or list your recent flows:

# Get a specific flow by ID
flow = client.flow.get_flow_by_id("flow_id_here")

# List recent flows
recent_flows = client.flow.find_flows(amount=10)

Deleting a Flow#

flow.delete()