Ask AI

Source code for dagster._core.storage.dagster_run

import os
from datetime import datetime
from enum import Enum
from functools import cached_property
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Any,
    Dict,
    Iterable,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Union,
)

from typing_extensions import Self

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey
from dagster._core.loader import LoadableBy, LoadingContext
from dagster._core.origin import JobPythonOrigin
from dagster._core.storage.tags import (
    ASSET_EVALUATION_ID_TAG,
    AUTO_RETRY_RUN_ID_TAG,
    AUTOMATION_CONDITION_TAG,
    BACKFILL_ID_TAG,
    PARENT_RUN_ID_TAG,
    REPOSITORY_LABEL_TAG,
    RESUME_RETRY_TAG,
    ROOT_RUN_ID_TAG,
    SCHEDULE_NAME_TAG,
    SENSOR_NAME_TAG,
    TICK_ID_TAG,
    WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id
from dagster._record import IHaveNew, record_custom
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes
from dagster._utils.tags import get_boolean_tag_value

if TYPE_CHECKING:
    from dagster._core.definitions.schedule_definition import ScheduleDefinition
    from dagster._core.definitions.sensor_definition import SensorDefinition
    from dagster._core.remote_representation.external import RemoteSchedule, RemoteSensor
    from dagster._core.remote_representation.origin import RemoteJobOrigin
    from dagster._core.scheduler.instigation import InstigatorState


# Persisted under the legacy storage name "PipelineRunStatus" (see the decorator argument).
@whitelist_for_serdes(storage_name="PipelineRunStatus")
class DagsterRunStatus(Enum):
    """The status of run execution.

    Every member's value is the string form of its own name.
    """

    # Runs waiting to be launched by the Dagster Daemon.
    QUEUED = "QUEUED"

    # Runs in the brief window between creating the run and launching or enqueueing it.
    NOT_STARTED = "NOT_STARTED"

    # Runs that are managed outside of the Dagster control plane.
    MANAGED = "MANAGED"

    # Runs that have been launched, but execution has not yet started.
    STARTING = "STARTING"

    # Runs that have been launched and execution has started.
    STARTED = "STARTED"

    # Runs that have successfully completed.
    SUCCESS = "SUCCESS"

    # Runs that have failed to complete.
    FAILURE = "FAILURE"

    # Runs that are in-progress and pending to be canceled.
    CANCELING = "CANCELING"

    # Runs that have been canceled before completion.
    CANCELED = "CANCELED"
# These statuses that indicate a run may be using compute resources IN_PROGRESS_RUN_STATUSES = [ DagsterRunStatus.STARTING, DagsterRunStatus.STARTED, DagsterRunStatus.CANCELING, ] # This serves as an explicit list of run statuses that indicate that the run is not using compute # resources. This and the enum above should cover all run statuses. NON_IN_PROGRESS_RUN_STATUSES = [ DagsterRunStatus.QUEUED, DagsterRunStatus.NOT_STARTED, DagsterRunStatus.SUCCESS, DagsterRunStatus.FAILURE, DagsterRunStatus.MANAGED, DagsterRunStatus.CANCELED, ] FINISHED_STATUSES = [ DagsterRunStatus.SUCCESS, DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED, ] NOT_FINISHED_STATUSES = [ DagsterRunStatus.STARTING, DagsterRunStatus.STARTED, DagsterRunStatus.CANCELING, DagsterRunStatus.QUEUED, DagsterRunStatus.NOT_STARTED, ] # Run statuses for runs that can be safely canceled. # Does not include the other unfinished statuses for the following reasons: # STARTING: Control has been ceded to the run worker, which will eventually move the run to a STARTED. # NOT_STARTED: Mostly replaced with STARTING. Runs are only here in the brief window between # creating the run and launching or enqueueing it. 
CANCELABLE_RUN_STATUSES = [DagsterRunStatus.STARTED, DagsterRunStatus.QUEUED]


@whitelist_for_serdes(storage_name="PipelineRunStatsSnapshot")
class DagsterRunStatsSnapshot(
    NamedTuple(
        "_DagsterRunStatsSnapshot",
        [
            ("run_id", str),
            ("steps_succeeded", int),
            ("steps_failed", int),
            ("materializations", int),
            ("expectations", int),
            ("enqueued_time", Optional[float]),
            ("launch_time", Optional[float]),
            ("start_time", Optional[float]),
            ("end_time", Optional[float]),
        ],
    )
):
    """Per-run counters and timestamps, persisted under the storage name
    ``PipelineRunStatsSnapshot``.

    All fields are required at construction time; the four ``*_time`` fields accept ``None``.
    """

    def __new__(
        cls,
        run_id: str,
        steps_succeeded: int,
        steps_failed: int,
        materializations: int,
        expectations: int,
        enqueued_time: Optional[float],
        launch_time: Optional[float],
        start_time: Optional[float],
        end_time: Optional[float],
    ):
        # Every field is type-checked before the tuple is built.
        return super(DagsterRunStatsSnapshot, cls).__new__(
            cls,
            run_id=check.str_param(run_id, "run_id"),
            steps_succeeded=check.int_param(steps_succeeded, "steps_succeeded"),
            steps_failed=check.int_param(steps_failed, "steps_failed"),
            materializations=check.int_param(materializations, "materializations"),
            expectations=check.int_param(expectations, "expectations"),
            enqueued_time=check.opt_float_param(enqueued_time, "enqueued_time"),
            launch_time=check.opt_float_param(launch_time, "launch_time"),
            start_time=check.opt_float_param(start_time, "start_time"),
            end_time=check.opt_float_param(end_time, "end_time"),
        )


@whitelist_for_serdes
class RunOpConcurrency(
    NamedTuple(
        "_RunOpConcurrency",
        [
            ("root_key_counts", Mapping[str, int]),
            ("has_unconstrained_root_nodes", bool),
        ],
    )
):
    """Utility class to help calculate the immediate impact of launching a run on the op
    concurrency slots that will be available for other runs.
    """

    def __new__(
        cls,
        root_key_counts: Mapping[str, int],
        has_unconstrained_root_nodes: bool,
    ):
        return super(RunOpConcurrency, cls).__new__(
            cls,
            root_key_counts=check.dict_param(
                root_key_counts, "root_key_counts", key_type=str, value_type=int
            ),
            has_unconstrained_root_nodes=check.bool_param(
                has_unconstrained_root_nodes, "has_unconstrained_root_nodes"
            ),
        )


class DagsterRunSerializer(NamedTupleSerializer["DagsterRun"]):
    """Serializer for ``DagsterRun`` that upgrades legacy stored payloads while unpacking."""

    # serdes log
    # * removed reexecution_config - serdes logic expected to strip unknown keys so no need to preserve
    # * added pipeline_snapshot_id
    # * renamed previous_run_id -> parent_run_id, added root_run_id
    # * added execution_plan_snapshot_id
    # * removed selector
    # * added solid_subset
    # * renamed solid_subset -> solid_selection, added solids_to_execute
    # * renamed environment_dict -> run_config
    # * added asset_selection
    # * added has_repository_load_data
    def before_unpack(self, context, unpacked_dict: Dict[str, Any]) -> Dict[str, Any]:
        """Rewrite legacy keys in ``unpacked_dict`` to their current storage names.

        The dict is mutated in place and also returned. Keys use storage names
        (``pipeline_name``, ``solids_to_execute``, ...), not the ``DagsterRun`` field names.

        Raises a check error when a legacy key conflicts with its modern counterpart, or when
        ``selector`` is not an ``ExecutionSelector``.
        """
        # back compat for environment dict => run_config
        if "environment_dict" in unpacked_dict:
            check.invariant(
                unpacked_dict.get("run_config") is None,
                "Cannot set both run_config and environment_dict. Use run_config parameter.",
            )
            unpacked_dict["run_config"] = unpacked_dict["environment_dict"]
            del unpacked_dict["environment_dict"]

        # back compat for previous_run_id => parent_run_id, root_run_id
        if "previous_run_id" in unpacked_dict and not (
            "parent_run_id" in unpacked_dict and "root_run_id" in unpacked_dict
        ):
            unpacked_dict["parent_run_id"] = unpacked_dict["previous_run_id"]
            unpacked_dict["root_run_id"] = unpacked_dict["previous_run_id"]
            del unpacked_dict["previous_run_id"]

        # back compat for selector => pipeline_name, solids_to_execute
        if "selector" in unpacked_dict:
            selector = unpacked_dict["selector"]

            if not isinstance(selector, ExecutionSelector):
                check.failed(f"unexpected entry for 'selector', {selector}")
            selector_name = selector.name
            selector_subset = selector.solid_subset

            job_name = unpacked_dict.get("pipeline_name")
            check.invariant(
                job_name is None or selector_name == job_name,
                f"Conflicting pipeline name {job_name} in arguments to PipelineRun: "
                f"selector was passed with pipeline {selector_name}",
            )
            if job_name is None:
                unpacked_dict["pipeline_name"] = selector_name

            solids_to_execute = unpacked_dict.get("solids_to_execute")
            check.invariant(
                solids_to_execute is None
                or (selector_subset and set(selector_subset) == solids_to_execute),
                f"Conflicting solids_to_execute {solids_to_execute} in arguments to"
                f" PipelineRun: selector was passed with subset {selector_subset}",
            )
            # For old runs that only have selector but no solids_to_execute: carry the
            # selector's subset over into the dict. Previously the value was only bound to a
            # local name and therefore lost.
            if solids_to_execute is None and selector_subset:
                unpacked_dict["solids_to_execute"] = frozenset(selector_subset)

        # back compat for solid_subset => solids_to_execute
        if "solid_subset" in unpacked_dict:
            unpacked_dict["solids_to_execute"] = unpacked_dict["solid_subset"]
            del unpacked_dict["solid_subset"]

        return unpacked_dict
@whitelist_for_serdes(
    serializer=DagsterRunSerializer,
    # DagsterRun is serialized as PipelineRun so that it can be read by older (pre 0.13.x) version
    # of Dagster, but is read back in as a DagsterRun.
    storage_name="PipelineRun",
    old_fields={"mode": None},
    storage_field_names={
        "job_name": "pipeline_name",
        "job_snapshot_id": "pipeline_snapshot_id",
        "remote_job_origin": "external_pipeline_origin",
        "job_code_origin": "pipeline_code_origin",
        "op_selection": "solid_selection",
        "resolved_op_selection": "solids_to_execute",
    },
)
class DagsterRun(
    NamedTuple(
        "_DagsterRun",
        [
            ("job_name", PublicAttr[str]),
            ("run_id", PublicAttr[str]),
            ("run_config", PublicAttr[Mapping[str, object]]),
            ("asset_selection", Optional[AbstractSet[AssetKey]]),
            ("asset_check_selection", Optional[AbstractSet[AssetCheckKey]]),
            ("op_selection", Optional[Sequence[str]]),
            ("resolved_op_selection", Optional[AbstractSet[str]]),
            ("step_keys_to_execute", Optional[Sequence[str]]),
            ("status", DagsterRunStatus),
            ("tags", PublicAttr[Mapping[str, str]]),
            ("root_run_id", Optional[str]),
            ("parent_run_id", Optional[str]),
            ("job_snapshot_id", Optional[str]),
            ("execution_plan_snapshot_id", Optional[str]),
            ("remote_job_origin", Optional["RemoteJobOrigin"]),
            ("job_code_origin", Optional[JobPythonOrigin]),
            ("has_repository_load_data", bool),
            ("run_op_concurrency", Optional[RunOpConcurrency]),
        ],
    )
):
    """Serializable internal representation of a dagster run, as stored in a
    :py:class:`~dagster._core.storage.runs.RunStorage`.

    Attributes:
        job_name (str): The name of the job executed in this run
        run_id (str): The ID of the run
        run_config (Mapping[str, object]): The config for the run
        tags (Mapping[str, str]): The tags applied to the run
    """

    def __new__(
        cls,
        job_name: str,
        run_id: Optional[str] = None,
        run_config: Optional[Mapping[str, object]] = None,
        asset_selection: Optional[AbstractSet[AssetKey]] = None,
        asset_check_selection: Optional[AbstractSet[AssetCheckKey]] = None,
        op_selection: Optional[Sequence[str]] = None,
        resolved_op_selection: Optional[AbstractSet[str]] = None,
        step_keys_to_execute: Optional[Sequence[str]] = None,
        status: Optional[DagsterRunStatus] = None,
        tags: Optional[Mapping[str, str]] = None,
        root_run_id: Optional[str] = None,
        parent_run_id: Optional[str] = None,
        job_snapshot_id: Optional[str] = None,
        execution_plan_snapshot_id: Optional[str] = None,
        remote_job_origin: Optional["RemoteJobOrigin"] = None,
        job_code_origin: Optional[JobPythonOrigin] = None,
        has_repository_load_data: Optional[bool] = None,
        run_op_concurrency: Optional[RunOpConcurrency] = None,
    ):
        """Validate the arguments and build the run.

        ``root_run_id`` and ``parent_run_id`` must be both set or both ``None``. A run created
        with status ``QUEUED`` must carry a ``RemoteJobOrigin``. When ``run_id`` is ``None`` a
        new one is generated with ``make_new_run_id()``.
        """
        check.invariant(
            (root_run_id is not None and parent_run_id is not None)
            or (root_run_id is None and parent_run_id is None),
            "Must set both root_run_id and parent_run_id when creating a PipelineRun that "
            "belongs to a run group",
        )
        # a set which contains the names of the ops to execute
        resolved_op_selection = check.opt_nullable_set_param(
            resolved_op_selection, "resolved_op_selection", of_type=str
        )
        # a list of op queries provided by the user
        # possible to be None when resolved_op_selection is set by the user directly
        op_selection = check.opt_nullable_sequence_param(op_selection, "op_selection", of_type=str)
        # Validation only: the return value is discarded and the caller's object is stored as-is.
        check.opt_nullable_sequence_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)

        asset_selection = check.opt_nullable_set_param(
            asset_selection, "asset_selection", of_type=AssetKey
        )
        asset_check_selection = check.opt_nullable_set_param(
            asset_check_selection, "asset_check_selection", of_type=AssetCheckKey
        )

        # Placing this with the other imports causes a cyclic import
        # https://github.com/dagster-io/dagster/issues/3181
        from dagster._core.remote_representation.origin import RemoteJobOrigin

        if status == DagsterRunStatus.QUEUED:
            check.inst_param(
                remote_job_origin,
                "remote_job_origin",
                RemoteJobOrigin,
                "remote_job_origin is required for queued runs",
            )

        if run_id is None:
            run_id = make_new_run_id()

        return super(DagsterRun, cls).__new__(
            cls,
            job_name=check.str_param(job_name, "job_name"),
            run_id=check.str_param(run_id, "run_id"),
            run_config=check.opt_mapping_param(run_config, "run_config", key_type=str),
            op_selection=op_selection,
            asset_selection=asset_selection,
            asset_check_selection=asset_check_selection,
            resolved_op_selection=resolved_op_selection,
            step_keys_to_execute=step_keys_to_execute,
            # DagsterRunStatus.NOT_STARTED is passed as the default for a missing status.
            status=check.opt_inst_param(
                status, "status", DagsterRunStatus, DagsterRunStatus.NOT_STARTED
            ),
            tags=check.opt_mapping_param(tags, "tags", key_type=str, value_type=str),
            root_run_id=check.opt_str_param(root_run_id, "root_run_id"),
            parent_run_id=check.opt_str_param(parent_run_id, "parent_run_id"),
            job_snapshot_id=check.opt_str_param(job_snapshot_id, "job_snapshot_id"),
            execution_plan_snapshot_id=check.opt_str_param(
                execution_plan_snapshot_id, "execution_plan_snapshot_id"
            ),
            remote_job_origin=check.opt_inst_param(
                remote_job_origin, "remote_job_origin", RemoteJobOrigin
            ),
            job_code_origin=check.opt_inst_param(
                job_code_origin, "job_code_origin", JobPythonOrigin
            ),
            has_repository_load_data=check.opt_bool_param(
                has_repository_load_data, "has_repository_load_data", default=False
            ),
            run_op_concurrency=check.opt_inst_param(
                run_op_concurrency, "run_op_concurrency", RunOpConcurrency
            ),
        )

    def with_status(self, status: DagsterRunStatus) -> Self:
        """Return a copy of this run with ``status`` replaced.

        Moving to ``QUEUED`` requires ``remote_job_origin`` to be set.
        """
        if status == DagsterRunStatus.QUEUED:
            # Only a None check is needed here, so RemoteJobOrigin is not imported.
            check.not_none(
                self.remote_job_origin,
                "external_pipeline_origin is required for queued runs",
            )

        return self._replace(status=status)

    def with_job_origin(self, origin: "RemoteJobOrigin") -> Self:
        """Return a copy of this run with ``remote_job_origin`` set to ``origin``."""
        # Imported locally, as in __new__, rather than at module level.
        from dagster._core.remote_representation.origin import RemoteJobOrigin

        check.inst_param(origin, "origin", RemoteJobOrigin)
        return self._replace(remote_job_origin=origin)

    def with_tags(self, tags: Mapping[str, str]) -> Self:
        """Return a copy of this run whose tags are replaced (not merged) by ``tags``."""
        return self._replace(tags=tags)

    def get_root_run_id(self) -> Optional[str]:
        """Return the root run id recorded in the tags, or ``None`` if the tag is absent."""
        # Reads the ROOT_RUN_ID_TAG tag, not the ``root_run_id`` field.
        return self.tags.get(ROOT_RUN_ID_TAG)

    def get_parent_run_id(self) -> Optional[str]:
        """Return the parent run id recorded in the tags, or ``None`` if the tag is absent."""
        # Reads the PARENT_RUN_ID_TAG tag, not the ``parent_run_id`` field.
        return self.tags.get(PARENT_RUN_ID_TAG)

    @cached_property
    def dagster_execution_info(self) -> Mapping[str, str]:
        """Key-value pairs encoding metadata about the current Dagster run, typically attached to external execution resources.

        Remote execution environments commonly have their own concepts of tags or labels. It's useful to include
        Dagster-specific metadata in these environments to help with debugging, monitoring, and linking remote
        resources back to Dagster. For example, the Kubernetes Executor and Pipes client are using these tags as Kubernetes labels.

        By default the tags include:
        * dagster/run-id
        * dagster/job

        And, if available:
        * dagster/partition
        * dagster/code-location
        * dagster/user

        And, for Dagster+ deployments:
        * dagster/deployment-name
        * dagster/git-repo
        * dagster/git-branch
        * dagster/git-sha
        """
        tags = {
            "dagster/run-id": self.run_id,
            "dagster/job": self.job_name,
        }

        if self.remote_job_origin:
            tags["dagster/code-location"] = (
                self.remote_job_origin.repository_origin.code_location_origin.location_name
            )

        # Copied from the run's own tags only when present and non-empty.
        if user := self.tags.get("dagster/user"):
            tags["dagster/user"] = user

        if partition := self.tags.get("dagster/partition"):
            tags["dagster/partition"] = partition

        # Environment variables are read at first access; the result is then cached.
        for env_var, tag in {
            "DAGSTER_CLOUD_DEPLOYMENT_NAME": "deployment-name",
            "DAGSTER_CLOUD_GIT_REPO": "git-repo",
            "DAGSTER_CLOUD_GIT_BRANCH": "git-branch",
            "DAGSTER_CLOUD_GIT_SHA": "git-sha",
        }.items():
            if value := os.getenv(env_var):
                tags[f"dagster/{tag}"] = value

        return tags

    def tags_for_storage(self) -> Mapping[str, str]:
        """Return the run's tags plus a repository label tag when a remote origin is set.

        On a key collision the run's own tags win, because they are merged last.
        """
        repository_tags = {}
        if self.remote_job_origin:
            # tag the run with a label containing the repository name / location name, to allow for
            # per-repository filtering of runs from the Dagster UI.
            repository_tags[REPOSITORY_LABEL_TAG] = (
                self.remote_job_origin.repository_origin.get_label()
            )

        if not self.tags:
            return repository_tags

        return {**repository_tags, **self.tags}

    @public
    @property
    def is_finished(self) -> bool:
        """bool: If this run has completely finished execution."""
        return self.status in FINISHED_STATUSES

    @public
    @property
    def is_cancelable(self) -> bool:
        """bool: If this run can be canceled."""
        return self.status in CANCELABLE_RUN_STATUSES

    @public
    @property
    def is_success(self) -> bool:
        """bool: If this run has successfully finished executing."""
        return self.status == DagsterRunStatus.SUCCESS

    @public
    @property
    def is_failure(self) -> bool:
        """bool: If this run has failed."""
        return self.status == DagsterRunStatus.FAILURE

    @public
    @property
    def is_failure_or_canceled(self) -> bool:
        """bool: If this run has either failed or was canceled."""
        return self.status == DagsterRunStatus.FAILURE or self.status == DagsterRunStatus.CANCELED

    @public
    @property
    def is_resume_retry(self) -> bool:
        """bool: If this run was created from retrying another run from the point of failure."""
        return self.tags.get(RESUME_RETRY_TAG) == "true"

    @property
    def is_complete_and_waiting_to_retry(self) -> bool:
        """Indicates if a run is waiting to be retried by the auto-reexecution system.
        Returns True if 1) the run is complete, 2) the run is in a failed state (therefore eligible for retry),
        3) the run is marked as needing to be retried, and 4) the retried run has not been launched yet.
        Otherwise returns False.
        """
        if self.status in NOT_FINISHED_STATUSES:
            return False
        if self.status != DagsterRunStatus.FAILURE:
            return False
        will_retry = get_boolean_tag_value(self.tags.get(WILL_RETRY_TAG), default_value=False)
        # A missing AUTO_RETRY_RUN_ID_TAG means no retry run has been recorded on this run yet.
        retry_not_launched = self.tags.get(AUTO_RETRY_RUN_ID_TAG) is None
        if will_retry:
            return retry_not_launched

        return False

    @property
    def previous_run_id(self) -> Optional[str]:
        # Compat
        return self.parent_run_id

    @staticmethod
    def tags_for_schedule(
        schedule: Union["InstigatorState", "RemoteSchedule", "ScheduleDefinition"],
    ) -> Mapping[str, str]:
        """Return the single tag that associates a run with ``schedule`` by name."""
        return {SCHEDULE_NAME_TAG: schedule.name}

    @staticmethod
    def tags_for_sensor(
        sensor: Union["InstigatorState", "RemoteSensor", "SensorDefinition"],
    ) -> Mapping[str, str]:
        """Return the single tag that associates a run with ``sensor`` by name."""
        return {SENSOR_NAME_TAG: sensor.name}

    @staticmethod
    def tags_for_backfill_id(backfill_id: str) -> Mapping[str, str]:
        """Return the single tag that associates a run with the backfill ``backfill_id``."""
        return {BACKFILL_ID_TAG: backfill_id}

    @staticmethod
    def tags_for_tick_id(tick_id: str, has_evaluations: bool = False) -> Mapping[str, str]:
        """Return the tick id tag, plus the automation tags when ``has_evaluations`` is true."""
        if has_evaluations:
            # The tick id doubles as the asset evaluation id.
            automation_tags = {AUTOMATION_CONDITION_TAG: "true", ASSET_EVALUATION_ID_TAG: tick_id}
        else:
            automation_tags = {}
        return {TICK_ID_TAG: tick_id, **automation_tags}
