Skip to content

Rapidata flow item

RapidataFlowItem #

RapidataFlowItem(
    id: str, flow_id: str, openapi_service: OpenAPIService
)
Source code in src/rapidata/rapidata_client/flow/rapidata_flow_item.py
def __init__(self, id: str, flow_id: str, openapi_service: OpenAPIService):
    """Create a handle for a single item of a Rapidata flow.

    Args:
        id: Identifier of this flow item; the other methods pass it as
            ``flow_item_id`` to the API.
        flow_id: Identifier stored alongside the item (presumably the flow
            it belongs to — confirm against callers).
        openapi_service: Service object through which all API calls are made.
    """
    self.id: str = id
    self.flow_id: str = flow_id
    self._openapi_service = openapi_service
    # Filled in by get_win_loss_matrix(); None means the matrix has not been
    # fetched yet, so get_response_count() must fetch it first.
    self._response_count: float | int | None = None

get_response_count #

get_response_count() -> float | int

Get the total number of pairwise comparison responses for this flow item.

The count is derived from the win/loss matrix by summing all entries. If the matrix hasn't been fetched yet, this will trigger a call to `get_win_loss_matrix`, which waits for the flow item to finish.

Returns:

- `float | int`: The total number of comparison votes collected.

Source code in src/rapidata/rapidata_client/flow/rapidata_flow_item.py
def get_response_count(self) -> float | int:
    """Get the total number of pairwise comparison responses for this flow item.

    The count is the sum of all entries of the win/loss matrix. It is cached on
    the instance: if no count has been stored yet, this calls
    :meth:`get_win_loss_matrix`, which waits for the flow item to finish and
    stores the count as a side effect.

    Returns:
        float | int: The total number of comparison votes collected.

    Raises:
        RuntimeError: If fetching the win/loss matrix did not record a count.
    """
    with tracer.start_as_current_span("RapidataFlowItem.get_response_count"):
        if self._response_count is None:
            self.get_win_loss_matrix()
        # Explicit check rather than ``assert``: assertions are removed under
        # ``python -O``, which would let ``None`` escape despite the
        # ``float | int`` return annotation.
        if self._response_count is None:
            raise RuntimeError(
                f"Response count for flow item '{self.id}' could not be determined."
            )
        return self._response_count

get_status #

get_status() -> FlowItemState

Get the current state of this flow item.

Returns:

- `FlowItemState`: The current state (Pending, Running, Completed, Failed, Stopped, or Incomplete).

Source code in src/rapidata/rapidata_client/flow/rapidata_flow_item.py
def get_status(self) -> FlowItemState:
    """Return the state this flow item is currently in.

    Fetches fresh item details from the API on every call; nothing is cached.

    Returns:
        FlowItemState: The current state (Pending, Running, Completed, Failed, Stopped, or Incomplete).
    """
    with tracer.start_as_current_span("RapidataFlowItem.get_status"):
        logger.debug("Getting status for flow item '%s'", self.id)
        return self._get_details().state

get_results #

get_results() -> FlowItemResult

Get the results of this flow item from the API.

Returns:

- `FlowItemResult`: Contains a mapping of asset identifier to Elo score and the total number of votes.

Source code in src/rapidata/rapidata_client/flow/rapidata_flow_item.py
def get_results(self) -> FlowItemResult:
    """Fetch the ranking results of this flow item, waiting until it finishes.

    Blocks (polling once per second) until the item is Completed, Failed,
    Stopped or Incomplete, then requests the results from the ranking API.

    Returns:
        FlowItemResult: Maps each asset identifier to its elo score (0 when the
            datapoint carries no ``"elo"`` entry) and holds the total number
            of votes.
    """
    with tracer.start_as_current_span("RapidataFlowItem.get_results"):
        from rapidata.api_client.models.flow_item_state import FlowItemState

        logger.debug("Getting results for flow item '%s'", self.id)
        finished_states = [
            FlowItemState.COMPLETED,
            FlowItemState.FAILED,
            FlowItemState.STOPPED,
            FlowItemState.INCOMPLETE,
        ]
        self._wait_for_state(
            target_states=finished_states,
            check_interval=1,
            status_message="Flow item '%s' is in state %s, waiting for completion...",
        )

        ranking_api = self._openapi_service.flow.ranking_flow_item_api
        response = ranking_api.flow_ranking_item_flow_item_id_results_get(
            flow_item_id=self.id,
        )

        elo_by_asset = {}
        for datapoint in response.datapoints:
            record = datapoint.to_dict()
            # Resolve the key before reading the score to keep the same
            # evaluation order as a dict comprehension.
            asset_key = self._extract_asset_key(record)
            elo_by_asset[asset_key] = record.get("elo", 0)

        return FlowItemResult(
            datapoints=elo_by_asset,
            total_votes=response.total_votes,
        )

get_win_loss_matrix #

get_win_loss_matrix() -> DataFrame

Get the win/loss matrix of this flow item from the API.

The win/loss matrix shows pairwise comparison counts, where `data[i][j]` is the number of times row `i` was preferred over column `j`.

Returns:

- `pd.DataFrame`: A DataFrame whose rows and columns are asset identifiers and whose values are win/loss counts.

Source code in src/rapidata/rapidata_client/flow/rapidata_flow_item.py
def get_win_loss_matrix(self) -> pd.DataFrame:
    """Fetch the pairwise win/loss matrix of this flow item, waiting until it finishes.

    Entry ``data[i][j]`` is the number of times row ``i`` was preferred over
    column ``j``. Blocks (polling once per second) until the item is
    Completed, Failed, Stopped or Incomplete. As a side effect, the sum of all
    entries is stored on the instance for :meth:`get_response_count`.

    Returns:
        pd.DataFrame: Rows and columns are asset identifiers; values are
            win/loss counts.
    """
    with tracer.start_as_current_span("RapidataFlowItem.get_win_loss_matrix"):
        import pandas as pd
        from rapidata.api_client.models.flow_item_state import FlowItemState

        logger.debug("Getting win/loss matrix for flow item '%s'", self.id)
        finished_states = [
            FlowItemState.COMPLETED,
            FlowItemState.FAILED,
            FlowItemState.STOPPED,
            FlowItemState.INCOMPLETE,
        ]
        self._wait_for_state(
            target_states=finished_states,
            check_interval=1,
            status_message="Flow item '%s' is in state %s, waiting for completion...",
        )

        ranking_api = self._openapi_service.flow.ranking_flow_item_api
        vote_matrix = ranking_api.flow_ranking_item_flow_item_id_vote_matrix_get(
            flow_item_id=self.id,
        )
        # Total votes = sum over every cell of the matrix.
        self._response_count = sum(map(sum, vote_matrix.data))

        row_labels = pd.Index(vote_matrix.index)
        column_labels = pd.Index(vote_matrix.columns)
        return pd.DataFrame(
            data=vote_matrix.data,
            index=row_labels,
            columns=column_labels,
        )