armonik.common package

Subpackages

Submodules

armonik.common.channel module

armonik.common.channel.create_channel(uri: str, *, options: Tuple[Tuple[str, str]] = None, certificate_authority: str | PathLike | bytes | None = None, client_certificate: str | PathLike | bytes | None = None, client_key: str | PathLike | bytes | None = None) grpc.Channel

Create a gRPC channel for communication with the ArmoniK control plane :param uri: URI of the channel. Will start a secure channel if the scheme contains “https”. If it contains “unix”, uses a unix socket. :param options: Options to pass to the channel :param certificate_authority: Certificate authority path to read, or content as bytes :param client_certificate: Client certificate path to read, or content as bytes :param client_key: Client key path to read, or content as bytes. If set to None but client_certificate is not None, assumes the key is included with the certificate (p12 or PEM certificate)

Returns:

gRPC channel for communication

Return type:

Channel

armonik.common.enumwrapper module

class armonik.common.enumwrapper.Direction(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntEnum

class armonik.common.enumwrapper.EventTypes(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntEnum

classmethod from_string(name: str)
class armonik.common.enumwrapper.HealthCheckStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntEnum

class armonik.common.enumwrapper.ResultStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntEnum

static name_from_value(status: armonik.protogen.common.result_status_pb2.ResultStatus) str
class armonik.common.enumwrapper.ServiceHealthCheckStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntEnum

class armonik.common.enumwrapper.SessionStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntEnum

static name_from_value(status: armonik.protogen.common.session_status_pb2.SessionStatus) str
class armonik.common.enumwrapper.TaskStatus(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntEnum

armonik.common.events module

class armonik.common.events.Event

Bases: object

classmethod from_raw_event(raw_event)
class armonik.common.events.NewResultEvent(result_id: 'str', owner_id: 'str', status: 'ResultStatus')

Bases: Event

owner_id: str
result_id: str
status: ResultStatus
class armonik.common.events.NewTaskEvent(task_id: 'str', payload_id: 'str', origin_task_id: 'str', status: 'TaskStatus', expected_output_keys: 'List[str]', data_dependencies: 'List[str]', retry_of_ids: 'List[str]', parent_task_ids: 'List[str]')

Bases: Event

data_dependencies: List[str]
expected_output_keys: List[str]
origin_task_id: str
parent_task_ids: List[str]
payload_id: str
retry_of_ids: List[str]
status: TaskStatus
task_id: str
class armonik.common.events.ResultOwnerUpdateEvent(result_id: 'str', previous_owner_id: 'str', current_owner_id: 'str')

Bases: Event

current_owner_id: str
previous_owner_id: str
result_id: str
class armonik.common.events.ResultStatusUpdateEvent(result_id: 'str', status: 'ResultStatus')

Bases: Event

result_id: str
status: ResultStatus
class armonik.common.events.TaskStatusUpdateEvent(task_id: 'str', status: 'TaskStatus')

Bases: Event

status: TaskStatus
task_id: str

armonik.common.helpers module

armonik.common.helpers.batched(iterable: Iterable[T], n: int) Iterable[List[T]]

Batches elements from an iterable into lists of size at most ‘n’.

Parameters:
  • iterable – The input iterable.

  • n – The batch size.

Yields:

A generator yielding batches of elements from the input iterable.

armonik.common.helpers.datetime_to_timestamp(time_stamp: datetime | None) Timestamp

Helper function to convert a Python Datetime to a gRPC Timestamp

Parameters:

time_stamp – Python datetime timestamp to convert

Returns:

Equivalent gRPC Timestamp

armonik.common.helpers.duration_to_timedelta(delta: Duration) timedelta

Helper function to convert a gRPC Duration into a Python timedelta Note that timedelta has microseconds accuracy instead of nanosecond accuracy for gRPC Duration. Therefore, the conversion may not be lossless. :param delta: gRPC Duration to convert

Returns:

Equivalent Python timedelta

armonik.common.helpers.get_task_filter(session_ids: List[str] | None = None, task_ids: List[str] | None = None, included_statuses: List[TaskStatus] | None = None, excluded_statuses: List[TaskStatus] | None = None) armonik.protogen.common.submitter_common_pb2.TaskFilter

Helper function to generate a task filter from the parameters

Parameters:
  • session_ids – Optional list of session Ids to filter against, mutually exclusive with task_ids

  • task_ids – Optional list of task ids to filter against, mutually exclusive with session_ids

  • included_statuses – Optional list of task statuses to include in the filter, mutually exclusive with excluded_statuses

  • excluded_statuses – Optional list of task statuses to exclude in the filter, mutually exclusive with included_statuses

Returns:

Task filter to be used in a gRPC call to filter tasks

armonik.common.helpers.timedelta_to_duration(delta: timedelta) Duration

Helper function to convert a gRPC Duration to a Python Datetime

Parameters:

delta – Python timedelta to convert

Returns:

Equivalent gRPC Duration

armonik.common.helpers.timestamp_to_datetime(time_stamp: Timestamp) datetime | None

Helper function to convert a gRPC Timestamp to a Python Datetime Note that datetime has microseconds accuracy instead of nanosecond accuracy for gRPC Timestamp Therefore, the conversion may not be lossless. :param time_stamp: gRPC Timestamp to convert

Returns:

Equivalent Python Datetime

armonik.common.objects module

Module contents