Skip to content

redisAsyncAppend family of commands #822

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 170 additions & 35 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ void __redisSetError(redisContext *c, int type, const char *str);

/* Functions managing dictionary of callbacks for pub/sub. */
static unsigned int callbackHash(const void *key) {
return dictGenHashFunction((const unsigned char *)key,
sdslen((const sds)key));
return dictGenHashFunction(key, sdslen((const sds) key));
}

static void *callbackValDup(void *privdata, const void *src) {
Expand All @@ -70,13 +69,15 @@ static void *callbackValDup(void *privdata, const void *src) {
}

static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
int l1, l2;
size_t l1, l2;
((void) privdata);

l1 = sdslen((const sds)key1);
l2 = sdslen((const sds)key2);

if (l1 != l2) return 0;
return memcmp(key1,key2,l1) == 0;

return 0 == memcmp(key1, key2, l1);
}

static void callbackKeyDestructor(void *privdata, void *key) {
Expand Down Expand Up @@ -162,6 +163,16 @@ static void __redisAsyncCopyError(redisAsyncContext *ac) {
ac->errstr = c->errstr;
}

static void __redisAsyncSetError(redisAsyncContext *ctx, int err_type, const char *msg) {
/* Set the errstr if not already set */
if (NULL == ctx->errstr) {
ctx->errstr = ctx->c.errstr;
}

ctx->err = err_type;
__redisSetError(&ctx->c, err_type, msg);
}

redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
redisOptions myOptions = *options;
redisContext *c;
Expand Down Expand Up @@ -282,7 +293,7 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe
/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb;
redisCallback cb, *pcb;
dictIterator *it;
dictEntry *de;

Expand All @@ -298,8 +309,10 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
if (ac->sub.channels) {
it = dictGetIterator(ac->sub.channels);
if (it != NULL) {
while ((de = dictNext(it)) != NULL)
__redisRunCallback(ac,dictGetEntryVal(de),NULL);
while ((de = dictNext(it)) != NULL) {
pcb = dictGetEntryVal(de);
if (pcb) __redisRunCallback(ac,pcb,NULL);
}
dictReleaseIterator(it);
}

Expand All @@ -309,8 +322,10 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
if (ac->sub.patterns) {
it = dictGetIterator(ac->sub.patterns);
if (it != NULL) {
while ((de = dictNext(it)) != NULL)
__redisRunCallback(ac,dictGetEntryVal(de),NULL);
while ((de = dictNext(it)) != NULL) {
pcb = dictGetEntryVal(de);
if (pcb) __redisRunCallback(ac,pcb,NULL);
}
dictReleaseIterator(it);
}

Expand Down Expand Up @@ -424,6 +439,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
de = dictFind(callbacks,sname);
if (de != NULL) {
cb = dictGetEntryVal(de);
if (cb == NULL)
goto oom;

/* If this is an subscribe reply decrease pending counter. */
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
Expand Down Expand Up @@ -455,6 +472,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
}
return REDIS_OK;
oom:
sdsfree(sname);
__redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
return REDIS_ERR;
}
Expand Down Expand Up @@ -634,7 +652,7 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {
}

if (!c->err) {
__redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
__redisAsyncSetError(ac, REDIS_ERR_TIMEOUT, "Timeout");
}

if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
Expand All @@ -661,17 +679,17 @@ static const char *nextArgument(const char *start, const char **str, size_t *len
if (p == NULL) return NULL;
}

*len = (int)strtol(p+1,NULL,10);
*len = (size_t) strtoull(p+1,NULL,10);
p = strchr(p,'\r');
assert(p);
*str = p+2;
return p+2+(*len)+2;
}

/* Helper function for the redisAsyncCommand* family of functions. Writes a
/* Helper function for the redisAsyncAppend* family of functions. Writes a
* formatted command to the output buffer and registers the provided callback
* function with the context. */
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
static int __redisAsyncAppend(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
redisContext *c = &(ac->c);
redisCallback cb;
struct dict *cbdict;
Expand All @@ -693,21 +711,39 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
cb.pending_subs = 1;

/* Find out which command will be appended. */
p = nextArgument(cmd,&cstr,&clen);
assert(p != NULL);
if (NULL == (p = nextArgument(cmd,&cstr,&clen))) {
__redisAsyncSetError(ac, REDIS_ERR_PROTOCOL, "Invalid command format");
return REDIS_ERR;
}

hasnext = (p[0] == '$');
pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
cstr += pvariant;
clen -= pvariant;

/* NOTICE:
* Further non thread-safe actions [ we append to buffer before pushing having proper callback ]
* Since hiredis is already not thread-safe let's just append first to check for OOM errors */

/* Make sure we can append command before making it's callback - Delegate error */
if (REDIS_OK != (ret = __redisAppendCommand(c,cmd,len))) return ret;

#define PUSH_CALLBACK(cb_list) \
if (REDIS_OK != (ret = __redisPushCallback(&ac->cb_list, &cb))) { \
__redisAsyncSetError(ac, ret, "PushCallback failure"); \
goto oom_error; \
}

if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
c->flags |= REDIS_SUBSCRIBED;

/* Add every channel/pattern to the list of subscription callbacks. */
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
sname = sdsnewlen(astr,alen);
if (sname == NULL)
goto oom;
if (sname == NULL) {
__redisAsyncSetError(ac, REDIS_ERR_OOM, "Out of memory");
goto oom_error;
}

if (pvariant)
cbdict = ac->sub.patterns;
Expand All @@ -718,6 +754,9 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void

if (de != NULL) {
existcb = dictGetEntryVal(de);
if (existcb == NULL)
goto oom_error;

cb.pending_subs = existcb->pending_subs + 1;
}

Expand All @@ -733,42 +772,131 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
/* (P)UNSUBSCRIBE does not have its own response: every channel or
* pattern that is unsubscribed will receive a message. This means we
* should not append a callback function for this command. */
} else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
/* Set monitor flag and push callback */
c->flags |= REDIS_MONITORING;
__redisPushCallback(&ac->replies,&cb);
} else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
/* Set monitor flag and push callback */
c->flags |= REDIS_MONITORING;
PUSH_CALLBACK(replies)
} else {
if (c->flags & REDIS_SUBSCRIBED)
if (c->flags & REDIS_SUBSCRIBED) {
/* This will likely result in an error reply, but it needs to be
* received and passed to the callback. */
__redisPushCallback(&ac->sub.invalid,&cb);
else
__redisPushCallback(&ac->replies,&cb);
PUSH_CALLBACK(sub.invalid)
} else {
PUSH_CALLBACK(replies)
}
}

