Skip to content

Rapidata job

RapidataJob #

RapidataJob(
    job_id: str,
    name: str,
    audience_id: str,
    created_at: datetime,
    definition_id: str,
    openapi_service: OpenAPIService,
    pipeline_id: str | None = None,
)

An instance of a Rapidata job.

Used to interact with a specific job in the Rapidata system, such as getting status and retrieving results.

A job is created from a job definition and an audience, and represents a specific run of that definition.

Parameters:

Name Type Description Default
job_id str

The ID of the job.

required
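name str

The name of the job.

required
audience_id str

The ID of the audience the job runs on.

required
created_at datetime

The creation time of the job.

required
definition_id str

The ID of the job definition the job was created from.

required
openapi_service OpenAPIService

The OpenAPIService instance for API interaction.

required
pipeline_id str | None

The ID of the pipeline associated with the job, if any.

None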
Source code in src/rapidata/rapidata_client/job/rapidata_job.py
def __init__(
    self,
    job_id: str,
    name: str,
    audience_id: str,
    created_at: datetime,
    definition_id: str,
    openapi_service: OpenAPIService,
    pipeline_id: str | None = None,
):
    """
    Stores the job's metadata and the API service handle on the instance.

    Args:
        job_id: The ID of the job; stored as ``self.id``.
        name: The name of the job.
        audience_id: The ID of the audience; also used in the details URL.
        created_at: Stored unchanged as ``self.created_at``.
        definition_id: Stored unchanged as ``self.definition_id``.
        openapi_service: The OpenAPIService instance for API interaction.
        pipeline_id: Optional pipeline ID, kept in a private attribute.
    """
    # API handle: the details URL below reads its ``environment``.
    self._openapi_service = openapi_service

    # Public identity and provenance of the job.
    self.id = job_id
    self.name = name
    self.audience_id = audience_id
    self.definition_id = definition_id
    self.created_at = created_at

    # Private state; the completion time is unknown at construction.
    self.__pipeline_id = pipeline_id
    self.__completed_at = None

    # Web URL of this job, opened by ``view()``.
    self.job_details_page = f"https://app.{self._openapi_service.environment}/audiences/{self.audience_id}/job/{self.id}"
    logger.debug("RapidataJob initialized")

completed_at property #

completed_at: datetime | None

Returns the completion date of the job, or None if not completed.

pipeline_id property #

pipeline_id: str

Returns the pipeline ID of the job.

get_status #

get_status() -> str

Gets the status of the job.

Returns:

Type Description
str

The current status of the job as a string.

Source code in src/rapidata/rapidata_client/job/rapidata_job.py
def get_status(self) -> str:
    """
    Fetches this job from the API and reports its current status.

    Returns:
        The ``status`` value of the job returned by the API.
    """
    with tracer.start_as_current_span("RapidataJob.get_status"):
        job_api = self._openapi_service.order.job_api
        job = job_api.job_job_id_get(self.id)
        return job.status

get_results #

get_results() -> RapidataResults

Gets the results of the job.

This method takes no arguments: it always waits until the job has reached the "Completed" or "Failed" status and then downloads and returns the results.

Returns:

Name Type Description
RapidataResults RapidataResults

The results of the job.

Raises:

Type Description
Exception

If failed to get job results.

Source code in src/rapidata/rapidata_client/job/rapidata_job.py
def get_results(self) -> RapidataResults:
    """
    Gets the results of the job.

    This method takes no options: it always waits (via ``_wait_for_status``)
    for the job to reach the "Completed" or "Failed" status, then downloads
    the results and parses them as JSON.

    Returns:
        RapidataResults: The results of the job.

    Raises:
        Exception: If the download fails with an ``ApiException`` or the
            payload is not valid JSON. The original error is chained as the
            cause.
    """
    with tracer.start_as_current_span("RapidataJob.get_results"):
        # Imported at call time rather than at module import.
        from rapidata.api_client.exceptions import ApiException
        from rapidata.rapidata_client.results.rapidata_results import (
            RapidataResults,
        )

        logger.info("Getting results for job '%s'...", self)

        # NOTE(review): a job that ends in "Failed" still proceeds to the
        # download below; presumably the API call then raises -- confirm.
        self._wait_for_status(
            target_statuses=["Completed", "Failed"],
            status_message="Job '%s' is in status %s, waiting for completion...",
        )

        try:
            results = (
                self._openapi_service.order.job_api.job_job_id_download_results_get(
                    job_id=self.id
                )
            )
            return RapidataResults(json.loads(results))
        except (ApiException, json.JSONDecodeError) as e:
            raise Exception(f"Failed to get job results: {e}") from e

display_progress_bar #

display_progress_bar(refresh_rate: int = 5) -> None

Displays a progress bar for the job processing using tqdm.

Parameters:

Name Type Description Default
refresh_rate int

How often to refresh the progress bar, in seconds.

5

Raises:

Type Description
ValueError

If refresh_rate is less than 1.

Source code in src/rapidata/rapidata_client/job/rapidata_job.py
def display_progress_bar(self, refresh_rate: int = 5) -> None:
    """
    Displays a progress bar for the job processing using tqdm.

    Polls the job status every ``refresh_rate`` seconds until the job is
    "Completed" or "Failed". Between polls the bar is advanced from the
    workflow progress on a best-effort basis: if the progress cannot be
    fetched, the error is logged at debug level and polling continues.

    Args:
        refresh_rate: How often to refresh the progress bar, in seconds.

    Raises:
        ValueError: If refresh_rate is less than 1.
        Exception: If the job is in, or reaches, the "Failed" status.
    """
    if refresh_rate < 1:
        raise ValueError("refresh_rate must be at least 1")

    # Handle terminal states up front so no progress bar is drawn for them.
    current_status = self.get_status()
    if current_status == "Completed":
        managed_print(f"Job '{self}' is already completed.")
        return

    if current_status == "Failed":
        failure_message = self._get_job_failure_message()
        raise Exception(f"Job '{self}' has failed: {failure_message}")

    # Get progress from pipeline if available
    with tqdm(
        total=100,
        desc="Processing job",
        unit="%",
        bar_format="{desc}: {percentage:3.0f}%|{bar}| completed [{elapsed}<{remaining}, {rate_fmt}]",
        disable=rapidata_config.logging.silent_mode,
    ) as pbar:
        last_percentage = 0
        while True:
            current_status = self.get_status()

            if current_status == "Completed":
                # Fill whatever is left so the bar ends at 100%.
                pbar.update(100 - last_percentage)
                break

            if current_status == "Failed":
                failure_message = self._get_job_failure_message()
                raise Exception(f"Job '{self}' has failed: {failure_message}")

            # Try to get progress from workflow
            try:
                progress = self._get_workflow_progress()
                current_percentage = (
                    progress.completion_percentage if progress else 0
                )

                # Only ever move the bar forward.
                if current_percentage > last_percentage:
                    pbar.update(current_percentage - last_percentage)
                    last_percentage = current_percentage
            except Exception as exc:
                # Best-effort: keep polling, but leave a trace of the failure
                # instead of discarding it silently.
                logger.debug(
                    "Could not update progress for job '%s': %s", self, exc
                )

            sleep(refresh_rate)

delete #

delete() -> None

Deletes the job.

Source code in src/rapidata/rapidata_client/job/rapidata_job.py
def delete(self) -> None:
    """Deletes the job through the job API and reports the deletion."""
    with tracer.start_as_current_span("RapidataJob.delete"):
        logger.info("Deleting job '%s'", self)

        job_api = self._openapi_service.order.job_api
        job_api.job_job_id_delete(self.id)

        # Record the deletion in the log and tell the user.
        logger.debug("Job '%s' has been deleted.", self)
        managed_print(f"Job '{self}' has been deleted.")

view #

view() -> None

Opens the job details page in the browser.

Source code in src/rapidata/rapidata_client/job/rapidata_job.py
def view(self) -> None:
    """
    Opens the job details page in the browser.

    If the browser cannot be opened, prints the percent-encoded URL in red
    so it can be opened manually.
    """
    logger.info("Opening job details page in browser...")

    url = self.job_details_page
    if webbrowser.open(url):
        return

    # Fallback: the browser did not open, so show the link instead.
    encoded_url = urllib.parse.quote(url, safe="%/:=&?~#+!$,;'@()*[]")
    message = f"Please open this URL in your browser: '{encoded_url}'"
    managed_print(Fore.RED + message + Fore.RESET)