Skip to content

Commit 02dc53c

Browse files
Add Pulsar plugin (#345)
1 parent 32fbd35 commit 02dc53c

17 files changed

+485
-5
lines changed

.gitmodules

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
#
1919
[submodule "protocol"]
2020
path = protocol
21-
url = https://github.com/apache/skywalking-data-collect-protocol
21+
url = https://github.com/apache/skywalking-data-collect-protocol.git

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
- Plugins:
1010
- Add neo4j plugin.(#312)
11+
- Add pulsar plugin.(#345)
1112

1213
- Fixes:
1314
- Fix unexpected 'No active span' IllegalStateError (#311)

Makefile

+3-1
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ poetry:
4343
ifeq ($(OS),Windows)
4444
-powershell (Invoke-WebRequest -Uri https://install.python-poetry.org -UseBasicParsing).Content | py -
4545
poetry self update
46-
else
46+
else ifeq ($(OS),Darwin)
4747
-curl -sSL https://install.python-poetry.org | python3 -
4848
poetry self update || $(MAKE) poetry-fallback
49+
else
50+
-curl -sSL https://install.python-poetry.org | python3 - --version 1.5.1
4951
endif
5052

5153
.PHONY: gen

docs/en/setup/Plugins.md

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!)
3636
| [neo4j](https://neo4j.com/docs/python-manual/5/) | Python >=3.7 - ['5.*']; | `sw_neo4j` |
3737
| [psycopg[binary]](https://www.psycopg.org/) | Python >=3.11 - ['3.1.*']; Python >=3.7 - ['3.0.18', '3.1.*']; | `sw_psycopg` |
3838
| [psycopg2-binary](https://www.psycopg.org/) | Python >=3.10 - NOT SUPPORTED YET; Python >=3.7 - ['2.9']; | `sw_psycopg2` |
39+
| [pulsar-client](https://github.com/apache/pulsar-client-python) | Python >=3.8 - ['3.3.0']; | `sw_pulsar` |
3940
| [pymongo](https://pymongo.readthedocs.io) | Python >=3.7 - ['3.11.*']; | `sw_pymongo` |
4041
| [pymysql](https://pymysql.readthedocs.io/en/latest/) | Python >=3.7 - ['1.0']; | `sw_pymysql` |
4142
| [pyramid](https://trypyramid.com) | Python >=3.7 - ['1.10', '2.0']; | `sw_pyramid` |

docs/en/setup/advanced/LogReporter.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Log reporter supports all three protocols including `grpc`, `http` and `kafka`,
99
If chosen `http` protocol, the logs will be batch-reported to the collector REST endpoint `oap/v3/logs`.
1010

1111
If chosen `kafka` protocol, please make sure to config
12-
[kafka-fetcher](https://skywalking.apache.org/docs/main/v9.1.0/en/setup/backend/kafka-fetcher/)
12+
[kafka-fetcher](https://skywalking.apache.org/docs/main/v10.0.1/en/setup/backend/kafka-fetcher/)
1313
on the OAP side, and make sure Python agent config `kafka_bootstrap_servers` points to your Kafka brokers.
1414

1515
**Please make sure OAP is consuming the same Kafka topic as your agent produces to, `kafka_namespace` must match OAP side configuration `plugin.kafka.namespace`**

poetry.lock

+48-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ loguru = "^0.6.0"
156156
httpx = "^0.23.3"
157157
confluent-kafka = "^2.0.2"
158158
neo4j = "^5.9.0"
159+
pulsar-client = "3.3.0"
159160

160161
[tool.poetry.group.lint.dependencies]
161162
pylint = '2.13.9'

skywalking/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class Component(Enum):
3636
KafkaConsumer = 41
3737
RabbitmqProducer = 52
3838
RabbitmqConsumer = 53
39+
PulsarProducer = 73
40+
PulsarConsumer = 74
3941
Elasticsearch = 47
4042
HBase = 94
4143
Neo4j = 112

skywalking/plugins/sw_pulsar.py

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
from skywalking import Layer, Component
18+
from skywalking.trace.carrier import Carrier
19+
from skywalking.trace.context import get_context
20+
from skywalking.trace.tags import TagMqTopic, TagMqBroker
21+
22+
link_vector = ['https://github.com/apache/pulsar-client-python']
23+
support_matrix = {
24+
'pulsar-client': {
25+
'>=3.8': ['3.3.0']
26+
}
27+
}
28+
note = """"""
29+
30+
31+
def install():
32+
from pulsar import Producer
33+
from pulsar import Consumer
34+
from pulsar import Client
35+
36+
__init = Client.__init__
37+
_send = Producer.send
38+
_receive = Consumer.receive
39+
_peer = ''
40+
41+
def get_peer():
42+
return _peer
43+
44+
def set_peer(value):
45+
nonlocal _peer
46+
_peer = value
47+
48+
def _sw_init(self, service_url):
49+
__init(self, service_url)
50+
set_peer(service_url)
51+
52+
def _sw_send_func(_send):
53+
def _sw_send(this, content,
54+
properties=None,
55+
partition_key=None,
56+
sequence_id=None,
57+
replication_clusters=None,
58+
disable_replication=False,
59+
event_timestamp=None,
60+
deliver_at=None,
61+
deliver_after=None,
62+
):
63+
topic = this._producer.topic().split('/')[-1]
64+
with get_context().new_exit_span(op=f'Pulsar/Topic/{topic}/Producer', peer=get_peer(),
65+
component=Component.PulsarProducer) as span:
66+
span.tag(TagMqTopic(topic))
67+
span.tag(TagMqBroker(get_peer()))
68+
span.layer = Layer.MQ
69+
70+
carrier = span.inject()
71+
if properties is None:
72+
properties = {}
73+
for item in carrier:
74+
properties[item.key] = item.val
75+
76+
return _send(this, content, properties=properties, partition_key=partition_key,
77+
sequence_id=sequence_id, replication_clusters=replication_clusters,
78+
disable_replication=disable_replication, event_timestamp=event_timestamp,
79+
deliver_at=deliver_at, deliver_after=deliver_after)
80+
81+
return _sw_send
82+
83+
def _sw_receive_func(_receive):
84+
def _sw_receive(this, timeout_millis=None):
85+
res = _receive(this, timeout_millis=timeout_millis)
86+
if res:
87+
topic = res.topic_name().split('/')[-1]
88+
properties = res.properties()
89+
carrier = Carrier()
90+
for item in carrier:
91+
if item.key in properties.keys():
92+
val = res.properties().get(item.key)
93+
if val is not None:
94+
item.val = val
95+
96+
with get_context().new_entry_span(op=f'Pulsar/Topic/{topic}/Consumer', carrier=carrier) as span:
97+
span.tag(TagMqTopic(topic))
98+
span.tag(TagMqBroker(get_peer()))
99+
span.layer = Layer.MQ
100+
span.component = Component.PulsarConsumer
101+
return res
102+
103+
return _sw_receive
104+
105+
Client.__init__ = _sw_init
106+
Producer.send = _sw_send_func(_send)
107+
Consumer.receive = _sw_receive_func(_receive)
+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
version: '2.1'
19+
20+
services:
21+
collector:
22+
extends:
23+
service: collector
24+
file: ../../docker-compose.base.yml
25+
26+
pulsar-server:
27+
image: apachepulsar/pulsar:3.2.0
28+
hostname: pulsar-server
29+
ports:
30+
- 6650:6650
31+
- 8080:8080
32+
networks:
33+
- beyond
34+
command: ["bash","-c", "bin/pulsar standalone"]
35+
healthcheck:
36+
test: ["CMD", "nc", "-nz", "127.0.0.1", "8080"]
37+
interval: 5s
38+
timeout: 60s
39+
retries: 120
40+
41+
producer:
42+
extends:
43+
service: agent
44+
file: ../../docker-compose.base.yml
45+
ports:
46+
- 9090:9090
47+
volumes:
48+
- .:/app
49+
command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/producer.py']
50+
healthcheck:
51+
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
52+
interval: 5s
53+
timeout: 60s
54+
retries: 120
55+
depends_on:
56+
collector:
57+
condition: service_healthy
58+
pulsar-server:
59+
condition: service_healthy
60+
consumer:
61+
condition: service_healthy
62+
environment:
63+
SW_AGENT_NAME: producer
64+
SW_AGENT_LOGGING_LEVEL: INFO
65+
66+
consumer:
67+
extends:
68+
service: agent
69+
file: ../../docker-compose.base.yml
70+
ports:
71+
- 9091:9091
72+
volumes:
73+
- .:/app
74+
command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/consumer.py']
75+
healthcheck:
76+
test: ["CMD", "bash", "-c", "ps -ef | grep /app/services/consumer | grep -v grep"]
77+
interval: 5s
78+
timeout: 60s
79+
retries: 120
80+
depends_on:
81+
collector:
82+
condition: service_healthy
83+
pulsar-server:
84+
condition: service_healthy
85+
environment:
86+
SW_AGENT_NAME: consumer
87+
SW_AGENT_LOGGING_LEVEL: INFO
88+
89+
networks:
90+
beyond:

0 commit comments

Comments
 (0)