__redisAppendCommand(c,cmd,len);

/* Always schedule a write when the write buffer is non-empty */
_EL_ADD_WRITE(ac);
#undef PUSH_CALLBACK

return REDIS_OK;
oom:
__redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");

oom_error:
/* Rollback the obuf data */
clen = sdslen(c->obuf) - len;
sdssetlen(c->obuf, clen);
c->obuf[clen] = '\0';

/* Free unused obuf space */
c->obuf = sdsRemoveFreeSpace(c->obuf);

/* Return error occured */
__redisAsyncCopyError(ac);
return REDIS_ERR;
}

int redisvAsyncAppend(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
char *cmd;
int len;
int status;
len = redisvFormatCommand(&cmd,format,ap);

/* We don't want to pass -1 or -2 to future functions as a length. */
if (-1 == len) {
__redisAsyncSetError(ac,REDIS_ERR_OOM,"Out of memory");
return REDIS_ERR;
} else if (-2 == len) {
__redisAsyncSetError(ac,REDIS_ERR_OTHER,"Invalid format string");
return REDIS_ERR;
} else if (0 > len) {
__redisAsyncSetError(ac,REDIS_ERR_OTHER,"Unknown format error");
return REDIS_ERR;
}

status = __redisAsyncAppend(ac, fn, privdata, cmd, (size_t)len);
free(cmd);
return status;
}

int redisAsyncAppend(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
va_list ap;
int status;
va_start(ap,format);
status = redisvAsyncCommand(ac,fn,privdata,format,ap);
va_end(ap);
return status;
}

