Commit aa27bf0

Denis Abrantes committed: fixed sentiment_analysis sample
1 parent 13495a4 commit aa27bf0

19 files changed: +842 −273 lines changed

sentiment-analysis/container/deploy/Dockerfile (+1 line)

@@ -6,6 +6,7 @@ COPY requirements.txt /app
 RUN pip install -r requirements.txt
 
 COPY deploy.py /app
+COPY common.py /app
 COPY finbert_handler_grpc.py /app
 
 ENV PYTHONUNBUFFERED=1
New file (+360 lines)

@@ -0,0 +1,360 @@
import argparse
import os
import shutil
import time
from functools import partial

import yaml
from determined.common.experimental import ModelVersion
from google.cloud import storage
from kserve import (
    V1beta1InferenceService,
    V1beta1InferenceServiceSpec,
    V1beta1PredictorSpec,
    V1beta1TorchServeSpec,
    constants,
)
from kubernetes import client
from kubernetes.client import V1ResourceRequirements, V1Toleration

# =====================================================================================

# Splits comma-separated CLI values into lists, e.g. "a=1,b=2" -> ["a=1", "b=2"].
csv_ = partial(str.split, sep=",")


def parse_args():
    parser = argparse.ArgumentParser(description="Deploy a model to KServe")
    parser.add_argument(
        "--deployment-name",
        type=str,
        help="Name of the resulting KServe InferenceService",
        required=True,
    )
    parser.add_argument(
        "--wait",
        action="store_true",
        help="Wait for the inference service to be ready before exiting. Only available for cloud models",
    )
    parser.add_argument(
        "--cloud-model-host",
        type=str,
        help="aws and gcp are currently supported for storing model artifacts",
        default=None,
        choices=["gcp", "aws"],
    )
    parser.add_argument(
        "--cloud-model-bucket",
        type=str,
        help="Cloud bucket name to use for storing model artifacts",
        default=None,
    )
    parser.add_argument(
        "--google-application-credentials",
        type=str,
        help="Path to Google Application Credentials file",
        default=None,
    )
    parser.add_argument(
        "--service-account-name",
        type=str,
        help="For non-cloud deploys, the Service Account name for Pachyderm access",
        default=None,
    )
    parser.add_argument(
        "--k8s-config-file",
        type=str,
        help="The path to the k8s config file",
        default=None,
    )
    parser.add_argument(
        "--tolerations",
        type=csv_,
        help="A comma-separated list of tolerations to apply to the deployment, in the format key=value",
        default=None,
    )
    parser.add_argument(
        "--resource-requests",
        type=csv_,
        help="A comma-separated list of resource requests for the predictor, in the format key=value",
        default=None,
    )
    parser.add_argument(
        "--resource-limits",
        type=csv_,
        help="A comma-separated list of resource limits for the predictor, in the format key=value",
        default=None,
    )
    return parser.parse_args()


# =====================================================================================


def save_to_pfs(model_name, files):
    # Copy model artifacts into the pipeline's output repo, split into the
    # TorchServe layout: config/ for config files, model-store/ for the rest.
    for file in files:
        if "config" in str(file):
            folder = "config"
        else:
            folder = "model-store"

        prefix = f"{model_name}/{folder}/"
        os.makedirs("/pfs/out/" + prefix, exist_ok=True)
        shutil.copyfile(file, f"/pfs/out/{prefix}{file}")
    print("Save to output repo complete.")


def upload_model_to_s3(model_name, files, bucket_name):
    import boto3

    storage_client = boto3.client("s3")
    for file in files:
        if "config" in str(file):
            folder = "config"
        else:
            folder = "model-store"

        prefix = f"{model_name}/{folder}/"
        storage_client.upload_file("./" + file, bucket_name, prefix + file)

    print("Upload to S3 complete.")


def upload_model_to_gcs(model_name, files, bucket_name):
    storage_client = storage.Client()

    bucket = storage_client.get_bucket(bucket_name)

    for file in files:
        if "config" in str(file):
            folder = "config"
        else:
            folder = "model-store"
        blob = bucket.blob(model_name + "/" + folder + "/" + file)
        blob.upload_from_filename("./" + file)

    print("Upload to GCS complete.")


def upload_model(model_name, files, cloud_provider=None, bucket_name=None):
    if not cloud_provider:
        save_to_pfs(model_name, files)
        return
    print(
        f"Uploading model files to the model repository on cloud provider {cloud_provider} in bucket {bucket_name}..."
    )
    if cloud_provider.lower() == "gcp":
        upload_model_to_gcs(model_name, files, bucket_name)
    elif cloud_provider.lower() == "aws":
        upload_model_to_s3(model_name, files, bucket_name)
    else:
        raise Exception(f"Invalid cloud provider {cloud_provider} specified")


# =====================================================================================


def wait_for_deployment(KServe, k8s_namespace, deployment_name, model_name):
    # Poll until the InferenceService reports ready, then print its endpoint.
    while not KServe.is_isvc_ready(deployment_name, namespace=k8s_namespace):
        print(f"Inference Service '{deployment_name}' is NOT READY. Waiting...")
        time.sleep(5)
    print(
        f"Inference Service '{deployment_name}' in Namespace '{k8s_namespace}' is READY."
    )
    response = KServe.get(deployment_name, namespace=k8s_namespace)
    print(
        "Model "
        + model_name
        + " is "
        + str(response["status"]["modelStatus"]["states"]["targetModelState"])
        + " and available at "
        + str(response["status"]["address"]["url"])
        + " for predictions."
    )


# =====================================================================================


def get_version(client, model_name, model_version) -> ModelVersion:
    for version in client.get_model(model_name).get_versions():
        if version.name == model_version:
            return version

    raise AssertionError(
        f"Version '{model_version}' not found inside model '{model_name}'"
    )


# =====================================================================================


def create_inference_service(
    kclient,
    k8s_namespace,
    model_name,
    deployment_name,
    pach_id,
    replace: bool,
    cloud_provider=None,
    bucket_name=None,
    tolerations=None,
    resource_requirements={"requests": {}, "limits": {}},
    sa=None,
):
    repo = os.environ["PPS_PIPELINE_NAME"]
    project = os.environ["PPS_PROJECT_NAME"]
    commit = os.environ["PACH_JOB_ID"]
    kserve_version = "v1beta1"
    api_version = constants.KSERVE_GROUP + "/" + kserve_version
    tol = []
    if tolerations:
        for toleration in tolerations:
            key, value = toleration.split("=")
            tol.append(
                V1Toleration(
                    effect="NoSchedule",
                    key=key,
                    value=value,
                    operator="Equal",
                )
            )
    if cloud_provider == "gcp":
        predictor_spec = V1beta1PredictorSpec(
            tolerations=tol,
            pytorch=V1beta1TorchServeSpec(
                protocol_version="v2",
                storage_uri=f"gs://{bucket_name}/{model_name}",
                resources=V1ResourceRequirements(
                    requests=resource_requirements["requests"],
                    limits=resource_requirements["limits"],
                ),
            ),
        )
    elif cloud_provider == "aws":
        predictor_spec = V1beta1PredictorSpec(
            tolerations=tol,
            pytorch=V1beta1TorchServeSpec(
                protocol_version="v2",
                storage_uri=f"s3://{bucket_name}/{model_name}",
                resources=V1ResourceRequirements(
                    requests=resource_requirements["requests"],
                    limits=resource_requirements["limits"],
                ),
            ),
        )
    else:
        # Non-cloud deploys read the artifacts through the Pachyderm S3
        # gateway, addressed by the pipeline's output commit.
        predictor_spec = V1beta1PredictorSpec(
            tolerations=tol,
            pytorch=V1beta1TorchServeSpec(
                protocol_version="v2",
                storage_uri=f"s3://{commit}.master.{repo}.{project}/{model_name}",
                resources=V1ResourceRequirements(
                    requests=resource_requirements["requests"],
                    limits=resource_requirements["limits"],
                ),
            ),
            service_account_name=sa,
        )
    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=deployment_name,
            namespace=k8s_namespace,
            annotations={
                "sidecar.istio.io/inject": "false",
                "pach_id": pach_id,
            },
        ),
        spec=V1beta1InferenceServiceSpec(predictor=predictor_spec),
    )
    if replace:
        print("Replacing InferenceService with new version...")
        kclient.replace(deployment_name, isvc)
        print(f"InferenceService replaced with new version '{pach_id}'.")
    else:
        print(f"Creating KServe InferenceService for model '{model_name}'.")
        kclient.create(isvc)
        print(f"Inference Service '{deployment_name}' created.")


# =====================================================================================


def check_existence(kclient, deployment_name, k8s_namespace):
    print(
        f"Checking if a previous version of InferenceService '{deployment_name}' exists..."
    )

    try:
        kclient.get(deployment_name, namespace=k8s_namespace)
        exists = True
        print(f"Previous version of InferenceService '{deployment_name}' exists.")
    except RuntimeError:
        exists = False
        print(
            f"Previous version of InferenceService '{deployment_name}' does not exist."
        )

    return exists


# =====================================================================================


class DeterminedInfo:
    def __init__(self):
        self.master = os.getenv("DET_MASTER")
        self.username = os.getenv("DET_USER")
        self.password = os.getenv("DET_PASSWORD")


# =====================================================================================


class KServeInfo:
    def __init__(self):
        self.namespace = os.getenv("KSERVE_NAMESPACE")


# =====================================================================================


class ModelInfo:
    def __init__(self, file):
        print(f"Reading model info file: {file}")
        info = {}
        with open(file, "r") as stream:
            try:
                info = yaml.safe_load(stream)

                self.name = info["name"]
                self.version = info["version"]
                self.pipeline = info["pipeline"]
                self.repository = info["repo"]

                print(
                    f"Loaded model info: name='{self.name}', version='{self.version}', "
                    f"pipeline='{self.pipeline}', repo='{self.repository}'"
                )
            except yaml.YAMLError as exc:
                print(exc)


# =====================================================================================
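
The module above only defines helpers; the entrypoint that wires them together is not part of this hunk. Below is a minimal sketch of how a deploy script might use them, assuming the file is importable as common (as the Dockerfile's COPY common.py /app suggests) and that the client passed around is kserve.KServeClient; the model-info path and the exact flag wiring are illustrative assumptions, not the sample's actual code.

import os

from kserve import KServeClient

# "common" is an assumed module name for the helpers above.
from common import (
    KServeInfo,
    ModelInfo,
    check_existence,
    create_inference_service,
    parse_args,
    wait_for_deployment,
)

if __name__ == "__main__":
    args = parse_args()
    kserve_info = KServeInfo()
    model = ModelInfo("/pfs/data/model-info.yaml")  # hypothetical input path

    kclient = KServeClient(config_file=args.k8s_config_file)

    # Replace the InferenceService if a previous version is already deployed.
    replace = check_existence(kclient, args.deployment_name, kserve_info.namespace)
    create_inference_service(
        kclient,
        kserve_info.namespace,
        model.name,
        args.deployment_name,
        os.environ["PACH_JOB_ID"],
        replace,
        cloud_provider=args.cloud_model_host,
        bucket_name=args.cloud_model_bucket,
        tolerations=args.tolerations,
        sa=args.service_account_name,
    )
    if args.wait:
        wait_for_deployment(
            kclient, kserve_info.namespace, args.deployment_name, model.name
        )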