@record_custom
class RunsFilter(IHaveNew):
    """Defines a filter across job runs, for use when querying storage directly.

    Each field of the RunsFilter represents a logical AND with each other. For
    example, if you specify job_name and tags, then you will receive only runs
    with the specified job_name AND the specified tags. If left blank, then
    all values will be permitted for that field.

    Args:
        run_ids (Optional[List[str]]): A list of job run_id values. Must not be an empty list.
        job_name (Optional[str]):
            Name of the job to query for. If blank, all job_names will be accepted.
        statuses (Optional[List[DagsterRunStatus]]):
            A list of run statuses to filter by. If blank, all run statuses will be allowed.
        tags (Optional[Dict[str, Union[str, List[str]]]]):
            A dictionary of run tags to query by. All tags specified here must be present for a given run to pass the filter.
        snapshot_id (Optional[str]): The ID of the job snapshot to query for. Intended for internal use.
        updated_after (Optional[DateTime]): Filter by runs that were last updated after this datetime.
        updated_before (Optional[DateTime]): Filter by runs that were last updated before this datetime.
        created_after (Optional[DateTime]): Filter by runs that were created after this datetime.
        created_before (Optional[DateTime]): Filter by runs that were created before this datetime.
        exclude_subruns (Optional[bool]): If true, runs that were launched to backfill historical data will be excluded from results.
    """

    run_ids: Optional[Sequence[str]]
    job_name: Optional[str]
    statuses: Sequence[DagsterRunStatus]
    tags: Mapping[str, Union[str, Sequence[str]]]
    snapshot_id: Optional[str]
    updated_after: Optional[datetime]
    updated_before: Optional[datetime]
    created_after: Optional[datetime]
    created_before: Optional[datetime]
    exclude_subruns: Optional[bool]

    @experimental_param(param="exclude_subruns")
    def __new__(
        cls,
        run_ids: Optional[Sequence[str]] = None,
        job_name: Optional[str] = None,
        statuses: Optional[Sequence[DagsterRunStatus]] = None,
        tags: Optional[Mapping[str, Union[str, Sequence[str]]]] = None,
        snapshot_id: Optional[str] = None,
        updated_after: Optional[datetime] = None,
        updated_before: Optional[datetime] = None,
        created_after: Optional[datetime] = None,
        created_before: Optional[datetime] = None,
        exclude_subruns: Optional[bool] = None,
    ):
        # ``None`` means "do not filter on run id"; an empty list is rejected.
        check.invariant(run_ids != [], "When filtering on run ids, a non-empty list must be used.")

        return super().__new__(
            cls,
            run_ids=run_ids,
            job_name=job_name,
            # Falsy statuses/tags are normalized to empty containers.
            statuses=statuses or [],
            tags=tags or {},
            snapshot_id=snapshot_id,
            updated_after=updated_after,
            updated_before=updated_before,
            created_after=created_after,
            created_before=created_before,
            exclude_subruns=exclude_subruns,
        )

    @staticmethod
    def for_schedule(
        schedule: Union["RemoteSchedule", "InstigatorState", "ScheduleDefinition"],
    ) -> "RunsFilter":
        """Build a filter on the tags from ``DagsterRun.tags_for_schedule(schedule)``."""
        return RunsFilter(tags=DagsterRun.tags_for_schedule(schedule))

    @staticmethod
    def for_sensor(
        sensor: Union["RemoteSensor", "InstigatorState", "SensorDefinition"],
    ) -> "RunsFilter":
        """Build a filter on the tags from ``DagsterRun.tags_for_sensor(sensor)``."""
        return RunsFilter(tags=DagsterRun.tags_for_sensor(sensor))

    @staticmethod
    def for_backfill(backfill_id: str) -> "RunsFilter":
        """Build a filter on the tags from ``DagsterRun.tags_for_backfill_id(backfill_id)``."""
        return RunsFilter(tags=DagsterRun.tags_for_backfill_id(backfill_id))