int redisAsyncAppendArgv(redisAsyncContext *ac, redisCallbackFn *fn,
void *privdata, int argc, const char **argv,
const size_t *argvlen) {
sds cmd;
int len;
int status;
len = redisFormatSdsCommandArgv(&cmd, argc, argv, argvlen);

if (-1 == len) {
__redisAsyncSetError(ac,REDIS_ERR_OOM,"Out of memory");
return REDIS_ERR;
} else if (0 > len) {
__redisAsyncSetError(ac,REDIS_ERR_OTHER,"Unknown format error");
return REDIS_ERR;
}

status = __redisAsyncAppend(ac, fn, privdata, cmd, (size_t)len);
sdsfree(cmd);
return status;
}

int redisAsyncFormattedAppend(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
int status = __redisAsyncAppend(ac,fn,privdata,cmd,len);
return status;
}

/* Helper function for the redisAsyncCommand* family of functions. Writes a
* formatted command to the output buffer and registers the provided callback
* function with the context and triggers the scheduler 'ADD_WRITE' event. */
static inline int __redisAsyncCommand( redisAsyncContext *ac, redisCallbackFn *fn,
void *privdata, const char *cmd, size_t len) {
int ret = __redisAsyncAppend(ac, fn, privdata, cmd, len);
if (REDIS_OK == ret) {
/* Always schedule a write when the write buffer is non-empty */
_EL_ADD_WRITE(ac);
}

return ret;
}

int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
char *cmd;
int len;
int status;
len = redisvFormatCommand(&cmd,format,ap);

/* We don't want to pass -1 or -2 to future functions as a length. */
if (len < 0)
if (-1 == len) {
__redisAsyncSetError(ac,REDIS_ERR_OOM,"Out of memory");
return REDIS_ERR;
} else if (-2 == len) {
__redisAsyncSetError(ac,REDIS_ERR_OTHER,"Invalid format string");
return REDIS_ERR;
} else if (0 > len) {
__redisAsyncSetError(ac,REDIS_ERR_OTHER,"Unknown format error");
return REDIS_ERR;
}

status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
status = __redisAsyncCommand(ac, fn, privdata, cmd, (size_t)len);
hi_free(cmd);

return status;
}

Expand All @@ -786,9 +914,16 @@ int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *priv
int len;
int status;
len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
if (len < 0)

if (-1 == len) {
__redisAsyncSetError(ac,REDIS_ERR_OOM,"Out of memory");
return REDIS_ERR;
} else if (0 > len) {
__redisAsyncSetError(ac,REDIS_ERR_OTHER,"Unknown format error");
return REDIS_ERR;
status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
}

status = __redisAsyncCommand(ac, fn, privdata, cmd, (size_t)len);
sdsfree(cmd);
return status;
}
Expand Down
9 changes: 7 additions & 2 deletions async.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,13 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac);
void redisAsyncRead(redisAsyncContext *ac);
void redisAsyncWrite(redisAsyncContext *ac);

/* Command functions for an async context. Write the command to the
* output buffer and register the provided callback. */
/* Append functions for an async context. Write the command to the output buffer and register the provided callback. */
int redisvAsyncAppend(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap);
int redisAsyncAppend(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);
int redisAsyncAppendArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
int redisAsyncFormattedAppend(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len);

/* Command functions for an async context. Append the command and trigger WRITE-EVENT */
int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap);
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
Expand Down
2 changes: 1 addition & 1 deletion dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ static int _dictInit(dict *ht, dictType *type, void *privDataPtr);

/* Generic hash function (a popular one from Bernstein).
* I tested a few and this was the best. */
static unsigned int dictGenHashFunction(const unsigned char *buf, int len) {
static unsigned int dictGenHashFunction(const unsigned char *buf, size_t len) {
unsigned int hash = 5381;

while (len--)
Expand Down
2 changes: 1 addition & 1 deletion dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ typedef struct dictIterator {
#define dictSize(ht) ((ht)->used)

/* API */
static unsigned int dictGenHashFunction(const unsigned char *buf, int len);
static unsigned int dictGenHashFunction(const unsigned char *buf, size_t len);
static dict *dictCreate(dictType *type, void *privDataPtr);
static int dictExpand(dict *ht, unsigned long size);
static int dictAdd(dict *ht, void *key, void *val);
Expand Down