Skip to content

Commit 32b9daa

Browse files
piotrpioJarema
andauthored
[ADDED] Service api improvements (nats-io#1160)
Co-authored-by: Tomasz Pietrek <[email protected]>
1 parent 95a7e50 commit 32b9daa

15 files changed

+2151
-698
lines changed

js.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,13 +1528,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
15281528
}
15291529

15301530
// Find the stream mapped to the subject if not bound to a stream already.
1531-
if o.stream == _EMPTY_ {
1531+
if stream == _EMPTY_ {
15321532
stream, err = js.StreamNameBySubject(subj)
15331533
if err != nil {
15341534
return nil, err
15351535
}
1536-
} else {
1537-
stream = o.stream
15381536
}
15391537

15401538
// With an explicit durable name, we can lookup the consumer first

micro/example_package_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2022 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package micro
15+
16+
import (
17+
"fmt"
18+
"log"
19+
"strconv"
20+
"time"
21+
22+
"github.com/nats-io/nats.go"
23+
)
24+
25+
func Example() {
26+
s := RunServerOnPort(-1)
27+
defer s.Shutdown()
28+
29+
nc, err := nats.Connect(s.ClientURL())
30+
if err != nil {
31+
log.Fatal(err)
32+
}
33+
defer nc.Close()
34+
35+
// Service handler is a function which takes Service.Request as argument.
36+
// req.Respond or req.Error should be used to respond to the request.
37+
incrementHandler := func(req *Request) error {
38+
val, err := strconv.Atoi(string(req.Data))
39+
if err != nil {
40+
req.Error("400", "request data should be a number", nil)
41+
return nil
42+
}
43+
44+
responseData := val + 1
45+
req.Respond([]byte(strconv.Itoa(responseData)))
46+
return nil
47+
}
48+
49+
config := Config{
50+
Name: "IncrementService",
51+
Version: "0.1.0",
52+
Description: "Increment numbers",
53+
Endpoint: Endpoint{
54+
// service handler
55+
Handler: incrementHandler,
56+
// a unique subject serving as a service endpoint
57+
Subject: "numbers.increment",
58+
},
59+
}
60+
// Multiple instances of the servcice with the same name can be created.
61+
// Requests to a service with the same name will be load-balanced.
62+
for i := 0; i < 5; i++ {
63+
svc, err := AddService(nc, config)
64+
if err != nil {
65+
log.Fatal(err)
66+
}
67+
defer svc.Stop()
68+
}
69+
70+
// send a request to a service
71+
resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second)
72+
if err != nil {
73+
log.Fatal(err)
74+
}
75+
responseVal, err := strconv.Atoi(string(resp.Data))
76+
if err != nil {
77+
log.Fatal(err)
78+
}
79+
fmt.Println(responseVal)
80+
81+
//
82+
// Output: 4
83+
//
84+
}

