1
+ # dds cloudapi for DINO-X
2
+ from dds_cloudapi_sdk import Config
3
+ from dds_cloudapi_sdk import Client
4
+ from dds_cloudapi_sdk .tasks .dinox import DinoxTask
5
+ from dds_cloudapi_sdk .tasks .detection import DetectionTask
6
+ from dds_cloudapi_sdk import TextPrompt
7
+ from dds_cloudapi_sdk import DetectionModel
8
+ from dds_cloudapi_sdk import DetectionTarget
9
+
10
+ # using supervision for visualization
11
+ import cv2
12
+ import numpy as np
13
+ import supervision as sv
14
+ import os
15
+
16
+ """
17
+ Hyper Parameters
18
+ """
19
+ API_TOKEN = "Your API token"
20
+ VIDEO_PATH = "./assets/demo.mp4"
21
+ OUTPUT_PATH = "./annotated_demo_video.mp4"
22
+ TEXT_PROMPT = "wheel . eye . helmet . mouse . mouth . vehicle . steering wheel . ear . nose"
23
+
24
def process_video_with_dino_x():
    """Run DINO-X open-vocabulary detection on every frame of VIDEO_PATH and
    write an annotated copy to OUTPUT_PATH.

    Each frame is saved to a temporary JPEG, uploaded to the DDS cloud
    service, detected via a DinoxTask with TEXT_PROMPT, then annotated with
    supervision box/label annotators and appended to the output video.

    Returns:
        None. Side effects: writes OUTPUT_PATH, creates and removes a
        temporary frame file, prints progress/errors to stdout.
    """
    # Step 1: Initialize config and client
    config = Config(API_TOKEN)
    client = Client(config)

    # Map each prompt category to a stable integer id for supervision.
    # Strip BEFORE filtering so whitespace-only fragments are dropped too.
    classes = [x.strip().lower() for x in TEXT_PROMPT.split('.') if x.strip()]
    class_name_to_id = {name: idx for idx, name in enumerate(classes)}

    # Open video and fail fast if the path is wrong or the codec is missing.
    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        print(f"Error processing video: cannot open {VIDEO_PATH}")
        return

    # Get video properties; fall back to 30 fps when the container reports 0,
    # which would otherwise produce an unplayable output file.
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30

    # Initialize video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(OUTPUT_PATH, fourcc, fps, (width, height))

    # Annotators are loop-invariant renderers; create them once, not per frame.
    box_annotator = sv.BoxAnnotator()
    label_annotator = sv.LabelAnnotator()

    # Temporary frame for upload (the cloud API takes a URL, not raw pixels).
    temp_frame_path = "./temp_frame.jpg"

    try:
        # Process each frame
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Save current frame temporarily, then upload and run detection.
            cv2.imwrite(temp_frame_path, frame)
            image_url = client.upload_file(temp_frame_path)
            task = DinoxTask(
                image_url=image_url,
                prompts=[TextPrompt(text=TEXT_PROMPT)]
            )
            client.run_task(task)
            predictions = task.result.objects

            # Decode prediction results. Skip any category the model returns
            # that is not in the prompt — indexing class_name_to_id directly
            # would raise KeyError and abort the whole video.
            boxes = []
            confidences = []
            class_names = []
            class_ids = []
            for obj in predictions:
                cls_name = obj.category.lower().strip()
                cls_id = class_name_to_id.get(cls_name)
                if cls_id is None:
                    continue
                boxes.append(obj.bbox)
                confidences.append(obj.score)
                class_names.append(cls_name)
                class_ids.append(cls_id)

            annotated_frame = frame
            if boxes:
                # sv.Detections requires an (N, 4) array; an empty list would
                # become a 1-D array and crash, so annotate only when there
                # are detections and pass the raw frame through otherwise.
                detections = sv.Detections(
                    xyxy=np.array(boxes),
                    class_id=np.array(class_ids)
                )
                labels = [
                    f"{class_name} {confidence:.2f}"
                    for class_name, confidence
                    in zip(class_names, confidences)
                ]
                annotated_frame = box_annotator.annotate(
                    scene=frame.copy(), detections=detections
                )
                annotated_frame = label_annotator.annotate(
                    scene=annotated_frame,
                    detections=detections,
                    labels=labels
                )

            # Write (possibly un-)annotated frame
            out.write(annotated_frame)

    except Exception as e:
        # Best-effort: report and fall through to cleanup so the partial
        # output video is still finalized.
        print(f"Error processing video: {e}")

    finally:
        # Clean up resources even if a frame failed mid-stream.
        cap.release()
        out.release()
        cv2.destroyAllWindows()

        # Remove temporary frame
        if os.path.exists(temp_frame_path):
            os.remove(temp_frame_path)

    print(f"Annotated video saved to {OUTPUT_PATH}")
124
+
125
def main():
    """Script entry point: annotate the configured demo video with DINO-X."""
    process_video_with_dino_x()


if __name__ == '__main__':
    main()
0 commit comments