armonik.common package
Subpackages
Submodules
armonik.common.channel module
- armonik.common.channel.create_channel(uri: str, *, options: Tuple[Tuple[str, str]] = None, certificate_authority: str | PathLike | bytes | None = None, client_certificate: str | PathLike | bytes | None = None, client_key: str | PathLike | bytes | None = None) grpc.Channel
Create a gRPC channel for communication with the ArmoniK control plane.
- Parameters:
uri – URI of the channel. Will start a secure channel if the scheme contains “https”. If it contains “unix”, uses a unix socket.
options – Options to pass to the channel
certificate_authority – Certificate authority path to read, or content as bytes
client_certificate – Client certificate path to read, or content as bytes
client_key – Client key path to read, or content as bytes. If set to None but client_certificate is not None, assumes the key is included with the certificate (p12 or PEM certificate)
- Returns:
gRPC channel for communication
- Return type:
Channel
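The scheme rule described for uri can be illustrated with a small standalone sketch. This is not the library's implementation: channel_kind and its return labels are hypothetical names, and treating every other scheme as an insecure channel is an assumption rather than documented behavior.

```python
from urllib.parse import urlparse


def channel_kind(uri: str) -> str:
    """Classify a URI by scheme, following the rule stated for `uri` (illustrative only)."""
    scheme = urlparse(uri).scheme.lower()
    if "https" in scheme:
        return "secure"        # documented: "https" in the scheme starts a secure channel
    if "unix" in scheme:
        return "unix-socket"   # documented: "unix" in the scheme uses a unix socket
    return "insecure"          # assumption: any other scheme falls back to an insecure channel


# Hypothetical endpoints, used only to exercise the three branches.
print(channel_kind("https://armonik.example.com:5001"))
print(channel_kind("unix:///tmp/armonik.sock"))
print(channel_kind("http://localhost:5001"))
```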
armonik.common.enumwrapper module
- class armonik.common.enumwrapper.Direction(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
IntEnum
- class armonik.common.enumwrapper.EventTypes(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
IntEnum
- classmethod from_string(name: str)
- class armonik.common.enumwrapper.HealthCheckStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
IntEnum
- class armonik.common.enumwrapper.ResultStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
IntEnum
- static name_from_value(status: armonik.protogen.common.result_status_pb2.ResultStatus) str
- class armonik.common.enumwrapper.ServiceHealthCheckStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
IntEnum
- class armonik.common.enumwrapper.SessionStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
IntEnum
- static name_from_value(status: armonik.protogen.common.session_status_pb2.SessionStatus) str
- class armonik.common.enumwrapper.TaskStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
IntEnum
armonik.common.events module
- class armonik.common.events.NewResultEvent(result_id: 'str', owner_id: 'str', status: 'ResultStatus')
Bases:
Event
- owner_id: str
- result_id: str
- status: ResultStatus
- class armonik.common.events.NewTaskEvent(task_id: 'str', payload_id: 'str', origin_task_id: 'str', status: 'TaskStatus', expected_output_keys: 'List[str]', data_dependencies: 'List[str]', retry_of_ids: 'List[str]', parent_task_ids: 'List[str]')
Bases:
Event
- data_dependencies: List[str]
- expected_output_keys: List[str]
- origin_task_id: str
- parent_task_ids: List[str]
- payload_id: str
- retry_of_ids: List[str]
- status: TaskStatus
- task_id: str
- class armonik.common.events.ResultOwnerUpdateEvent(result_id: 'str', previous_owner_id: 'str', current_owner_id: 'str')
Bases:
Event
- current_owner_id: str
- previous_owner_id: str
- result_id: str
- class armonik.common.events.ResultStatusUpdateEvent(result_id: 'str', status: 'ResultStatus')
Bases:
Event
- result_id: str
- status: ResultStatus
- class armonik.common.events.TaskStatusUpdateEvent(task_id: 'str', status: 'TaskStatus')
Bases:
Event
- status: TaskStatus
- task_id: str
armonik.common.helpers module
- armonik.common.helpers.batched(iterable: Iterable[T], n: int) Iterable[List[T]]
Batches elements from an iterable into lists of size at most ‘n’.
- Parameters:
iterable – The input iterable.
n – The batch size.
- Yields:
Lists of at most n consecutive elements from the input iterable.
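The batching behavior described above can be sketched in plain Python. This is an illustrative stand-in, not the library's source: batched_sketch is a hypothetical name, and rejecting n < 1 with a ValueError is an assumption.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched_sketch(iterable: Iterable[T], n: int) -> Iterator[List[T]]:
    """Yield successive lists of at most n elements from iterable."""
    if n < 1:
        # Assumption: a non-positive batch size is treated as an error.
        raise ValueError("n must be at least 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))  # take up to n items from the shared iterator
        if not batch:
            return  # input exhausted
        yield batch


for batch in batched_sketch(range(5), 2):
    print(batch)
```

Only the final batch can be shorter than n, and an empty input yields no batches at all.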
- armonik.common.helpers.datetime_to_timestamp(time_stamp: datetime | None) Timestamp
Helper function to convert a Python Datetime to a gRPC Timestamp
- Parameters:
time_stamp – Python datetime timestamp to convert
- Returns:
Equivalent gRPC Timestamp
- armonik.common.helpers.duration_to_timedelta(delta: Duration) timedelta
Helper function to convert a gRPC Duration into a Python timedelta. Note that timedelta has microsecond accuracy, whereas a gRPC Duration has nanosecond accuracy, so the conversion may be lossy.
- Parameters:
delta – gRPC Duration to convert
- Returns:
Equivalent Python timedelta
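The precision loss mentioned above can be shown with the standard library alone. The sketch below models a Duration as a plain (seconds, nanos) pair instead of the protobuf message. The function names are hypothetical, truncating sub-microsecond nanoseconds is an assumption about rounding, and only non-negative durations are handled.

```python
from datetime import timedelta
from typing import Tuple


def fields_to_timedelta(seconds: int, nanos: int) -> timedelta:
    """Convert a non-negative (seconds, nanos) pair to a timedelta."""
    # timedelta stores whole microseconds, so the last 0-999 ns are dropped here.
    return timedelta(seconds=seconds, microseconds=nanos // 1000)


def timedelta_to_fields(delta: timedelta) -> Tuple[int, int]:
    """Convert a non-negative timedelta back to a (seconds, nanos) pair."""
    total_us = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
    seconds, micros = divmod(total_us, 1_000_000)
    return seconds, micros * 1000


original = (1, 123_456_789)
round_trip = timedelta_to_fields(fields_to_timedelta(*original))
print(original, round_trip)
```

After the round trip the nanosecond field reads 123_456_000: the trailing 789 ns cannot be represented by timedelta.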
- armonik.common.helpers.get_task_filter(session_ids: List[str] | None = None, task_ids: List[str] | None = None, included_statuses: List[TaskStatus] | None = None, excluded_statuses: List[TaskStatus] | None = None) armonik.protogen.common.submitter_common_pb2.TaskFilter
Helper function to generate a task filter from the parameters
- Parameters:
session_ids – Optional list of session ids to filter against, mutually exclusive with task_ids
task_ids – Optional list of task ids to filter against, mutually exclusive with session_ids
included_statuses – Optional list of task statuses to include in the filter, mutually exclusive with excluded_statuses
excluded_statuses – Optional list of task statuses to exclude in the filter, mutually exclusive with included_statuses
- Returns:
Task filter to be used in a gRPC call to filter tasks
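The two mutual-exclusion constraints in the parameter list can be checked before building a filter. The following is a minimal standalone sketch, not part of the library: validate_task_filter_args and ensure_exclusive are hypothetical helpers, statuses are represented as plain strings for illustration, and raising ValueError on a conflict is an assumption, since the reference does not say how the library reacts.

```python
from typing import List, Optional


def ensure_exclusive(name_a: str, value_a: object, name_b: str, value_b: object) -> None:
    """Raise if both arguments of a mutually exclusive pair are provided."""
    if value_a is not None and value_b is not None:
        raise ValueError(f"{name_a} and {name_b} are mutually exclusive")


def validate_task_filter_args(
    session_ids: Optional[List[str]] = None,
    task_ids: Optional[List[str]] = None,
    included_statuses: Optional[List[str]] = None,
    excluded_statuses: Optional[List[str]] = None,
) -> None:
    """Check the two documented exclusivity rules (hypothetical pre-check)."""
    ensure_exclusive("session_ids", session_ids, "task_ids", task_ids)
    ensure_exclusive("included_statuses", included_statuses,
                     "excluded_statuses", excluded_statuses)


# One argument from each pair: accepted.
validate_task_filter_args(session_ids=["session-1"], excluded_statuses=["done"])

# Both arguments of the same pair: rejected by this sketch.
try:
    validate_task_filter_args(session_ids=["session-1"], task_ids=["task-1"])
except ValueError as err:
    print(err)
```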
- armonik.common.helpers.timedelta_to_duration(delta: timedelta) Duration
Helper function to convert a Python timedelta to a gRPC Duration
- Parameters:
delta – Python timedelta to convert
- Returns:
Equivalent gRPC Duration
- armonik.common.helpers.timestamp_to_datetime(time_stamp: Timestamp) datetime | None
Helper function to convert a gRPC Timestamp to a Python datetime. Note that datetime has microsecond accuracy, whereas a gRPC Timestamp has nanosecond accuracy, so the conversion may be lossy.
- Parameters:
time_stamp – gRPC Timestamp to convert
- Returns:
Equivalent Python Datetime