Skip to content

Commit 1179579

Browse files
author
avva
committed
Rewrite the get commands to use async IO and select() instead of sync IO
and alarm timeouts. - the former multi-get workhorse _load_items() is gone - the new workhorse _load_multi() does all the work of sending out "get ..." commands to sockets, then reading and parsing the responses. It does this for all sockets simultaneously, multiplexing read/write operations on all of them with select(). All sockets are put into nonblocking state on entry, and their former state is restored on exit (this is temporary; in the future we'll keep them nonblocking all the time). - get_multi() is now a thin wrapper for _load_multi(), it no longer does its own sending. - get() is now a trivial wrapper for get_multi(). This is the first and most important step towards converting all IO in this module to use nonblocking IO and select(), and ditching yucky alarm timeouts altogether. Written by avva, reviewed and several bugs fixed by brad. git-svn-id: http://code.sixapart.com/svn/memcached/trunk/api/perl@172 b0b603af-a30f-0410-a34e-baf09ae79d0b
1 parent e7a1a04 commit 1179579

File tree

1 file changed

+209
-89
lines changed

1 file changed

+209
-89
lines changed

Memcached.pm

+209-89
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ sub new {
5353
$self->{'compress_threshold'} = $args->{'compress_threshold'};
5454
$self->{'compress_enable'} = 1;
5555

56+
# TODO: undocumented
57+
$self->{'select_timeout'} = $args->{'select_timeout'} || 1.0;
58+
5659
return $self;
5760
}
5861

@@ -342,32 +345,10 @@ sub _incrdecr {
342345

343346
sub get {
344347
my ($self, $key) = @_;
345-
$self->{'stats'}->{"get"}++;
346-
347-
my $sock = $self->get_sock($key);
348-
return undef unless $sock;
349-
350-
# get at the real key (we don't need the explicit hash value anymore)
351-
$key = $key->[1] if ref $key;
352-
353-
my %val;
354-
355-
local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
356-
local $SIG{'ALRM'} = sub { _dead_sock($sock); die "alarm"; };
357-
alarm($SOCK_TIMEOUT);
358-
eval {
359-
send($sock, "get $key\r\n", $FLAG_NOSIGNAL) ?
360-
_load_items($sock, \%val) :
361-
_dead_sock($sock, undef);
362-
alarm(0);
363-
if ($self->{'debug'}) {
364-
while (my ($k, $v) = each %val) {
365-
print STDERR "MemCache: got $k = $v\n";
366-
}
367-
}
368-
};
369348

370-
return $val{$key};
349+
# TODO: make a fast path for this? or just keep using get_multi?
350+
my $r = $self->get_multi($key);
351+
return $r->{$key};
371352
}
372353

373354
sub get_multi {
@@ -376,48 +357,20 @@ sub get_multi {
376357
$self->{'stats'}->{"get_multi"}++;
377358
my %val; # what we'll be returning a reference to (realkey -> value)
378359
my %sock_keys; # sockref_as_scalar -> [ realkeys ]
379-
my @socks; # unique socket refs
380360
my $sock;
381361

382362
foreach my $key (@_) {
383363
$sock = $self->get_sock($key);
384364
next unless $sock;
385365
$key = ref $key ? $key->[1] : $key;
386-
unless ($sock_keys{$sock}) {
387-
$sock_keys{$sock} = [];
388-
push @socks, $sock;
389-
}
390366
push @{$sock_keys{$sock}}, $key;
391367
}
392368
$self->{'stats'}->{"get_keys"} += @_;
393-
$self->{'stats'}->{"get_socks"} += @socks;
369+
$self->{'stats'}->{"get_socks"} += keys %sock_keys;
394370

395371
local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
396-
local $SIG{'ALRM'} = sub { _dead_sock($sock); die "alarm"; };
397-
398-
# pass 1: send out requests
399-
my @gather;
400372

401-
alarm($SOCK_TIMEOUT);
402-
foreach my $sock (@socks) {
403-
eval {
404-
if (send($sock, "get @{$sock_keys{$sock}}\r\n", $FLAG_NOSIGNAL)) {
405-
push @gather, $sock;
406-
} else {
407-
_dead_sock($sock);
408-
}
409-
};
410-
}
411-
alarm(0);
412-
413-
# pass 2: parse responses
414-
alarm($SOCK_TIMEOUT);
415-
foreach my $sock (@gather) {
416-
eval {
417-
_load_items($sock, \%val);
418-
};
419-
}
420-
alarm(0);
373+
_load_multi($self, \%sock_keys, \%val);
421374

422375
if ($self->{'debug'}) {
423376
while (my ($k, $v) = each %val) {
@@ -427,46 +380,213 @@ sub get_multi {
427380
return \%val;
428381
}
429382

430-
sub _load_items {
431-
my ($sock, $ret) = @_;
383+
sub _load_multi {
432384
use bytes; # return bytes from length()
433-
while (1) {
434-
my $decl = readline($sock);
435-
if ($decl eq "END\r\n") {
436-
return 1;
437-
} elsif ($decl =~ /^VALUE (\S+) (\d+) (\d+)\r\n$/) {
438-
my ($rkey, $flags, $len) = ($1, $2, $3);
439-
my $bneed = $len+2; # with \r\n
440-
my $offset = 0;
441-
442-
while ($bneed > 0) {
443-
my $n = read($sock, $ret->{$rkey}, $bneed, $offset);
444-
last unless $n;
445-
$offset += $n;
446-
$bneed -= $n;
385+
my ($self, $sock_keys, $ret) = @_;
386+
387+
# all keyed by a $sock:
388+
my %blocks; # old blocking value
389+
my %reading; # bool, whether we're reading from this socket
390+
my %writing; # bool, whether we're writing into this socket
391+
my %state; # reading state:
392+
# 0 = waiting for a line, N = reading N bytes
393+
my %buf; # buffers
394+
my %offset; # offsets to read into buffers
395+
my %key; # current key per socket
396+
my %flags; # flags per socket
397+
398+
foreach (keys %$sock_keys) {
399+
print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
400+
$blocks{$_} = IO::Handle::blocking($_,0);
401+
$writing{$_} = 1;
402+
$buf{$_} = "get @{$sock_keys->{$_}}\r\n";
403+
}
404+
405+
my $active_changed = 1; # force rebuilding of select sets
406+
407+
my $dead = sub {
408+
my $sock = shift;
409+
print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
410+
delete $reading{$sock};
411+
delete $writing{$sock};
412+
delete $blocks{$sock};
413+
delete $ret->{$key{$sock}}
414+
if $key{$sock};
415+
close $sock;
416+
_dead_sock($sock);
417+
$active_changed = 1;
418+
};
419+
420+
my $finalize = sub {
421+
my $sock = shift;
422+
my $k = $key{$sock};
423+
424+
# remove trailing \r\n
425+
chop $ret->{$k}; chop $ret->{$k};
426+
427+
unless (length($ret->{$k}) == $state{$sock}-2) {
428+
$dead->($sock);
429+
return;
430+
}
431+
432+
$ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
433+
if $HAVE_ZLIB && $flags{$sock} & F_COMPRESS;
434+
$ret->{$k} = Storable::thaw($ret->{$k})
435+
if $flags{$sock} & F_STORABLE;
436+
};
437+
438+
my $read = sub {
439+
my $sock = shift;
440+
my $res;
441+
442+
# where are we reading into?
443+
if ($state{$sock}) { # reading value into $ret
444+
$res = sysread($sock, $ret->{$key{$sock}},
445+
$state{$sock} - $offset{$sock},
446+
$offset{$sock});
447+
return
448+
if !defined($res) and $!{EWOULDBLOCK};
449+
if ($res == 0) { # catches 0=conn closed or undef=error
450+
$dead->($sock);
451+
return;
452+
}
453+
$offset{$sock} += $res;
454+
if ($offset{$sock} == $state{$sock}) { # finished reading
455+
$finalize->($sock);
456+
$state{$sock} = 0; # wait for another VALUE line or END
457+
$offset{$sock} = 0;
447458
}
459+
return;
460+
}
461+
462+
# we're reading a single line.
463+
# first, read whatever's there, but be satisfied with 2048 bytes
464+
$res = sysread($sock, $buf{$sock},
465+
2048, $offset{$sock});
466+
return
467+
if !defined($res) and $!{EWOULDBLOCK};
468+
if ($res == 0) {
469+
$dead->($sock);
470+
return;
471+
}
472+
$offset{$sock} += $res;
473+
474+
SEARCH:
475+
while(1) { # may have to search many times
476+
# do we have a complete END line?
477+
if ($buf{$sock} =~ /^END\r\n/) {
478+
# okay, finished with this socket
479+
delete $reading{$sock};
480+
$active_changed = 1;
481+
return;
482+
}
483+
484+
# do we have a complete VALUE line?
485+
if ($buf{$sock} =~ /^VALUE (\S+) (\d+) (\d+)\r\n/g) {
486+
($key{$sock}, $flags{$sock}, $state{$sock}) = ($1, int($2), $3+2);
487+
my $p = pos($buf{$sock});
488+
pos($buf{$sock}) = 0;
489+
my $len = length($buf{$sock});
490+
my $copy = $len-$p > $state{$sock} ? $state{$sock} : $len-$p;
491+
$ret->{$key{$sock}} = substr($buf{$sock}, $p, $copy)
492+
if $copy;
493+
$offset{$sock} = $copy;
494+
substr($buf{$sock}, 0, $p+$copy, ''); # delete the stuff we used
495+
if ($offset{$sock} == $state{$sock}) { # have it all?
496+
$finalize->($sock);
497+
$state{$sock} = 0; # wait for another VALUE line or END
498+
$offset{$sock} = 0;
499+
next SEARCH; # look again
500+
}
501+
last SEARCH; # buffer is empty now
502+
}
503+
504+
# if we're here probably means we only have a partial VALUE
505+
# or END line in the buffer. Could happen with multi-get,
506+
# though probably very rarely. Exit the loop and let it read
507+
# more.
508+
last SEARCH;
509+
}
510+
511+
# we don't have a complete line, wait and read more when ready
512+
return;
513+
};
448514

449-
unless ($offset == $len+2) {
450-
# something messed up. let's abort.
451-
delete $ret->{$rkey};
452-
_close_sock($sock);
453-
return 0;
515+
my $write = sub {
516+
my $sock = shift;
517+
my $res;
518+
519+
$res = send($sock, $buf{$sock}, $FLAG_NOSIGNAL);
520+
return
521+
if not defined $res and $!{EWOULDBLOCK};
522+
unless ($res > 0) {
523+
$dead->($sock);
524+
return;
525+
}
526+
if ($res == length($buf{$sock})) { # all sent
527+
$buf{$sock} = "";
528+
$offset{$sock} = $state{$sock} = 0;
529+
# switch the socket from writing state to reading state
530+
delete $writing{$sock};
531+
$reading{$sock} = 1;
532+
$active_changed = 1;
533+
} else { # we only succeeded in sending some of it
534+
substr($buf{$sock}, 0, $res, ''); # delete the part we sent
535+
}
536+
return;
537+
};
538+
539+
# the bitsets for select
540+
my ($rin, $rout, $win, $wout);
541+
my $nfound;
542+
543+
# the big select loop
544+
while(1) {
545+
if ($active_changed) {
546+
last unless %reading or %writing; # no sockets left?
547+
($rin, $win) = (undef, undef);
548+
foreach (keys %reading) {
549+
vec($rin, fileno($_), 1) = 1;
454550
}
551+
foreach (keys %writing) {
552+
vec($win, fileno($_), 1) = 1;
553+
}
554+
$active_changed = 0;
555+
}
556+
# TODO: more intelligent cumulative timeout?
557+
$nfound = select($rout=$rin, $wout=$win, undef,
558+
$self->{'select_timeout'});
559+
last unless $nfound;
560+
561+
# TODO: possible robustness improvement: we could select
562+
# writing sockets for reading also, and raise hell if they're
563+
# ready (input unread from last time, etc.)
564+
# maybe do that on the first loop only?
565+
foreach (keys %writing) {
566+
if (vec($wout, fileno($_), 1)) {
567+
$write->($_);
568+
}
569+
}
570+
foreach (keys %reading) {
571+
if (vec($rout, fileno($_), 1)) {
572+
$read->($_);
573+
}
574+
}
575+
}
576+
577+
# if there're active sockets left, they need to die
578+
foreach (keys %writing) {
579+
$dead->($_);
580+
}
581+
foreach (keys %reading) {
582+
$dead->($_);
583+
}
455584

456-
# remove trailing \r\n
457-
chop $ret->{$rkey}; chop $ret->{$rkey};
458-
459-
$ret->{$rkey} = Compress::Zlib::memGunzip($ret->{$rkey})
460-
if $HAVE_ZLIB && $flags & F_COMPRESS;
461-
$ret->{$rkey} = Storable::thaw($ret->{$rkey})
462-
if $flags & F_STORABLE;
463-
} else {
464-
chomp $decl;
465-
chomp $decl;
466-
print STDERR "Error parsing memcached response. For $sock, got: $decl\n";
467-
return _dead_sock($sock,0);
468-
}
585+
# unblock sockets that made it
586+
foreach (keys %blocks) {
587+
IO::Handle::blocking($_, $blocks{$_});
469588
}
589+
return;
470590
}
471591

472592
sub _hashfunc {

0 commit comments

Comments
 (0)