Andsel/verify oom behavior of parser in isolation #477

Draft · wants to merge 16 commits into base: main
21 changes: 20 additions & 1 deletion build.gradle
@@ -24,7 +24,14 @@ repositories {
dependencies {
testImplementation 'junit:junit:4.12'
testImplementation 'org.hamcrest:hamcrest-library:1.3'
testImplementation 'org.apache.logging.log4j:log4j-core:2.17.0'
implementation 'org.apache.logging.log4j:log4j-core:2.17.0'

// needed to run ServerRunner self-hosted
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonDatabindVersion}"
implementation "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
implementation "com.fasterxml.jackson.module:jackson-module-afterburner:2.15.2"

implementation "io.netty:netty-buffer:${nettyVersion}"
implementation "io.netty:netty-codec:${nettyVersion}"
implementation "io.netty:netty-common:${nettyVersion}"
@@ -104,3 +111,15 @@ publishing {
}
}
}

jar {
duplicatesStrategy(DuplicatesStrategy.EXCLUDE)

manifest {
attributes "Main-Class": "org.logstash.beats.ServerRunner"
}

from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
}
}
78 changes: 48 additions & 30 deletions src/main/java/org/logstash/beats/BeatsParser.java
@@ -3,6 +3,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.logging.log4j.LogManager;
@@ -48,16 +49,20 @@ private enum States {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws InvalidFrameProtocolException, IOException {
if(!hasEnoughBytes(in)) {
if (decodingCompressedBuffer){
if (!hasEnoughBytes(in)) {
if (decodingCompressedBuffer) {
throw new InvalidFrameProtocolException("Insufficient bytes in compressed content to decode: " + currentState);
}
return;
}

if (!ctx.channel().isOpen()) {
logger.info("Channel is not open, {}", ctx.channel());
}

switch (currentState) {
case READ_HEADER: {
logger.trace("Running: READ_HEADER");
logger.trace("Running: READ_HEADER {}", ctx.channel());

int version = Protocol.version(in.readByte());

@@ -70,28 +75,28 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
batch = new V1Batch();
}
}
transition(States.READ_FRAME_TYPE);
transition(States.READ_FRAME_TYPE, ctx.channel());
break;
}
case READ_FRAME_TYPE: {
byte frameType = in.readByte();

switch(frameType) {
case Protocol.CODE_WINDOW_SIZE: {
transition(States.READ_WINDOW_SIZE);
transition(States.READ_WINDOW_SIZE, ctx.channel());
break;
}
case Protocol.CODE_JSON_FRAME: {
// Reading Sequence + size of the payload
transition(States.READ_JSON_HEADER);
transition(States.READ_JSON_HEADER, ctx.channel());
break;
}
case Protocol.CODE_COMPRESSED_FRAME: {
transition(States.READ_COMPRESSED_FRAME_HEADER);
transition(States.READ_COMPRESSED_FRAME_HEADER, ctx.channel());
break;
}
case Protocol.CODE_FRAME: {
transition(States.READ_DATA_FIELDS);
transition(States.READ_DATA_FIELDS, ctx.channel());
break;
}
default: {
@@ -101,8 +106,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
break;
}
case READ_WINDOW_SIZE: {
logger.trace("Running: READ_WINDOW_SIZE");
batch.setBatchSize((int) in.readUnsignedInt());
logger.trace("Running: READ_WINDOW_SIZE {}", ctx.channel());
int batchSize = (int) in.readUnsignedInt();
logger.trace("Batch size: {} - channel {}", batchSize, ctx.channel());
batch.setBatchSize(batchSize);

// This is unlikely to happen but I have no way to know when a frame is
// actually completely done other than checking the window size and the sequence number,
@@ -114,12 +121,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
batchComplete();
}

transition(States.READ_HEADER);
transition(States.READ_HEADER, ctx.channel());
break;
}
case READ_DATA_FIELDS: {
// Lumberjack version 1 protocol, which uses the Key:Value format.
logger.trace("Running: READ_DATA_FIELDS");
logger.trace("Running: READ_DATA_FIELDS {}", ctx.channel());
sequence = (int) in.readUnsignedInt();
int fieldsCount = (int) in.readUnsignedInt();
int count = 0;
@@ -152,34 +159,36 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
out.add(batch);
batchComplete();
}
transition(States.READ_HEADER);
transition(States.READ_HEADER, ctx.channel());

break;
}
case READ_JSON_HEADER: {
logger.trace("Running: READ_JSON_HEADER");
logger.trace("Running: READ_JSON_HEADER {}", ctx.channel());

sequence = (int) in.readUnsignedInt();
logger.trace("Sequence num to read {} for channel {}", sequence, ctx.channel());
int jsonPayloadSize = (int) in.readUnsignedInt();

if(jsonPayloadSize <= 0) {
throw new InvalidFrameProtocolException("Invalid json length, received: " + jsonPayloadSize);
}

transition(States.READ_JSON, jsonPayloadSize);
transition(States.READ_JSON, jsonPayloadSize, ctx.channel());
break;
}
case READ_COMPRESSED_FRAME_HEADER: {
logger.trace("Running: READ_COMPRESSED_FRAME_HEADER");
logger.trace("Running: READ_COMPRESSED_FRAME_HEADER {}", ctx.channel());

transition(States.READ_COMPRESSED_FRAME, in.readInt());
transition(States.READ_COMPRESSED_FRAME, in.readInt(), ctx.channel());
break;
}

case READ_COMPRESSED_FRAME: {
logger.trace("Running: READ_COMPRESSED_FRAME");
logger.trace("Running: READ_COMPRESSED_FRAME {}", ctx.channel());

inflateCompressedFrame(ctx, in, (buffer) -> {
transition(States.READ_HEADER);
transition(States.READ_HEADER, ctx.channel());

decodingCompressedBuffer = true;
try {
@@ -188,23 +197,32 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}
} finally {
decodingCompressedBuffer = false;
transition(States.READ_HEADER);
transition(States.READ_HEADER, ctx.channel());
}
});
break;
}
case READ_JSON: {
logger.trace("Running: READ_JSON");
((V2Batch)batch).addMessage(sequence, in, requiredBytes);
if(batch.isComplete()) {
if(logger.isTraceEnabled()) {
logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence);
logger.trace("Running: READ_JSON {}", ctx.channel());
try {
((V2Batch) batch).addMessage(sequence, in, requiredBytes);
} catch (Throwable th) {
// batch has to release its internal buffer before bubbling up the exception
batch.release();

// re-throw the same error after releasing the internal buffer
throw th;
}

if (batch.isComplete()) {
if (logger.isTraceEnabled()) {
logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence + " {}", ctx.channel());
}
out.add(batch);
batchComplete();
}

transition(States.READ_HEADER);
transition(States.READ_HEADER, ctx.channel());
break;
}
}
@@ -238,13 +256,13 @@ private boolean hasEnoughBytes(ByteBuf in) {
return in.readableBytes() >= requiredBytes;
}

private void transition(States next) {
transition(next, next.length);
private void transition(States next, Channel ch) {
transition(next, next.length, ch);
}

private void transition(States nextState, int requiredBytes) {
private void transition(States nextState, int requiredBytes, Channel ch) {
if (logger.isTraceEnabled()) {
logger.trace("Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes");
logger.trace("Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes {}", ch);
}
this.currentState = nextState;
this.requiredBytes = requiredBytes;
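The parser changes above keep the state machine intact while adding per-channel trace logging and releasing the batch buffer when addMessage fails. For reference, a minimal sketch of driving BeatsParser in isolation through Netty's EmbeddedChannel is shown below; it is not part of this diff, the frame layout and the ASCII constants ('2', 'W', 'J') are assumptions based on the Lumberjack v2 protocol the parser implements, and the Batch calls follow the usage already visible in decode().

```java
package org.logstash.beats;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;

// Minimal sketch: feed one single-event v2 batch to the parser and read the decoded batch.
public class ParserIsolationSketch {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(new BeatsParser());

        byte[] json = "{\"message\":\"hello\"}".getBytes();
        ByteBuf frame = Unpooled.buffer();
        frame.writeByte('2');        // protocol version 2 (assumed constant)
        frame.writeByte('W');        // window-size frame (assumed constant)
        frame.writeInt(1);           // window of one event
        frame.writeByte('2');        // protocol version 2
        frame.writeByte('J');        // JSON frame (assumed constant)
        frame.writeInt(1);           // sequence number
        frame.writeInt(json.length); // payload size
        frame.writeBytes(json);      // payload

        channel.writeInbound(frame);       // decoded batches land in the inbound queue
        Batch batch = channel.readInbound();
        System.out.println("Decoded batch of " + batch.size() + " event(s)");
        batch.release();                   // the v2 batch holds a pooled buffer
    }
}
```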
56 changes: 56 additions & 0 deletions src/main/java/org/logstash/beats/FlowLimiterHandler.java
@@ -0,0 +1,56 @@
package org.logstash.beats;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Configures the channel where it's installed to operate reads in pull mode,
* disabling autoread and explicitly invoking the read operation.
* Flow control that keeps the outbound buffer under control is achieved by
* not reading new bytes while the outbound direction is not writable; this
* exerts back pressure on the TCP layer and ultimately on the upstream system.
*/
@Sharable
public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter {

private final static Logger logger = LogManager.getLogger(FlowLimiterHandler.class);

@Override
public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
ctx.channel().config().setAutoRead(false);
super.channelRegistered(ctx);
}

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) {
ctx.channel().read();
}
}

@Override
public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) {
ctx.channel().read();
}
}

private boolean isAutoreadDisabled(Channel channel) {
return !channel.config().isAutoRead();
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.channel().read();
super.channelWritabilityChanged(ctx);

logger.debug("Writability on channel {} changed to {}", ctx.channel(), ctx.channel().isWritable());
}

}
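FlowLimiterHandler only toggles autoread and issues explicit read() calls, so its effect depends on it being installed on every accepted channel. A minimal sketch of wiring it ahead of the parser is shown below; the bootstrap details, the port, and the handler ordering are illustrative assumptions, not taken from this PR.

```java
package org.logstash.beats;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// Minimal sketch: install the flow limiter in front of the parser on each child channel.
public class PipelineSketch {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(group)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // The limiter disables autoread, so new bytes are only pulled
                            // in while the outbound side of the channel stays writable.
                            ch.pipeline().addLast(new FlowLimiterHandler());
                            ch.pipeline().addLast(new BeatsParser());
                        }
                    });
            bootstrap.bind(5044).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
```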
59 changes: 59 additions & 0 deletions src/main/java/org/logstash/beats/OOMConnectionCloser.java
@@ -0,0 +1,59 @@
package org.logstash.beats;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OOMConnectionCloser extends ChannelInboundHandlerAdapter {

private static class DirectMemoryUsage {
private final long used;
private final long pinned;
private final short ratio;

private DirectMemoryUsage(long used, long pinned) {
this.used = used;
this.pinned = pinned;
this.ratio = (short) Math.round(((double) pinned / used) * 100);
}

static DirectMemoryUsage capture() {
PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
long usedDirectMemory = allocator.metric().usedDirectMemory();
long pinnedDirectMemory = allocator.pinnedDirectMemory();
return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory);
}
}

private final static Logger logger = LogManager.getLogger(OOMConnectionCloser.class);

public static final Pattern DIRECT_MEMORY_ERROR = Pattern.compile("^Cannot reserve \\d* bytes of direct buffer memory.*$");

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (isDirectMemoryOOM(cause)) {
DirectMemoryUsage direct = DirectMemoryUsage.capture();
logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio);
logger.warn("Dropping connection {} due to lack of available Direct Memory. Please lower the number of concurrent connections or reduce the batch size. " +
"Alternatively, raise -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel());
ctx.flush();
ctx.close();
} else {
super.exceptionCaught(ctx, cause);
}
}

private boolean isDirectMemoryOOM(Throwable th) {
if (!(th instanceof OutOfMemoryError)) {
return false;
}
Matcher m = DIRECT_MEMORY_ERROR.matcher(th.getMessage());
return m.matches();
}
}
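The closer only reacts to an OutOfMemoryError whose message matches DIRECT_MEMORY_ERROR; any other error is propagated unchanged. A small self-contained check of that matching logic is sketched below; the sample message is a hypothetical example of what Netty reports when direct memory is exhausted, not output captured from this PR.

```java
import java.util.regex.Pattern;

// Minimal sketch: exercise the same regex used by OOMConnectionCloser.DIRECT_MEMORY_ERROR.
public class OOMPatternSketch {
    public static void main(String[] args) {
        Pattern directMemoryError =
                Pattern.compile("^Cannot reserve \\d* bytes of direct buffer memory.*$");

        // Hypothetical direct-memory failure: the handler would flush and close the channel.
        String direct = "Cannot reserve 16777216 bytes of direct buffer memory (used: 1073741824, max: 1073741824)";
        System.out.println(directMemoryError.matcher(direct).matches()); // true

        // A heap OOM message does not match, so the error is rethrown down the pipeline.
        System.out.println(directMemoryError.matcher("Java heap space").matches()); // false
    }
}
```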