chore: Updating Pytorch-Launcher component to work with pipelines v2 #11273

Status: Open · wants to merge 3 commits into base: master
4 changes: 2 additions & 2 deletions components/kubeflow/pytorch-launcher/Dockerfile
@@ -12,12 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM python:3.6
+FROM python:3.11
 
 ADD requirements.txt requirements.txt
 
 RUN pip install --no-cache-dir -r requirements.txt
 
 ADD build /ml
 
-ENTRYPOINT ["python", "/ml/launch_pytorchjob.py"]
+ENTRYPOINT ["python", "/ml/src/launch_pytorchjob.py"]
49 changes: 0 additions & 49 deletions components/kubeflow/pytorch-launcher/component.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions components/kubeflow/pytorch-launcher/requirements.txt
@@ -1,4 +1,6 @@
 pyyaml
 kubernetes
 kubeflow-pytorchjob
+kubeflow.training
Member comment on kubeflow.training: Is there a minimal required version?

 retrying
+str2bool
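On the reviewer's version question: the file as changed is entirely unpinned. One option, if minimum versions are established, is to add lower bounds; the numbers below only illustrate the syntax and are placeholders, not vetted minimums:

kubernetes>=25.3.0        # placeholder bound
kubeflow.training>=1.7.0  # placeholder bound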
219 changes: 114 additions & 105 deletions components/kubeflow/pytorch-launcher/sample.py
@@ -1,10 +1,7 @@
-import json
-from typing import NamedTuple
-from collections import namedtuple
 import kfp
-import kfp.dsl as dsl
-from kfp import components
-from kfp.dsl.types import Integer
+from kfp import dsl
+from typing import Optional
+import uuid


 def get_current_namespace():
@@ -18,11 +15,51 @@ def get_current_namespace():
     return current_namespace
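The body of get_current_namespace is collapsed in this diff. For readers following along, a sketch of the conventional in-cluster approach; this is an assumption about what the collapsed code does, not the file's verbatim contents:

def get_current_namespace():
    # Inside a pod, the namespace is mounted by the service account;
    # fall back to a default when running outside a cluster (assumption).
    namespace_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    try:
        with open(namespace_path) as f:
            current_namespace = f.read().strip()
    except FileNotFoundError:
        current_namespace = "default"
    return current_namespace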


+@dsl.component()
+def create_master_spec() -> dict:
+    # Define master spec
+    master = {
+        "replicas": 1,
+        "restartPolicy": "OnFailure",
+        "template": {
+            "metadata": {
+                "annotations": {
+                    # See https://github.com/kubeflow/website/issues/2011
+                    "sidecar.istio.io/inject": "false"
+                }
+            },
+            "spec": {
+                "containers": [
+                    {
+                        "args": [
+                            "--backend",
+                            "gloo",
+                        ],
+                        "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
+                        "name": "pytorch",
+                        "resources": {
+                            "requests": {
+                                "memory": "4Gi",
+                                "cpu": "2000m",
+                            },
+                            "limits": {
+                                "memory": "4Gi",
+                                "cpu": "2000m",
+                            },
+                        },
+                    }
+                ],
+            },
+        },
+    }
+
+    return master
+
+
+@dsl.component
 def create_worker_spec(
     worker_num: int = 0
-) -> NamedTuple(
-    "CreatWorkerSpec", [("worker_spec", dict)]
-):
+) -> dict:
     """
     Creates pytorch-job worker spec
     """
@@ -66,124 +103,96 @@ def create_worker_spec(
         },
     }
 
-    worker_spec_output = namedtuple(
-        "MyWorkerOutput", ["worker_spec"]
-    )
+    return worker
+
+
+# Container component: declares the launcher's inputs and its container invocation
+@dsl.container_component
+def pytorch_job_launcher(
+    name: str,
Member comment: I'd imagine code like this would live inside the /src directory instead of the sample.py file, but I am not familiar enough with the KFP codebase to say for sure.

+    kind: str = "PyTorchJob",
+    namespace: str = "kubeflow",
+    version: str = 'v2',
+    master_spec: dict = {},
+    worker_spec: dict = {},
+    job_timeout_minutes: int = 1440,
+    delete_after_done: bool = True,
+    clean_pod_policy: str = 'Running',
+    active_deadline_seconds: Optional[int] = None,
+    backoff_limit: Optional[int] = None,
+    ttl_seconds_after_finished: Optional[int] = None,
+):
+    command_args = [
+        '--name', name,
+        '--kind', kind,
+        '--namespace', namespace,
+        '--version', version,
+        '--masterSpec', master_spec,
+        '--workerSpec', worker_spec,
+        '--jobTimeoutMinutes', job_timeout_minutes,
+        '--deleteAfterDone', delete_after_done,
+        '--cleanPodPolicy', clean_pod_policy,
+    ]
+    # Use extend (not append) so each flag and its value land as separate args
+    if active_deadline_seconds is not None and isinstance(active_deadline_seconds, int):
+        command_args.extend(['--activeDeadlineSeconds', str(active_deadline_seconds)])
+    if backoff_limit is not None and isinstance(backoff_limit, int):
+        command_args.extend(['--backoffLimit', str(backoff_limit)])
+    if ttl_seconds_after_finished is not None and isinstance(ttl_seconds_after_finished, int):
+        command_args.extend(['--ttlSecondsAfterFinished', str(ttl_seconds_after_finished)])
+
+    return dsl.ContainerSpec(
+        image='quay.io/rh_ee_fwaters/kubeflow-pytorchjob-launcher:v2',
+        command=['python', '/ml/src/launch_pytorchjob.py'],
+        args=command_args,
+    )
-    return worker_spec_output(worker)
-
-
-worker_spec_op = components.func_to_container_op(
-    create_worker_spec,
-    base_image="python:slim",
-)
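The launcher entrypoint /ml/src/launch_pytorchjob.py is not part of this diff. As a rough sketch of the contract implied by command_args above, such a script might parse its flags like this; the parsing details are assumptions, not the PR's code:

import argparse
import json

from str2bool import str2bool  # listed in requirements.txt


def parse_args():
    parser = argparse.ArgumentParser(description="Launch a PyTorchJob from KFP")
    parser.add_argument("--name", required=True)
    parser.add_argument("--kind", default="PyTorchJob")
    parser.add_argument("--namespace", default="kubeflow")
    parser.add_argument("--version", default="v2")
    # Dict inputs arrive serialized, so decode them from JSON strings (assumption)
    parser.add_argument("--masterSpec", type=json.loads, default={})
    parser.add_argument("--workerSpec", type=json.loads, default={})
    parser.add_argument("--jobTimeoutMinutes", type=int, default=1440)
    parser.add_argument("--deleteAfterDone", type=str2bool, default=True)
    parser.add_argument("--cleanPodPolicy", default="Running")
    parser.add_argument("--activeDeadlineSeconds", type=int, default=None)
    parser.add_argument("--backoffLimit", type=int, default=None)
    parser.add_argument("--ttlSecondsAfterFinished", type=int, default=None)
    return parser.parse_args()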


 @dsl.pipeline(
     name="launch-kubeflow-pytorchjob",
     description="An example to launch pytorch.",
 )
-def mnist_train(
-    namespace: str = get_current_namespace(),
+def pytorch_job_pipeline(
+    kind: str = "PyTorchJob",
     worker_replicas: int = 1,
-    ttl_seconds_after_finished: int = -1,
-    job_timeout_minutes: int = 600,
-    delete_after_done: bool = False,
+    ttl_seconds_after_finished: int = 3600,
+    job_timeout_minutes: int = 1440,
+    delete_after_done: bool = True,
+    clean_pod_policy: str = "Running",
 ):
-    pytorchjob_launcher_op = components.load_component_from_file(
-        "./component.yaml"
-    )

-    master = {
-        "replicas": 1,
-        "restartPolicy": "OnFailure",
-        "template": {
-            "metadata": {
-                "annotations": {
-                    # See https://github.com/kubeflow/website/issues/2011
-                    "sidecar.istio.io/inject": "false"
-                }
-            },
-            "spec": {
-                "containers": [
-                    {
-                        # To override default command
-                        # "command": [
-                        #     "python",
-                        #     "/opt/mnist/src/mnist.py"
-                        # ],
-                        "args": [
-                            "--backend",
-                            "gloo",
-                        ],
-                        # Or, create your own image from
-                        # https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist
-                        "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
-                        "name": "pytorch",
-                        "resources": {
-                            "requests": {
-                                "memory": "4Gi",
-                                "cpu": "2000m",
-                                # Uncomment for GPU
-                                # "nvidia.com/gpu": 1,
-                            },
-                            "limits": {
-                                "memory": "4Gi",
-                                "cpu": "2000m",
-                                # Uncomment for GPU
-                                # "nvidia.com/gpu": 1,
-                            },
-                        },
-                    }
-                ],
-                # If imagePullSecrets required
-                # "imagePullSecrets": [
-                #     {"name": "image-pull-secret"},
-                # ],
-            },
-        },
-    }
-
-    worker_spec_create = worker_spec_op(
-        worker_replicas
-    )
-
-    # Launch and monitor the job with the launcher
-    pytorchjob_launcher_op(
-        # Note: name needs to be a unique pytorchjob name in the namespace.
-        # Using RUN_ID_PLACEHOLDER is one way of getting something unique.
-        name=f"name-{kfp.dsl.RUN_ID_PLACEHOLDER}",

+    namespace = get_current_namespace()
+    worker_spec = create_worker_spec(worker_num=worker_replicas)
+    master_spec = create_master_spec()
+
+    result = pytorch_job_launcher(
+        name=f"mnist-train-{uuid.uuid4().hex[:8]}",
+        kind=kind,
         namespace=namespace,
-        master_spec=master,
-        # pass worker_spec as a string because the JSON serializer will convert
-        # the placeholder for worker_replicas (which it sees as a string) into
-        # a quoted variable (eg a string) instead of an unquoted variable
-        # (number). If worker_replicas is quoted in the spec, it will break in
-        # k8s. See https://github.com/kubeflow/pipelines/issues/4776
-        worker_spec=worker_spec_create.outputs[
-            "worker_spec"
-        ],
+        version="v2",
+        worker_spec=worker_spec.output,
+        master_spec=master_spec.output,
         ttl_seconds_after_finished=ttl_seconds_after_finished,
         job_timeout_minutes=job_timeout_minutes,
         delete_after_done=delete_after_done,
+        clean_pod_policy=clean_pod_policy,
     )
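Note that the GPU hints from the removed inline spec did not carry over to create_master_spec. If GPUs are needed, one option is to patch the spec dict before create_master_spec returns it; this helper is a suggestion based on the commented-out nvidia.com/gpu lines in the deleted code, not something the PR provides:

def add_gpu(spec: dict, gpus: int = 1) -> dict:
    # Request NVIDIA GPUs on the PyTorch container, mirroring the
    # "nvidia.com/gpu" comments in the spec this PR removes.
    resources = spec["template"]["spec"]["containers"][0]["resources"]
    resources["requests"]["nvidia.com/gpu"] = gpus
    resources["limits"]["nvidia.com/gpu"] = gpus
    return spec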


 if __name__ == "__main__":
     import kfp.compiler as compiler
 
-    pipeline_file = "test.tar.gz"
+    pipeline_file = "test.yaml"
     print(
         f"Compiling pipeline as {pipeline_file}"
     )
     compiler.Compiler().compile(
-        mnist_train, pipeline_file
+        pytorch_job_pipeline, pipeline_file
     )

-    # # To run:
-    # client = kfp.Client()
-    # run = client.create_run_from_pipeline_package(
-    #     pipeline_file,
-    #     arguments={},
-    #     run_name="test pytorchjob run"
-    # )
-    # print(f"Created run {run}")
+    # To run:
+    host = "http://localhost:8080"
+    client = kfp.Client(host=host)
+    run = client.create_run_from_pipeline_package(
+        pipeline_file,
+        arguments={},
+        run_name="test pytorchjob run",
+    )
+    print(f"Created run {run}")