micro/example_test.go

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
// Copyright 2022 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package micro
15+
16+
import (
17+
"fmt"
18+
"log"
19+
"reflect"
20+
21+
"github.com/nats-io/nats.go"
22+
)
23+
24+
func ExampleAddService() {
25+
nc, err := nats.Connect("127.0.0.1:4222")
26+
if err != nil {
27+
log.Fatal(err)
28+
}
29+
defer nc.Close()
30+
31+
echoHandler := func(req *Request) error {
32+
req.Respond(req.Data)
33+
return nil
34+
}
35+
36+
config := Config{
37+
Name: "EchoService",
38+
Version: "v1.0.0",
39+
Description: "Send back what you receive",
40+
Endpoint: Endpoint{
41+
Subject: "echo",
42+
Handler: echoHandler,
43+
},
44+
45+
// DoneHandler can be set to customize behavior on stopping a service.
46+
DoneHandler: func(srv Service) {
47+
info := srv.Info()
48+
fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
49+
},
50+
51+
// ErrorHandler can be used to customize behavior on service execution error.
52+
ErrorHandler: func(srv Service, err *NATSError) {
53+
info := srv.Info()
54+
fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
55+
},
56+
}
57+
58+
srv, err := AddService(nc, config)
59+
if err != nil {
60+
log.Fatal(err)
61+
}
62+
defer srv.Stop()
63+
}
64+
65+
func ExampleService_Info() {
66+
nc, err := nats.Connect("127.0.0.1:4222")
67+
if err != nil {
68+
log.Fatal(err)
69+
}
70+
defer nc.Close()
71+
72+
config := Config{
73+
Name: "EchoService",
74+
Endpoint: Endpoint{
75+
Subject: "echo",
76+
Handler: func(*Request) error { return nil },
77+
},
78+
}
79+
80+
srv, _ := AddService(nc, config)
81+
82+
// service info
83+
info := srv.Info()
84+
85+
fmt.Println(info.ID)
86+
fmt.Println(info.Name)
87+
fmt.Println(info.Description)
88+
fmt.Println(info.Version)
89+
fmt.Println(info.Subject)
90+
}
91+
92+
func ExampleService_Stats() {
93+
nc, err := nats.Connect("127.0.0.1:4222")
94+
if err != nil {
95+
log.Fatal(err)
96+
}
97+
defer nc.Close()
98+
99+
config := Config{
100+
Name: "EchoService",
101+
Version: "0.1.0",
102+
Endpoint: Endpoint{
103+
Subject: "echo",
104+
Handler: func(*Request) error { return nil },
105+
},
106+
}
107+
108+
srv, _ := AddService(nc, config)
109+
110+
// stats of a service instance
111+
stats := srv.Stats()
112+
113+
fmt.Println(stats.AverageProcessingTime)
114+
fmt.Println(stats.ProcessingTime)
115+
116+
}
117+
118+
func ExampleService_Stop() {
119+
nc, err := nats.Connect("127.0.0.1:4222")
120+
if err != nil {
121+
log.Fatal(err)
122+
}
123+
defer nc.Close()
124+
125+
config := Config{
126+
Name: "EchoService",
127+
Version: "0.1.0",
128+
Endpoint: Endpoint{
129+
Subject: "echo",
130+
Handler: func(*Request) error { return nil },
131+
},
132+
}
133+
134+
srv, _ := AddService(nc, config)
135+
136+
// stop a service
137+
err = srv.Stop()
138+
if err != nil {
139+
log.Fatal(err)
140+
}
141+
142+
// stop is idempotent so multiple executions will not return an error
143+
err = srv.Stop()
144+
if err != nil {
145+
log.Fatal(err)
146+
}
147+
}
148+
149+
func ExampleService_Stopped() {
150+
nc, err := nats.Connect("127.0.0.1:4222")
151+
if err != nil {
152+
log.Fatal(err)
153+
}
154+
defer nc.Close()
155+
156+
config := Config{
157+
Name: "EchoService",
158+
Version: "0.1.0",
159+
Endpoint: Endpoint{
160+
Subject: "echo",
161+
Handler: func(*Request) error { return nil },
162+
},
163+
}
164+
165+
srv, _ := AddService(nc, config)
166+
167+
// stop a service
168+
err = srv.Stop()
169+
if err != nil {
170+
log.Fatal(err)
171+
}
172+
173+
if srv.Stopped() {
174+
fmt.Println("service stopped")
175+
}
176+
}
177+
178+
func ExampleService_Reset() {
179+
nc, err := nats.Connect("127.0.0.1:4222")
180+
if err != nil {
181+
log.Fatal(err)
182+
}
183+
defer nc.Close()
184+
185+
config := Config{
186+
Name: "EchoService",
187+
Version: "0.1.0",
188+
Endpoint: Endpoint{
189+
Subject: "echo",
190+
Handler: func(*Request) error { return nil },
191+
},
192+
}
193+
194+
srv, _ := AddService(nc, config)
195+
196+
// reset endpoint stats on this service
197+
srv.Reset()
198+
199+
empty := Stats{
200+
ServiceIdentity: srv.Info().ServiceIdentity,
201+
}
202+
if !reflect.DeepEqual(srv.Stats(), empty) {
203+
log.Fatal("Expected endpoint stats to be empty")
204+
}
205+
}
206+
207+
func ExampleControlSubject() {
208+
209+
// subject used to get PING from all services
210+
subjectPINGAll, _ := ControlSubject(PingVerb, "", "")
211+
fmt.Println(subjectPINGAll)
212+
213+
// subject used to get PING from services with provided name
214+
subjectPINGName, _ := ControlSubject(PingVerb, "CoolService", "")
215+
fmt.Println(subjectPINGName)
216+
217+
// subject used to get PING from a service with provided name and ID
218+
subjectPINGInstance, _ := ControlSubject(PingVerb, "CoolService", "123")
219+
fmt.Println(subjectPINGInstance)
220+
221+
// Output:
222+
// $SRV.PING
223+
// $SRV.PING.COOLSERVICE
224+
// $SRV.PING.COOLSERVICE.123
225+
}
226+
227+
func ExampleRequest_Respond() {
228+
handler := func(req *Request) {
229+
// respond to the request
230+
if err := req.Respond(req.Data); err != nil {
231+
log.Fatal(err)
232+
}
233+
}
234+
235+
fmt.Printf("%T", handler)
236+
}
237+
238+
func ExampleRequest_RespondJSON() {
239+
type Point struct {
240+
X int `json:"x"`
241+
Y int `json:"y"`
242+
}
243+
244+
handler := func(req *Request) {
245+
resp := Point{5, 10}
246+
// respond to the request
247+
// response will be serialized to {"x":5,"y":10}
248+
if err := req.RespondJSON(resp); err != nil {
249+
log.Fatal(err)
250+
}
251+
}
252+
253+
fmt.Printf("%T", handler)
254+
}
255+
256+
func ExampleRequest_Error() {
257+
handler := func(req *Request) error {
258+
// respond with an error
259+
// Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response
260+
if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil {
261+
return err
262+
}
263+
return nil
264+
}
265+
266+
fmt.Printf("%T", handler)
267+
}

0 commit comments

Comments
 (0)