Skip to content

Commit 547e522

Browse files
authored
Support event-filter (voiceip#10)
* Support event-filter * Update README.md
1 parent 12898d5 commit 547e522

File tree

4 files changed

+68
-11
lines changed

4 files changed

+68
-11
lines changed

README.md

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
# mod_event_kafka
22
Freeswitch Kafka Plugin
33

4-
![Build Status](https://travis-ci.com/voiceip/mod_event_kafka.svg?branch=master)
4+
[![Build Status](https://github.com/voiceip/mod_event_kafka/actions/workflows/main.yml/badge.svg?branch=master)](https://github.com/voiceip/mod_event_kafka/actions/workflows/main.yml)
55

66
Install this plugin to publish all of the freeswitch generated events to Kafka reliably from the freeswitch server. To enable just configure the `event_kafka.conf.xml`
77

88
```xml
99
<configuration name="event_kafka.conf" description="Kafka Event Configuration">
1010
<settings>
1111
<param name="bootstrap-servers" value="localhost:9092"/>
12-
<param name="topic-prefix" value="topic_prefix"/>
12+
<param name="topic" value="kafa-topic-name" />
13+
<param name="username" value="" />
14+
<param name="password" value="" />
1315
<param name="buffer-size" value="256" />
1416
<param name="compression" value="snappy"/>
17+
<param name="event-filter" value=""/>
1518
</settings>
1619
</configuration>
1720
```

event_kafka.conf.xml

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
<settings>
33
<param name="bootstrap-servers" value="localhost:9092"/>
44
<param name="topic-prefix" value="topic_prefix"/>
5-
<param name="topic" value="" /> <!-- set either topic-prefix or topic, incase both are defiend topic value would be used. -->
5+
<param name="topic" value="" /> <!-- set either topic-prefix or topic, incase both are defined topic value would be used. -->
66
<param name="username" value="" /> <!-- set it only if you have sasl enabled on your kafka cluster -->
77
<param name="password" value="" />
88
<param name="buffer-size" value="16" />
99
<param name="compression" value="snappy"/>
10+
<param name="event-filter" value=""/> <!-- set it to a valid list of events if you want to filter. Default empty doesn't filter any event -->
1011
</settings>
1112
</configuration>

mod_event_kafka.cpp

+51-8
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ namespace mod_event_kafka {
5959
10, NULL, "buffer-size", "queue.buffering.max.messages"),
6060
SWITCH_CONFIG_ITEM("compression", SWITCH_CONFIG_STRING, CONFIG_RELOADABLE, &globals.compression,
6161
"snappy", NULL, "snappy / lz4 ", "Compression"),
62+
SWITCH_CONFIG_ITEM("event-filter", SWITCH_CONFIG_STRING, CONFIG_RELOADABLE, &globals.event_filter,
63+
"", NULL, "comma separated value of event names", "Event Filter"),
6264
SWITCH_CONFIG_ITEM_END()
6365
};
6466

@@ -252,14 +254,49 @@ namespace mod_event_kafka {
252254

253255
KafkaModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool): _publisher() {
254256

255-
// Subscribe to all switch events of any subclass
256-
// Store a pointer to ourself in the user data
257-
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler,
258-
static_cast<void*>(&_publisher), &_node)
259-
!= SWITCH_STATUS_SUCCESS) {
260-
throw std::runtime_error("Couldn't bind to switch events.");
257+
char *event_filter_name[SWITCH_EVENT_ALL];
258+
char *switch_event_custom = (char*) std::string("SWITCH_EVENT_CUSTOM::").c_str();
259+
profile.event_subscriptions = switch_separate_string(globals.event_filter, ',', event_filter_name, (sizeof(event_filter_name) / sizeof(event_filter_name[0])));
260+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Found %d subscriptions\n", profile.event_subscriptions);
261+
for (int i = 0; i < profile.event_subscriptions; i++) {
262+
if (switch_name_event(event_filter_name[i], &(profile.event_ids[i])) != SWITCH_STATUS_SUCCESS && !switch_strstr(event_filter_name[i],switch_event_custom) ) {
263+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The switch event %s was not recognised.\n", event_filter_name[i]);
264+
} else {
265+
// switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Found subscription for %s event.\n", argv[arg]);
266+
}
267+
}
268+
269+
if (profile.event_subscriptions > 0 ) {
270+
/* Subscribe events */
271+
for (int i = 0; i < profile.event_subscriptions; i++) {
272+
if ( switch_strstr(event_filter_name[i], switch_event_custom)) {
273+
if (switch_event_bind_removable(modname, SWITCH_EVENT_CUSTOM, event_filter_name[i] + strlen("SWITCH_EVENT_CUSTOM::"),
274+
event_handler, static_cast<void*>(&_publisher),&(profile.event_nodes[i])) != SWITCH_STATUS_SUCCESS) {
275+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot bind to event handler %d!\n",(int)profile.event_ids[i]);
276+
throw std::invalid_argument("Failed to bind event handler for " + std::string(event_filter_name[i]));
277+
}
278+
} else {
279+
if (switch_event_bind_removable(modname, profile.event_ids[i], SWITCH_EVENT_SUBCLASS_ANY,
280+
event_handler, static_cast<void*>(&_publisher), &(profile.event_nodes[i])) != SWITCH_STATUS_SUCCESS) {
281+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot bind to event handler %d!\n",(int)profile.event_ids[i]);
282+
throw std::invalid_argument( "Failed to bind event handler for " + std::string(event_filter_name[i]));
283+
}
284+
}
285+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Subscribed to %s event.\n", event_filter_name[i]);
286+
}
287+
288+
} else {
289+
// Subscribe to all switch events of any subclass
290+
// Store a pointer to ourself in the user data
291+
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler,
292+
static_cast<void*>(&_publisher), &_node)
293+
!= SWITCH_STATUS_SUCCESS) {
294+
throw std::runtime_error("Couldn't bind to switch events.");
295+
}
296+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Subscribed to ALL events\n");
297+
261298
}
262-
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Subscribed to events\n");
299+
263300

264301
// Create our module interface registration
265302
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
@@ -276,7 +313,13 @@ namespace mod_event_kafka {
276313

277314
~KafkaModule() {
278315
// Unsubscribe from the switch events
279-
switch_event_unbind(&_node);
316+
if (profile.event_subscriptions > 0 ) {
317+
for (int i = 0; i < profile.event_subscriptions; i++) {
318+
switch_event_unbind(&(profile.event_nodes[i]));
319+
}
320+
} else {
321+
switch_event_unbind(&_node);
322+
}
280323
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n");
281324
}
282325

mod_event_kafka.hpp

+10
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,18 @@ namespace mod_event_kafka {
1515
char *password;
1616
int buffer_size;
1717
char *compression;
18+
char *event_filter;
1819
} globals;
1920

21+
22+
static struct {
23+
/* Array to store the possible event subscriptions */
24+
int event_subscriptions;
25+
switch_event_node_t *event_nodes[SWITCH_EVENT_ALL];
26+
switch_event_types_t event_ids[SWITCH_EVENT_ALL];
27+
switch_event_node_t *eventNode;
28+
} profile;
29+
2030
SWITCH_MODULE_LOAD_FUNCTION(mod_event_kafka_load);
2131
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_event_kafka_shutdown);
2232

0 commit comments

Comments
 (0)