class JobBucket(NamedTuple):
    """Pairs a list of job names with an optional limit.

    NOTE(review): presumably describes how a run query is bucketed by job name, with
    ``bucket_limit`` capping results per bucket -- no consumer is visible here, confirm.
    """

    job_names: List[str]
    bucket_limit: Optional[int]


class TagBucket(NamedTuple):
    """Pairs a tag key and a list of its values with an optional limit.

    NOTE(review): presumably describes how a run query is bucketed by tag value, with
    ``bucket_limit`` capping results per bucket -- no consumer is visible here, confirm.
    """

    tag_key: str
    tag_values: List[str]
    bucket_limit: Optional[int]
class RunRecord(
    NamedTuple(
        "_RunRecord",
        [
            ("storage_id", int),
            ("dagster_run", DagsterRun),
            ("create_timestamp", datetime),
            ("update_timestamp", datetime),
            ("start_time", Optional[float]),
            ("end_time", Optional[float]),
        ],
    ),
    LoadableBy[str],
):
    """Internal representation of a run record, as stored in a
    :py:class:`~dagster._core.storage.runs.RunStorage`.

    Users should not invoke this class directly.
    """

    def __new__(
        cls,
        storage_id: int,
        dagster_run: DagsterRun,
        create_timestamp: datetime,
        update_timestamp: datetime,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ):
        return super(RunRecord, cls).__new__(
            cls,
            storage_id=check.int_param(storage_id, "storage_id"),
            dagster_run=check.inst_param(dagster_run, "dagster_run", DagsterRun),
            create_timestamp=check.inst_param(create_timestamp, "create_timestamp", datetime),
            update_timestamp=check.inst_param(update_timestamp, "update_timestamp", datetime),
            # start_time and end_time fields will be populated once the run has started and ended, respectively, but will be None beforehand.
            start_time=check.opt_float_param(start_time, "start_time"),
            end_time=check.opt_float_param(end_time, "end_time"),
        )

    @classmethod
    def _blocking_batch_load(
        cls, keys: Iterable[str], context: LoadingContext
    ) -> Iterable[Optional["RunRecord"]]:
        """Load the run records for ``keys`` (run ids) with a single storage query.

        Returns one entry per input key, in input order; a key with no stored record maps to
        ``None``. An empty ``keys`` returns an empty list without querying storage.
        """
        # Materialize once: ``keys`` may be a one-shot iterable and is needed twice below.
        run_ids = list(keys)
        if not run_ids:
            # RunsFilter rejects an empty run_ids list, so there is nothing valid to query.
            return []

        result_map: Dict[str, Optional[RunRecord]] = {run_id: None for run_id in run_ids}

        # this should be replaced with an async DB call
        records = context.instance.get_run_records(RunsFilter(run_ids=list(result_map)))

        for record in records:
            result_map[record.dagster_run.run_id] = record

        # Index by the original keys so duplicates still yield one result per key.
        return [result_map[run_id] for run_id in run_ids]
@whitelist_for_serdes
class RunPartitionData(
    NamedTuple(
        "_RunPartitionData",
        [
            ("run_id", str),
            ("partition", str),
            ("status", DagsterRunStatus),
            ("start_time", Optional[float]),
            ("end_time", Optional[float]),
        ],
    )
):
    """Run id, partition, status and optional start/end times for a single run."""

    def __new__(
        cls,
        run_id: str,
        partition: str,
        status: DagsterRunStatus,
        start_time: Optional[float],
        end_time: Optional[float],
    ):
        # Validate in field order, then build the tuple from the checked values.
        checked_run_id = check.str_param(run_id, "run_id")
        checked_partition = check.str_param(partition, "partition")
        checked_status = check.inst_param(status, "status", DagsterRunStatus)
        checked_start = check.opt_inst(start_time, float)
        checked_end = check.opt_inst(end_time, float)
        return super(RunPartitionData, cls).__new__(
            cls,
            run_id=checked_run_id,
            partition=checked_partition,
            status=checked_status,
            start_time=checked_start,
            end_time=checked_end,
        )


###################################################################################################
# GRAVEYARD
#
#            -|-
#             |
#        _-'~~~~~`-_
#      .'           '.
#      |    R I P    |
#      |             |
#      |  Execution  |
#      |   Selector  |
#      |             |
#      |             |
###################################################################################################


@whitelist_for_serdes
class ExecutionSelector(
    NamedTuple("_ExecutionSelector", [("name", str), ("solid_subset", Optional[Sequence[str]])])
):
    """Kept here to maintain loading of PipelineRuns from when it was still alive."""

    def __new__(cls, name: str, solid_subset: Optional[Sequence[str]] = None):
        # Check ``name`` first so the validation order matches the field order.
        checked_name = check.str_param(name, "name")
        checked_subset = solid_subset
        if checked_subset is not None:
            checked_subset = check.sequence_param(checked_subset, "solid_subset", of_type=str)
        return super(ExecutionSelector, cls).__new__(
            cls,
            name=checked_name,
            solid_subset=checked_subset,
        )