diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..44c9325 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +.git +.gitignore +.DS_Store +/target +/bin diff --git a/.gitignore b/.gitignore index e08c794..311cc1e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target/ .DS_Store +/bin diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1f8d451 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,43 @@ +FROM java:8 +MAINTAINER DaWanda + +ARG SBT_VERSION="0.13.8" + +ENV SQLTAP_HTTP_PORT="3000" \ + SQLTAP_THREADS="16" \ + SQLTAP_SCHEMA="/etc/sqltap-schema.xml" \ + SCHEMA_URL="" \ + SQLTAP_OPTS="" \ + MYSQL_HOST="127.0.0.1" \ + MYSQL_PORT="3306" \ + MYSQL_USER="fetch" \ + MYSQL_DATABASE="test" \ + MYSQL_NUMCONNS="6" \ + MYSQL_QUEUELEN="2500" \ + JMX_PORT="9191" \ + RMI_BIND="127.0.0.1" \ + JAVA_XMX="16384M" \ + CACHE_BACKEND="memcache" \ + MEMCACHE_HOST="" \ + MEMCACHE_PORT="11211" \ + MEMCACHE_QUEUELEN="8192" \ + MEMCACHE_NUMCONNS="20" \ + STATSD_PREFIX="sqltap" + +ADD https://dl.bintray.com/sbt/debian/sbt-$SBT_VERSION.deb /tmp/sbt.deb +RUN dpkg -i /tmp/sbt.deb && rm -f /tmp/sbt.deb +RUN sbt + +ADD project /usr/src/project/ +ADD src /usr/src/src/ +ADD build.sbt /usr/src/ + +RUN cd /usr/src && \ + sbt assembly && \ + cp -vpi /usr/src/target/scala-*/sqltap.jar /usr/lib/sqltap.jar && \ + rm -rf /usr/src/* + +EXPOSE $SQLTAP_HTTP_PORT + +ADD bootup.sh /bootup.sh +CMD ["/bootup.sh"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9c7b446 --- /dev/null +++ b/Makefile @@ -0,0 +1,22 @@ +# This file is part of the "sqltap" project, http://github.com/dawanda/sqltap> +# (c) 2016 Christian Parpart +# +# Licensed under the MIT License (the "License"); you may not use this +# file except in compliance with the License. You may obtain a copy of +# the License at: http://opensource.org/licenses/MIT + +IMAGE = sqltap +VERSION = $(shell grep ^version build.sbt | cut -d\" -f2) + +image: + docker build -t ${IMAGE}:${VERSION} . + +push: image + docker push ${IMAGE}:${VERSION} + +clean: + rm -rf target + +.PHONY: image clean push + +# vim:ts=8:noet diff --git a/bootup.sh b/bootup.sh new file mode 100755 index 0000000..ffefc52 --- /dev/null +++ b/bootup.sh @@ -0,0 +1,100 @@ +#!/bin/bash +set -e + +SQLTAP_JARFILE="/usr/lib/sqltap.jar" + +if [[ $SCHEMA_URL == http* ]]; then + curl -sL $SCHEMA_URL -o "${SQLTAP_SCHEMA}" +fi + +require_arg() { + local name="$1" + local val="${!name}" + + echo "${name}: ${val}" + + if [[ "${val}" == "" ]]; then + echo "Error. Required argument ${name} missing." 1>&2 + exit 1 + fi +} + +require_args() { + while [[ $# -ne 0 ]]; do + require_arg $1 + shift + done +} + +require_args SQLTAP_SCHEMA \ + MYSQL_PORT \ + SQLTAP_HTTP_PORT \ + SQLTAP_THREADS \ + SCHEMA_URL \ + MYSQL_HOST \ + MYSQL_PORT \ + MYSQL_USER \ + MYSQL_DATABASE \ + MYSQL_NUMCONNS \ + MYSQL_QUEUELEN \ + JMX_PORT \ + RMI_BIND \ + JAVA_XMX \ + CACHE_BACKEND + +opts="" +opts="$opts --config ${SQLTAP_SCHEMA}" +opts="$opts --http ${SQLTAP_HTTP_PORT}" +opts="$opts --disable-keepalive" +opts="$opts -t ${SQLTAP_THREADS}" +opts="$opts --mysql-host ${MYSQL_HOST}" +opts="$opts --mysql-port ${MYSQL_PORT}" +opts="$opts --mysql-user ${MYSQL_USER}" + +if [[ "x${MYSQL_PASSWORD}" != "x" ]]; then + opts="$opts --mysql-password ${MYSQL_PASSWORD}" +fi + +opts="$opts --mysql-database ${MYSQL_DATABASE}" +opts="$opts --mysql-numconns ${MYSQL_NUMCONNS}" +opts="$opts --mysql-queuelen ${MYSQL_QUEUELEN}" +opts="$opts --cache-backend ${CACHE_BACKEND}" + +if [[ "${CACHE_BACKEND}" == "memcache" ]]; then + require_args MEMCACHE_HOST \ + MEMCACHE_PORT \ + MEMCACHE_QUEUELEN \ + MEMCACHE_NUMCONNS + + opts="$opts --memcache-host ${MEMCACHE_HOST}" + opts="$opts --memcache-port ${MEMCACHE_PORT}" + opts="$opts --memcache-queuelen ${MEMCACHE_QUEUELEN}" + opts="$opts --memcache-numconns ${MEMCACHE_NUMCONNS}" +fi + +opts="$opts ${SQLTAP_OPTS}" + +report_to_statsd() { + if [[ -z "$STATSD_HOST" ]] || [[ -z "$STATSD_PORT" ]]; then + return + fi + sleep 10 # give some time to start sqltap + while sleep 1; do + curl -s "http://localhost:${SQLTAP_HTTP_PORT}/stats" | \ + sed -e 's/,/\n/g' | \ + sed -e 's/^[^"]*"//g' -e 's/": *"/:/g' -e 's/".*$//g' -e 's/^/'$STATSD_PREFIX'./g' -e 's/\.[^:.]*$//' -e 's/$/|c/' > \ + /dev/udp/${STATSD_HOST}/${STATSD_PORT} + done +} + +report_to_statsd& + +exec java \ + -Djava.rmi.server.hostname="${RMI_BIND}" \ + -Dcom.sun.management.jmxremote \ + -Dcom.sun.management.jmxremote.port="${JMX_PORT}" \ + -Dcom.sun.management.jmxremote.ssl=false \ + -Dcom.sun.management.jmxremote.authenticate=false \ + -Xmx"${JAVA_XMX}" -XX:GCTimeRatio=99 -XX:+UseConcMarkSweepGC \ + -jar "${SQLTAP_JARFILE}" \ + $opts diff --git a/build.sbt b/build.sbt index cad9983..88dbeb7 100644 --- a/build.sbt +++ b/build.sbt @@ -4,16 +4,18 @@ name := "SQLTap" organization := "com.paulasmuth" -version := "0.7.21" +version := "0.8.6" mainClass in (Compile, run) := Some("com.paulasmuth.sqltap.SQLTap") -scalaSource in Compile <<= baseDirectory(_ / "src") - -scalaVersion := "2.9.1" +scalaVersion := "2.11.7" assemblySettings -jarName in assembly := "sqltap_0.7.21.jar" +jarName in assembly := { s"${name.value.toLowerCase}.jar" } fork in run := true + +libraryDependencies += "org.scala-lang.modules" %% "scala-xml" % "1.0.2" + +libraryDependencies += "org.scalatest" % "scalatest_2.11" % "2.2.4" % "test" diff --git a/patches/sqltap-encoding-fix.diff b/patches/sqltap-encoding-fix.diff new file mode 100644 index 0000000..5bb509c --- /dev/null +++ b/patches/sqltap-encoding-fix.diff @@ -0,0 +1,62 @@ +diff --git a/src/com/paulasmuth/sqltap/ExpirationHandlerFactory.scala b/src/com/paulasmuth/sqltap/ExpirationHandlerFactory.scala +index 2145f56..cf17011 100644 +--- a/src/com/paulasmuth/sqltap/ExpirationHandlerFactory.scala ++++ b/src/com/paulasmuth/sqltap/ExpirationHandlerFactory.scala +@@ -1,5 +1,5 @@ + // This file is part of the "SQLTap" project +-// (c) 2011-2013 Paul Asmuth ++// (c) 2014 Paul Asmuth, Google Inc. + // + // Licensed under the MIT License (the "License"); you may not use this + // file except in compliance with the License. You may obtain a copy of +@@ -30,8 +30,10 @@ object ExpirationHandlerFactory { + case "noop" => + handler = new NoopExpirationHandler() + +- case "purge" => ++ case "purge" => { + handler = new PurgeExpirationHandler() ++ ReplicationFeed.start() ++ } + + case _ => + throw new ParseException("unknown expiration handler: " + name) +diff --git a/src/com/paulasmuth/sqltap/HTTPParser.scala b/src/com/paulasmuth/sqltap/HTTPParser.scala +index 22b27be..33cd26d 100644 +--- a/src/com/paulasmuth/sqltap/HTTPParser.scala ++++ b/src/com/paulasmuth/sqltap/HTTPParser.scala +@@ -1,5 +1,5 @@ + // This file is part of the "SQLTap" project +-// (c) 2011-2013 Paul Asmuth ++// (c) 2014 Paul Asmuth, Google Inc. + // + // Licensed under the MIT License (the "License"); you may not use this + // file except in compliance with the License. You may obtain a copy of +@@ -130,7 +130,7 @@ class HTTPParser { + } + + def uri_parts() : List[String] = { +- val uri = URLDecoder.decode(http_uri, "UTF-8") ++ val uri = URLDecoder.decode(http_uri) + var pos = uri.length + var cur = pos - 1 + var ret = new ListBuffer[String]() +diff --git a/src/com/paulasmuth/sqltap/SQLTap.scala b/src/com/paulasmuth/sqltap/SQLTap.scala +index 357fc20..05c8921 100644 +--- a/src/com/paulasmuth/sqltap/SQLTap.scala ++++ b/src/com/paulasmuth/sqltap/SQLTap.scala +@@ -1,5 +1,5 @@ + // This file is part of the "SQLTap" project +-// (c) 2011-2013 Paul Asmuth ++// (c) 2014 Paul Asmuth, Google Inc. + // + // Licensed under the MIT License (the "License"); you may not use this + // file except in compliance with the License. You may obtain a copy of +@@ -108,7 +108,6 @@ object SQLTap{ + Manifest.load(new File(Config.get('config_base))) + RelationTrace.load(Manifest.resources) + ExpirationHandlerFactory.configure(Config.get('expiration_handler)) +- ReplicationFeed.start() + + val server = new Server(Config.get('threads).toInt) + server.run(Config.get('http_port).toInt) diff --git a/patches/sqltap-log-slow-queries.diff b/patches/sqltap-log-slow-queries.diff new file mode 100644 index 0000000..959f46b --- /dev/null +++ b/patches/sqltap-log-slow-queries.diff @@ -0,0 +1,100 @@ +diff -r -u sqltap/src/com/paulasmuth/sqltap/HTTPConnection.scala sqltap-0.7.21/src/com/paulasmuth/sqltap/HTTPConnection.scala +--- sqltap/src/com/paulasmuth/sqltap/HTTPConnection.scala 2014-02-12 16:13:05.009387526 +0100 ++++ sqltap-0.7.21/src/com/paulasmuth/sqltap/HTTPConnection.scala 2014-09-04 14:31:28.000000000 +0200 +@@ -1,10 +1,9 @@ + // This file is part of the "SQLTap" project +-// (c) 2011-2013 Paul Asmuth ++// (c) 2014 Paul Asmuth, Google Inc. + // + // Licensed under the MIT License (the "License"); you may not use this + // file except in compliance with the License. You may obtain a copy of + // the License at: http://opensource.org/licenses/MIT +- + package com.paulasmuth.sqltap + + import java.nio.channels.{SocketChannel,SelectionKey} +@@ -23,6 +22,7 @@ + private val parser = new HTTPParser() + private var state = HTTP_STATE_INIT + private var last_event : SelectionKey = null ++ private var last_uri : String = "" // for debugging only + private var keepalive : Boolean = false + private var resp_buf : ByteBuffer = null + +@@ -190,6 +190,7 @@ + + idle_timer.cancel() + stime = System.nanoTime ++ last_uri = parser.http_uri + seq += 1 + + if (parser.http_version == "1.1") +@@ -302,8 +303,13 @@ + } + + def finish() : Unit = { +- Statistics.incr('http_request_time_mean, +- (System.nanoTime - stime) / 1000000.0) ++ val runtime_millis = (System.nanoTime - stime) / 1000000.0 ++ Statistics.incr('http_request_time_mean, runtime_millis) ++ ++ if (Config.has_key('log_slow_queries) && ++ runtime_millis >= Config.get('log_slow_queries).toInt) { ++ Logger.log("[HTTP] [Slow Query] (" + runtime_millis + "ms): " + last_uri) ++ } + + if (!keepalive) + return close() +diff -r -u sqltap/src/com/paulasmuth/sqltap/mysql/SQLQuery.scala sqltap-0.7.21/src/com/paulasmuth/sqltap/mysql/SQLQuery.scala +--- sqltap/src/com/paulasmuth/sqltap/mysql/SQLQuery.scala 2014-02-12 16:13:05.013389526 +0100 ++++ sqltap-0.7.21/src/com/paulasmuth/sqltap/mysql/SQLQuery.scala 2014-09-04 14:31:38.000000000 +0200 +@@ -1,10 +1,9 @@ + // This file is part of the "SQLTap" project +-// (c) 2011-2013 Paul Asmuth ++// (c) 2014 Paul Asmuth, Google Inc. + // + // Licensed under the MIT License (the "License"); you may not use this + // file except in compliance with the License. You may obtain a copy of + // the License at: http://opensource.org/licenses/MIT +- + package com.paulasmuth.sqltap.mysql + + import com.paulasmuth.sqltap._ +@@ -44,8 +43,14 @@ + tok = System.nanoTime + qtime = tok - tik + +- Statistics.incr('sql_request_time_mean, qtime / 1000000.0) +- Logger.debug("Finished (" + (qtime / 1000000.0) + "ms): " + query) ++ val runtime_millis = qtime / 1000000.0 ++ Statistics.incr('sql_request_time_mean, runtime_millis) ++ Logger.debug("Finished (" + runtime_millis + "ms): " + query) ++ ++ if (Config.has_key('log_slow_queries) && ++ runtime_millis >= Config.get('log_slow_queries).toInt) { ++ Logger.log("[SQL] [Slow Query] (" + runtime_millis + "ms): " + query) ++ } + } + + def error(err: Throwable) : Unit = { +diff -r -u sqltap/src/com/paulasmuth/sqltap/SQLTap.scala sqltap-0.7.21/src/com/paulasmuth/sqltap/SQLTap.scala +--- sqltap/src/com/paulasmuth/sqltap/SQLTap.scala 2014-02-25 17:15:47.049982142 +0100 ++++ sqltap-0.7.21/src/com/paulasmuth/sqltap/SQLTap.scala 2014-09-04 14:35:51.000000000 +0200 +@@ -72,6 +71,9 @@ + else if (args(n) == "--disable-keepalive") + { Config.set('http_keepalive, "false"); n += 1 } + ++ else if (args(n) == "--log-slow-queries") ++ { Config.set('log_slow_queries, args(n+1)); n += 2 } ++ + else if ((args(n) == "-t") || (args(n) == "--threads")) + { Config.set('threads, args(n+1)); n += 2 } + +@@ -138,6 +139,7 @@ + println(" --memcache-queuelen max mysql queue size per worker ") + println(" --memcache-numconns max number of mysql connections per worker ") + println(" --memcache-mode replication mode (copy, shard) ") ++ println(" --log-slow-queries log all queries with a runtime > val in ms ") + println(" -h, --help you're reading it... ") + println(" -d, --debug debug mode ") + } diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..9685838 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,3 @@ +resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) + +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") diff --git a/src/com/paulasmuth/sqltap/MemcacheConnection.scala b/src/com/paulasmuth/sqltap/MemcacheConnection.scala deleted file mode 100644 index cfe2dd5..0000000 --- a/src/com/paulasmuth/sqltap/MemcacheConnection.scala +++ /dev/null @@ -1,324 +0,0 @@ -// This file is part of the "SQLTap" project -// (c) 2011-2013 Paul Asmuth -// -// Licensed under the MIT License (the "License"); you may not use this -// file except in compliance with the License. You may obtain a copy of -// the License at: http://opensource.org/licenses/MIT - -package com.paulasmuth.sqltap - -import scala.collection.mutable.{ListBuffer} -import java.nio.channels.{SocketChannel,SelectionKey} -import java.nio.{ByteBuffer,ByteOrder} -import java.net.{InetSocketAddress,ConnectException} - -class MemcacheConnection(pool: MemcacheConnectionPool) extends TimeoutCallback { - - var hostname : String = "127.0.0.1" - var port : Int = 11211 - - private val MC_STATE_INIT = 0 - private val MC_STATE_CONN = 1 - private val MC_STATE_IDLE = 2 - private val MC_STATE_CMD_DELETE = 4 - private val MC_STATE_CMD_SET = 5 - private val MC_STATE_CMD_MGET = 6 - private val MC_STATE_READ = 7 - private val MC_STATE_CLOSE = 8 - - private val MC_WRITE_BUF_LEN = 65535 - private val MC_READ_BUF_LEN = (65535 * 8) - - private var state = MC_STATE_INIT - private var last_event : SelectionKey = null - private val read_buf = ByteBuffer.allocate(MC_READ_BUF_LEN) - private val write_buf = ByteBuffer.allocate(MC_WRITE_BUF_LEN) - write_buf.order(ByteOrder.LITTLE_ENDIAN) - - private val sock = SocketChannel.open() - sock.configureBlocking(false) - - private var timer = TimeoutScheduler.schedule(1000, this) - private var requests : List[CacheRequest] = null - private var cur_buf : ElasticBuffer = null - private var cur_len = 0 - - def connect() : Unit = { - Statistics.incr('memcache_connections_open) - - val addr = new InetSocketAddress(hostname, port) - sock.connect(addr) - state = MC_STATE_CONN - - timer.start() - - sock - .register(pool.loop, SelectionKey.OP_CONNECT) - .attach(this) - } - - def execute_mget(keys: List[String], _requests: List[CacheRequest]) : Unit = { - requests = _requests - - if (state != MC_STATE_IDLE) - throw new ExecutionException("memcache connection busy") - - timer.start() - - write_buf.clear - write_buf.put("get".getBytes) - - for (key <- keys) { - write_buf.put(32.toByte) - write_buf.put(key.getBytes("UTF-8")) - } - - write_buf.put(13.toByte) - write_buf.put(10.toByte) - write_buf.flip - - state = MC_STATE_CMD_MGET - last_event.interestOps(SelectionKey.OP_WRITE) - } - - def execute_set(key: String, request: CacheStoreRequest) : Unit = { - val buf = request.buffer.buffer - val len = buf.position - - request.ready() - buf.position(0) - buf.limit(len) - - if (state != MC_STATE_IDLE) - throw new ExecutionException("memcache connection busy") - - timer.start() - - write_buf.clear - write_buf.put("set".getBytes) - write_buf.put(32.toByte) - write_buf.put(key.getBytes("UTF-8")) - write_buf.put(32.toByte) - write_buf.put(48.toByte) - write_buf.put(32.toByte) - write_buf.put(48.toByte) - write_buf.put(32.toByte) - write_buf.put(len.toString.getBytes("UTF-8")) - write_buf.put(13.toByte) - write_buf.put(10.toByte) - write_buf.put(buf) - write_buf.put(13.toByte) - write_buf.put(10.toByte) - write_buf.flip - - state = MC_STATE_CMD_SET - last_event.interestOps(SelectionKey.OP_WRITE) - } - - def execute_delete(key: String) : Unit = { - if (state != MC_STATE_IDLE) - throw new ExecutionException("memcache connection busy") - - timer.start() - - write_buf.clear - write_buf.put("delete".getBytes) - write_buf.put(32.toByte) - write_buf.put(key.getBytes("UTF-8")) - write_buf.put(13.toByte) - write_buf.put(10.toByte) - write_buf.flip - - state = MC_STATE_CMD_DELETE - last_event.interestOps(SelectionKey.OP_WRITE) - } - - - def ready(event: SelectionKey) : Unit = { - try { - sock.finishConnect - } catch { - case e: ConnectException => { - Logger.error("[Memcache] connection failed: " + e.toString, false) - return close(e) - } - } - - idle(event) - } - - /** - * Read algo: - * - fill read buffer until at least one response is ready - * - read all full responses from the read buffer - * - copy the remainder of the read buffer to the beginning of the read buffer - */ - def read(event: SelectionKey) : Unit = { - val chunk = sock.read(read_buf) - - if (chunk <= 0) { - Logger.error("[Memcache] read end of file ", false) - close(new ExecutionException("memcache connection closed")) - return - } - - var cur = 0 - var pos = 0 - - while (cur < read_buf.position) { - if (state == MC_STATE_READ) { - cur = math.min(read_buf.position, pos + cur_len) - - cur_len -= cur - pos - cur_buf.write(read_buf.array, pos, cur - pos) - - if (cur_len == 0) { - cur += 2 - cur_buf.buffer.flip() - state = MC_STATE_CMD_MGET - } - - pos = cur - } else { - if (read_buf.get(cur) == 10) { - next(new String(read_buf.array, pos, cur - 1 - pos, "UTF-8")) - pos = cur + 1 - } - - cur += 1 - } - } - - if (cur < read_buf.position) { - println("READ REMAINING") - read_buf.limit(read_buf.position) - read_buf.position(cur) - read_buf.compact() - } else { - read_buf.clear() - } - } - - def write(event: SelectionKey) : Unit = { - try { - sock.write(write_buf) - } catch { - case e: Exception => { - Logger.error("[Memcache] conn error: " + e.toString, false) - return close(e) - } - } - - if (write_buf.remaining == 0) { - write_buf.clear - event.interestOps(SelectionKey.OP_READ) - } - } - - def close(err: Throwable = null) : Unit = { - if (state == MC_STATE_CLOSE) - return - - try { - if (requests != null) { - for (req <- requests) { - req.ready() - } - } - } catch { - case e: Exception => { - Logger.exception(e, false) - } - } - - state = MC_STATE_CLOSE - - pool.close(this) - sock.close() - Statistics.decr('sql_connections_open) - } - - def timeout() : Unit = { - Logger.error("[Memcache] connection timed out...", false) - close() - } - - private def next(cmd: String) : Unit = { - state match { - - case MC_STATE_CMD_DELETE => { - cmd match { - - case "DELETED" => { - idle(last_event) - } - - case "NOT_FOUND" => { - idle(last_event) - } - - } - } - - case MC_STATE_CMD_SET => { - cmd match { - - case "STORED" => { - idle(last_event) - } - - case "NOT_STORED" => { - idle(last_event) - } - - } - } - - case MC_STATE_CMD_MGET => { - val parts = cmd.split(" ") - - if (parts.length == 1 && parts.head == "END") { - for (req <- requests) { - req.ready() - } - - return idle(last_event) - } - - if (parts.length != 4) { - throw new ExecutionException("[Memcache] protocol error: " + cmd) - } - - for (req <- requests) { - if (req.buffer == null && req.key == parts(1)) { - val buf = new ElasticBuffer(65535 * 8) - req.buffer = buf - - cur_buf = buf - cur_len = parts(3).toInt - - state = MC_STATE_READ - return - } - } - } - - case _ => { - throw new ExecutionException( - "unexpected token " + cmd + " (" + state.toString + ")") - } - - } - } - - private def idle(event: SelectionKey) : Unit = { - timer.cancel() - state = MC_STATE_IDLE - event.interestOps(0) - last_event = event - requests = null - pool.ready(this) - } - - -} diff --git a/src/com/paulasmuth/sqltap/StubCache.scala b/src/com/paulasmuth/sqltap/StubCache.scala deleted file mode 100644 index 7fdb2cc..0000000 --- a/src/com/paulasmuth/sqltap/StubCache.scala +++ /dev/null @@ -1,45 +0,0 @@ -// This file is part of the "SQLTap" project -// (c) 2011-2013 Paul Asmuth -// -// Licensed under the MIT License (the "License"); you may not use this -// file except in compliance with the License. You may obtain a copy of -// the License at: http://opensource.org/licenses/MIT - -package com.paulasmuth.sqltap - -import scala.collection.mutable.{HashMap} - -// STUB! -class StubCache extends CacheBackend { - - val stubcache = new HashMap[String,ElasticBuffer]() - - def connect() : Unit = () - - def execute(requests: List[CacheRequest]) = { - for (req <- requests) { - req match { - case get: CacheGetRequest => { - Logger.debug("[CACHE] retrieve: " + req.key) - stubcache.get(req.key) match { - case Some(buf: ElasticBuffer) => { - get.buffer = buf.clone() - } - case None => () - } - } - case set: CacheStoreRequest => { - Logger.debug("[CACHE] store: " + req.key) - stubcache.put(req.key, set.buffer) - } - case purge: CachePurgeRequest => { - Logger.debug("[CACHE] purge: " + req.key) - stubcache.remove(req.key) - } - } - - req.ready() - } - } - -} diff --git a/src/com/paulasmuth/sqltap/AbstractWrappedBuffer.scala b/src/main/scala/com/paulasmuth/sqltap/AbstractWrappedBuffer.scala similarity index 100% rename from src/com/paulasmuth/sqltap/AbstractWrappedBuffer.scala rename to src/main/scala/com/paulasmuth/sqltap/AbstractWrappedBuffer.scala diff --git a/src/com/paulasmuth/sqltap/CTree.scala b/src/main/scala/com/paulasmuth/sqltap/CTree.scala similarity index 98% rename from src/com/paulasmuth/sqltap/CTree.scala rename to src/main/scala/com/paulasmuth/sqltap/CTree.scala index 2d2f941..8d4a626 100644 --- a/src/com/paulasmuth/sqltap/CTree.scala +++ b/src/main/scala/com/paulasmuth/sqltap/CTree.scala @@ -12,6 +12,7 @@ class CTree(doc: xml.Node) { val name : String = elem.attr("name", true) val query : String = elem.attr("query", true) + val expire : Int = elem.attr("expire", false, "300").toInt val allow_conditions : Boolean = elem.attr("allow_conditions", false, "true").equals("true") diff --git a/src/com/paulasmuth/sqltap/CTreeBuffer.scala b/src/main/scala/com/paulasmuth/sqltap/CTreeBuffer.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CTreeBuffer.scala rename to src/main/scala/com/paulasmuth/sqltap/CTreeBuffer.scala diff --git a/src/com/paulasmuth/sqltap/CTreeCache.scala b/src/main/scala/com/paulasmuth/sqltap/CTreeCache.scala similarity index 93% rename from src/com/paulasmuth/sqltap/CTreeCache.scala rename to src/main/scala/com/paulasmuth/sqltap/CTreeCache.scala index 4287492..bf98e3d 100644 --- a/src/com/paulasmuth/sqltap/CTreeCache.scala +++ b/src/main/scala/com/paulasmuth/sqltap/CTreeCache.scala @@ -17,7 +17,7 @@ object CTreeCache { CTreeMarshal.serialize(ctree_buf, ctree.stack.head, ins) - val request = new CacheStoreRequest(key, buf) + val request = new CacheStoreRequest(key, buf, ctree.expire) request.worker = worker worker.cache.enqueue(request) @@ -46,7 +46,7 @@ object CTreeCache { * @param record_id the primary id of the resource/record to be expired * @param resource_name the name of the resource to be expired */ - def expire(worker: Worker, resource_name: String, record_id: Int) : Unit = { + def expire(worker: Worker, resource_name: String, record_id: Long) : Unit = { if (!Manifest.has_resource(resource_name)) throw new ParseException("unknown resource: " + resource_name) diff --git a/src/com/paulasmuth/sqltap/CTreeIndex.scala b/src/main/scala/com/paulasmuth/sqltap/CTreeIndex.scala similarity index 82% rename from src/com/paulasmuth/sqltap/CTreeIndex.scala rename to src/main/scala/com/paulasmuth/sqltap/CTreeIndex.scala index e528950..1017313 100644 --- a/src/com/paulasmuth/sqltap/CTreeIndex.scala +++ b/src/main/scala/com/paulasmuth/sqltap/CTreeIndex.scala @@ -36,11 +36,12 @@ object CTreeIndex { score += ctree.base_score Logger.debug("CTree: evaluating candidate: '" + ctree.name + - "' (score: " + score + ", cost: " + cost + ") for: " + root.resource_name) + "' (score: " + score + ", cost: " + cost + ") for: " + + root.resource_name) - var matches = (cost == 0 && winner_cost > 0) - matches ||= (score > top_score && winner_cost != 0) - matches ||= (score == top_score && cost > winner_cost) + val matches = (cost == 0 && winner_cost > 0) || + (score > top_score && winner_cost != 0) || + (score == top_score && cost > winner_cost) if (matches) { winner = ctree diff --git a/src/com/paulasmuth/sqltap/CTreeInstruction.scala b/src/main/scala/com/paulasmuth/sqltap/CTreeInstruction.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CTreeInstruction.scala rename to src/main/scala/com/paulasmuth/sqltap/CTreeInstruction.scala diff --git a/src/com/paulasmuth/sqltap/CTreeMarshal.scala b/src/main/scala/com/paulasmuth/sqltap/CTreeMarshal.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CTreeMarshal.scala rename to src/main/scala/com/paulasmuth/sqltap/CTreeMarshal.scala diff --git a/src/com/paulasmuth/sqltap/CacheAdapter.scala b/src/main/scala/com/paulasmuth/sqltap/CacheAdapter.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CacheAdapter.scala rename to src/main/scala/com/paulasmuth/sqltap/CacheAdapter.scala diff --git a/src/com/paulasmuth/sqltap/CacheBackend.scala b/src/main/scala/com/paulasmuth/sqltap/CacheBackend.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CacheBackend.scala rename to src/main/scala/com/paulasmuth/sqltap/CacheBackend.scala diff --git a/src/com/paulasmuth/sqltap/CacheBackendFactory.scala b/src/main/scala/com/paulasmuth/sqltap/CacheBackendFactory.scala similarity index 72% rename from src/com/paulasmuth/sqltap/CacheBackendFactory.scala rename to src/main/scala/com/paulasmuth/sqltap/CacheBackendFactory.scala index ec883b6..c5fadc0 100644 --- a/src/com/paulasmuth/sqltap/CacheBackendFactory.scala +++ b/src/main/scala/com/paulasmuth/sqltap/CacheBackendFactory.scala @@ -8,25 +8,16 @@ package com.paulasmuth.sqltap object CacheBackendFactory { - def get(worker: Worker) : CacheBackend = { val name = Config.get('cache_backend) val backend = name match { - - case "memcache" => - new MemcacheConnectionPool() - - case "noop" => - new NoopCacheBackend() - - case _ => - throw new ParseException("unknown cache backend: " + name) - + case "memcache" => new MemcacheConnectionPool() + case "noop" => new NoopCacheBackend() + case _ => throw new ParseException("unknown cache backend: " + name) } backend.loop = worker.loop backend } - } diff --git a/src/com/paulasmuth/sqltap/CacheGetRequest.scala b/src/main/scala/com/paulasmuth/sqltap/CacheGetRequest.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CacheGetRequest.scala rename to src/main/scala/com/paulasmuth/sqltap/CacheGetRequest.scala diff --git a/src/com/paulasmuth/sqltap/CachePurgeRequest.scala b/src/main/scala/com/paulasmuth/sqltap/CachePurgeRequest.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CachePurgeRequest.scala rename to src/main/scala/com/paulasmuth/sqltap/CachePurgeRequest.scala diff --git a/src/com/paulasmuth/sqltap/CacheRequest.scala b/src/main/scala/com/paulasmuth/sqltap/CacheRequest.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CacheRequest.scala rename to src/main/scala/com/paulasmuth/sqltap/CacheRequest.scala diff --git a/src/com/paulasmuth/sqltap/CacheStoreRequest.scala b/src/main/scala/com/paulasmuth/sqltap/CacheStoreRequest.scala similarity index 84% rename from src/com/paulasmuth/sqltap/CacheStoreRequest.scala rename to src/main/scala/com/paulasmuth/sqltap/CacheStoreRequest.scala index e39c009..578300c 100644 --- a/src/com/paulasmuth/sqltap/CacheStoreRequest.scala +++ b/src/main/scala/com/paulasmuth/sqltap/CacheStoreRequest.scala @@ -9,7 +9,7 @@ package com.paulasmuth.sqltap import scala.collection.mutable.{ListBuffer} -class CacheStoreRequest(_key: String, _buf: ElasticBuffer) extends CacheRequest { +class CacheStoreRequest(_key: String, _buf: ElasticBuffer, val expire: Int) extends CacheRequest { val key : String = _key buffer = _buf diff --git a/src/com/paulasmuth/sqltap/Config.scala b/src/main/scala/com/paulasmuth/sqltap/Config.scala similarity index 91% rename from src/com/paulasmuth/sqltap/Config.scala rename to src/main/scala/com/paulasmuth/sqltap/Config.scala index 94bab39..bb5f093 100644 --- a/src/com/paulasmuth/sqltap/Config.scala +++ b/src/main/scala/com/paulasmuth/sqltap/Config.scala @@ -24,7 +24,10 @@ object Config { 'memcache_mode -> "copy", 'memcache_queue_max_len -> "4096", 'memcache_max_connections -> "10", + 'memcache_host -> "127.0.0.1", + 'memcache_port -> "11211", 'threads -> "4", + 'log_queries -> "false", 'expiration_handler -> "purge", 'cache_backend -> "memcache" ) diff --git a/src/com/paulasmuth/sqltap/CountInstruction.scala b/src/main/scala/com/paulasmuth/sqltap/CountInstruction.scala similarity index 100% rename from src/com/paulasmuth/sqltap/CountInstruction.scala rename to src/main/scala/com/paulasmuth/sqltap/CountInstruction.scala diff --git a/src/com/paulasmuth/sqltap/DeltaStatistic.scala b/src/main/scala/com/paulasmuth/sqltap/DeltaStatistic.scala similarity index 75% rename from src/com/paulasmuth/sqltap/DeltaStatistic.scala rename to src/main/scala/com/paulasmuth/sqltap/DeltaStatistic.scala index 8a8b45b..c1b4da9 100644 --- a/src/com/paulasmuth/sqltap/DeltaStatistic.scala +++ b/src/main/scala/com/paulasmuth/sqltap/DeltaStatistic.scala @@ -11,25 +11,23 @@ import java.util.concurrent.atomic.{AtomicInteger} import java.text.{DecimalFormat} class DeltaStatistic extends Statistic { - - private val bucket = new AtomicInteger() - private var value : Double = 0.0 + private val current = new AtomicInteger() + private var last : Double = 0.0 private val format = new DecimalFormat("0.00") def incr(delta: Double) : Unit= { - bucket.getAndAdd(delta.toInt) + current.getAndAdd(delta.toInt) } def decr(delta: Double) : Unit = { - bucket.getAndAdd(delta.toInt * -1) + current.getAndAdd(delta.toInt * -1) } def get() : String = { - format.format(value) + format.format(last) } def flush(f: Double) : Unit = { - value = bucket.getAndSet(0) / f + last = current.getAndSet(0) / f } - } diff --git a/src/com/paulasmuth/sqltap/ElasticBuffer.scala b/src/main/scala/com/paulasmuth/sqltap/ElasticBuffer.scala similarity index 100% rename from src/com/paulasmuth/sqltap/ElasticBuffer.scala rename to src/main/scala/com/paulasmuth/sqltap/ElasticBuffer.scala diff --git a/src/com/paulasmuth/sqltap/Exceptions.scala b/src/main/scala/com/paulasmuth/sqltap/Exceptions.scala similarity index 81% rename from src/com/paulasmuth/sqltap/Exceptions.scala rename to src/main/scala/com/paulasmuth/sqltap/Exceptions.scala index 36313df..3480876 100644 --- a/src/com/paulasmuth/sqltap/Exceptions.scala +++ b/src/main/scala/com/paulasmuth/sqltap/Exceptions.scala @@ -28,7 +28,7 @@ class NotFoundException(cur: Instruction = null) extends Exception { if (cur == null) "not found" else - "could not find record '" + - (if (cur.relation == null) "null" else cur.relation.name) + - (if (cur.record.has_id) "' with id #" + cur.record.id.toString else "") + "could not find record " + + (if (cur.relation == null) "null" else "'" + cur.relation.name + "'") + + (if (cur.record.has_id) " with id #" + cur.record.id.toString else "") } diff --git a/src/com/paulasmuth/sqltap/ExpirationHandler.scala b/src/main/scala/com/paulasmuth/sqltap/ExpirationHandler.scala similarity index 100% rename from src/com/paulasmuth/sqltap/ExpirationHandler.scala rename to src/main/scala/com/paulasmuth/sqltap/ExpirationHandler.scala diff --git a/src/com/paulasmuth/sqltap/ExpirationHandlerFactory.scala b/src/main/scala/com/paulasmuth/sqltap/ExpirationHandlerFactory.scala similarity index 100% rename from src/com/paulasmuth/sqltap/ExpirationHandlerFactory.scala rename to src/main/scala/com/paulasmuth/sqltap/ExpirationHandlerFactory.scala diff --git a/src/com/paulasmuth/sqltap/ExpirationJob.scala b/src/main/scala/com/paulasmuth/sqltap/ExpirationJob.scala similarity index 97% rename from src/com/paulasmuth/sqltap/ExpirationJob.scala rename to src/main/scala/com/paulasmuth/sqltap/ExpirationJob.scala index 1d8f04b..58716c2 100644 --- a/src/com/paulasmuth/sqltap/ExpirationJob.scala +++ b/src/main/scala/com/paulasmuth/sqltap/ExpirationJob.scala @@ -34,7 +34,7 @@ class ExpirationJob(worker: Worker, ctree: CTree) extends ReadyCallback[Record] * * @param record the primary record id */ - def execute(record_id: Int) : Unit = { + def execute(record_id: Long) : Unit = { val primary_id = ctree.resource.id_field Logger.debug( @@ -45,7 +45,7 @@ class ExpirationJob(worker: Worker, ctree: CTree) extends ReadyCallback[Record] if (tuple._1 == primary_id) { val key = ctree.key(tuple._1, record_id.toString, tuple._2) handler.execute(worker, key) - keys = keys - tuple + keys = keys.filter(_ != tuple) } } } diff --git a/src/com/paulasmuth/sqltap/FindMultiInstruction.scala b/src/main/scala/com/paulasmuth/sqltap/FindMultiInstruction.scala similarity index 98% rename from src/com/paulasmuth/sqltap/FindMultiInstruction.scala rename to src/main/scala/com/paulasmuth/sqltap/FindMultiInstruction.scala index 9d7a62c..f597796 100644 --- a/src/com/paulasmuth/sqltap/FindMultiInstruction.scala +++ b/src/main/scala/com/paulasmuth/sqltap/FindMultiInstruction.scala @@ -29,7 +29,7 @@ class FindMultiInstruction extends SQLInstruction with CTreeInstruction { var offset : String = null var expanded : Boolean = false - var join_id : Int = 0 + var join_id : Long = 0 var join_conditions : String = null def execute(_worker: Worker) : Unit = { @@ -75,7 +75,7 @@ class FindMultiInstruction extends SQLInstruction with CTreeInstruction { else if (relation.join_field_local != null && prev.is_finished) { state = INS_STATE_READY - join_id = prev.record.get(relation.join_field_local).toInt + join_id = prev.record.get(relation.join_field_local).toLong } } @@ -186,7 +186,7 @@ class FindMultiInstruction extends SQLInstruction with CTreeInstruction { var found = false val this_id = row( - query.columns.indexOf(relation.resource.id_field)).toInt + query.columns.indexOf(relation.resource.id_field)).toLong while (!found && n > 0) { val this_ins = next(n - 1) diff --git a/src/com/paulasmuth/sqltap/FindSingleInstruction.scala b/src/main/scala/com/paulasmuth/sqltap/FindSingleInstruction.scala similarity index 98% rename from src/com/paulasmuth/sqltap/FindSingleInstruction.scala rename to src/main/scala/com/paulasmuth/sqltap/FindSingleInstruction.scala index e53ca26..a5765f5 100644 --- a/src/com/paulasmuth/sqltap/FindSingleInstruction.scala +++ b/src/main/scala/com/paulasmuth/sqltap/FindSingleInstruction.scala @@ -26,7 +26,7 @@ class FindSingleInstruction extends SQLInstruction with CTreeInstruction { var order : String = null var join_field : String = null - var join_id : Int = 0 + var join_id : Long = 0 var allow_empty : Boolean = false def execute(_worker: Worker) : Unit = { @@ -66,7 +66,7 @@ class FindSingleInstruction extends SQLInstruction with CTreeInstruction { } state = INS_STATE_READY - join_id = join_id_str.toInt + join_id = join_id_str.toLong if (join_id == 0) { state = INS_STATE_DONE diff --git a/src/com/paulasmuth/sqltap/GZIPTranscoder.scala b/src/main/scala/com/paulasmuth/sqltap/GZIPTranscoder.scala similarity index 97% rename from src/com/paulasmuth/sqltap/GZIPTranscoder.scala rename to src/main/scala/com/paulasmuth/sqltap/GZIPTranscoder.scala index 0be3aa3..50a56a9 100644 --- a/src/com/paulasmuth/sqltap/GZIPTranscoder.scala +++ b/src/main/scala/com/paulasmuth/sqltap/GZIPTranscoder.scala @@ -28,7 +28,7 @@ class GZIPTranscoder(buffer: ElasticBuffer) { buf.put(target.toByteArray()) } - def decode() : Unit = try { + def decode() : Unit = { val buf = buffer.retrieve() val source = new ByteArrayInputStream(buf.array.clone()) val gzip = new GZIPInputStream(source) @@ -50,7 +50,6 @@ class GZIPTranscoder(buffer: ElasticBuffer) { gzip.close source.close } - } diff --git a/src/com/paulasmuth/sqltap/HTTPConnection.scala b/src/main/scala/com/paulasmuth/sqltap/HTTPConnection.scala similarity index 92% rename from src/com/paulasmuth/sqltap/HTTPConnection.scala rename to src/main/scala/com/paulasmuth/sqltap/HTTPConnection.scala index b146876..cdb282c 100644 --- a/src/com/paulasmuth/sqltap/HTTPConnection.scala +++ b/src/main/scala/com/paulasmuth/sqltap/HTTPConnection.scala @@ -209,6 +209,16 @@ class HTTPConnection(sock: SocketChannel, worker: Worker) extends ReadyCallback[ else if (route.length > 1 && route.head == "expire") execute_expire(route.tail) + else if (route.length == 1 && route.head == "log_queries") { + Config.set('log_queries, "true") + execute_text(200, "sqltap query logging enabled\r\n") + } + + else if (route.length == 1 && route.head == "no_log_queries") { + Config.set('log_queries, "false") + execute_text(200, "sqltap query logging disabled\r\n") + } + else http_error(404, "not found") @@ -216,22 +226,28 @@ class HTTPConnection(sock: SocketChannel, worker: Worker) extends ReadyCallback[ Statistics.incr('http_requests_per_second) } - private def execute_ping() : Unit = { + private def execute_text(code: Integer, text: String) : Unit = { val http_buf = new HTTPWriter(buf) buf.clear - http_buf.write_status(200) - http_buf.write_content_length(6) + val body = text.getBytes + + http_buf.write_status(code) + http_buf.write_content_length(body.length) http_buf.write_default_headers() http_buf.finish_headers() - buf.put("pong\r\n".getBytes) + buf.put(body) buf.flip worker.requests_success.incrementAndGet() flush() } + private def execute_ping() : Unit = { + execute_text(200, "pong\r\n") + } + private def execute_request(params: List[String]) : Unit = { if (seq > 1) worker.requests_queued.incrementAndGet() diff --git a/src/com/paulasmuth/sqltap/HTTPParser.scala b/src/main/scala/com/paulasmuth/sqltap/HTTPParser.scala similarity index 100% rename from src/com/paulasmuth/sqltap/HTTPParser.scala rename to src/main/scala/com/paulasmuth/sqltap/HTTPParser.scala diff --git a/src/com/paulasmuth/sqltap/HTTPWriter.scala b/src/main/scala/com/paulasmuth/sqltap/HTTPWriter.scala similarity index 100% rename from src/com/paulasmuth/sqltap/HTTPWriter.scala rename to src/main/scala/com/paulasmuth/sqltap/HTTPWriter.scala diff --git a/src/com/paulasmuth/sqltap/Instruction.scala b/src/main/scala/com/paulasmuth/sqltap/Instruction.scala similarity index 100% rename from src/com/paulasmuth/sqltap/Instruction.scala rename to src/main/scala/com/paulasmuth/sqltap/Instruction.scala diff --git a/src/com/paulasmuth/sqltap/InstructionFactory.scala b/src/main/scala/com/paulasmuth/sqltap/InstructionFactory.scala similarity index 100% rename from src/com/paulasmuth/sqltap/InstructionFactory.scala rename to src/main/scala/com/paulasmuth/sqltap/InstructionFactory.scala diff --git a/src/com/paulasmuth/sqltap/InstructionStack.scala b/src/main/scala/com/paulasmuth/sqltap/InstructionStack.scala similarity index 100% rename from src/com/paulasmuth/sqltap/InstructionStack.scala rename to src/main/scala/com/paulasmuth/sqltap/InstructionStack.scala diff --git a/src/com/paulasmuth/sqltap/IntegralStatistic.scala b/src/main/scala/com/paulasmuth/sqltap/IntegralStatistic.scala similarity index 100% rename from src/com/paulasmuth/sqltap/IntegralStatistic.scala rename to src/main/scala/com/paulasmuth/sqltap/IntegralStatistic.scala diff --git a/src/com/paulasmuth/sqltap/JSONWriter.scala b/src/main/scala/com/paulasmuth/sqltap/JSONWriter.scala similarity index 90% rename from src/com/paulasmuth/sqltap/JSONWriter.scala rename to src/main/scala/com/paulasmuth/sqltap/JSONWriter.scala index 94b4502..576ce3b 100644 --- a/src/com/paulasmuth/sqltap/JSONWriter.scala +++ b/src/main/scala/com/paulasmuth/sqltap/JSONWriter.scala @@ -81,10 +81,15 @@ class JSONWriter(buf: WrappedBuffer) { val b = byte & 0x000000ff if (b == 0xA) { - buf.write(Array(0x5C.toByte, 0x6E.toByte)) + buf.write(Array(0x5C.toByte, 0x6E.toByte)) // \n } else if (b == 0x22) { - buf.write(Array(0x5C.toByte, 0x22.toByte)) - } else if ((b == 0) || ((b >= 0x20) && (b != 0x5C))) { + buf.write(Array(0x5C.toByte, 0x22.toByte)) // \" + } else if (b == 0x5C) { + buf.write(Array(0x5C.toByte, 0x5C.toByte)) // \\ + } else if (b < 0x20) { + buf.write(Array(0x5C.toByte, 0x75.toByte)) + buf.write("%04x".format(b).getBytes) // \u000b + } else { buf.write(byte) } } diff --git a/src/com/paulasmuth/sqltap/Logger.scala b/src/main/scala/com/paulasmuth/sqltap/Logger.scala similarity index 82% rename from src/com/paulasmuth/sqltap/Logger.scala rename to src/main/scala/com/paulasmuth/sqltap/Logger.scala index 0cf4469..6b003a6 100644 --- a/src/com/paulasmuth/sqltap/Logger.scala +++ b/src/main/scala/com/paulasmuth/sqltap/Logger.scala @@ -19,6 +19,11 @@ object Logger { println("[" + df.format(new Date()) + "] " + msg) } + def fatal(msg: String) : Unit = { + log("[FATAL] " + msg) + System.exit(1) + } + def error(msg: String, fatal: Boolean) : Unit = { log("[ERROR] " + msg) @@ -26,6 +31,14 @@ object Logger { System.exit(1) } + def notice(msg: String) : Unit = { + log("[NOTICE] " + msg) + } + + def info(msg: String) : Unit = { + log("[INFO] " + msg) + } + def debug(msg: String) : Unit = { if (Config.debug) log("[DEBUG] " + msg) diff --git a/src/com/paulasmuth/sqltap/Manifest.scala b/src/main/scala/com/paulasmuth/sqltap/Manifest.scala similarity index 100% rename from src/com/paulasmuth/sqltap/Manifest.scala rename to src/main/scala/com/paulasmuth/sqltap/Manifest.scala diff --git a/src/com/paulasmuth/sqltap/MeanStatistic.scala b/src/main/scala/com/paulasmuth/sqltap/MeanStatistic.scala similarity index 100% rename from src/com/paulasmuth/sqltap/MeanStatistic.scala rename to src/main/scala/com/paulasmuth/sqltap/MeanStatistic.scala diff --git a/src/main/scala/com/paulasmuth/sqltap/MemcacheConnection.scala b/src/main/scala/com/paulasmuth/sqltap/MemcacheConnection.scala new file mode 100644 index 0000000..c43829b --- /dev/null +++ b/src/main/scala/com/paulasmuth/sqltap/MemcacheConnection.scala @@ -0,0 +1,372 @@ +// This file is part of the "SQLTap" project +// (c) 2011-2013 Paul Asmuth +// +// Licensed under the MIT License (the "License"); you may not use this +// file except in compliance with the License. You may obtain a copy of +// the License at: http://opensource.org/licenses/MIT + +package com.paulasmuth.sqltap + +import scala.collection.mutable.{ListBuffer} +import java.nio.channels.{SocketChannel,SelectionKey} +import java.nio.{ByteBuffer,ByteOrder} +import java.net.{InetSocketAddress,ConnectException} + +/** + * @param pool pool that owns this connection + * @param hostname memcached-server's hostname + * @param port memcached-server's port number + */ +class MemcacheConnection(pool: MemcacheConnectionPool, hostname : String, port : Int) extends TimeoutCallback { + private val CR = 13.toByte + private val LF = 10.toByte + private val SP = 32.toByte + + // states: + // Uninitialized, Connecting, Idle, Cmd{Delete,Set,MGet}, Reading, Closed. + private val MC_STATE_INIT = 0 // just created, no connect() invoked yet (idle) + private val MC_STATE_CONN = 1 // just connected (idle) + private val MC_STATE_IDLE = 2 // idle (after a command) + private val MC_STATE_CMD_DELETE = 4 // + private val MC_STATE_CMD_SET = 5 // + private val MC_STATE_CMD_MGET = 6 // executing a multi-key GET + private val MC_STATE_READ = 7 // reading an execute_mget value chunk + private val MC_STATE_CLOSE = 8 // connection closed + + private val MC_WRITE_BUF_LEN = (65535 * 3) + private val MC_READ_BUF_LEN = (MC_WRITE_BUF_LEN * 8) + + private var state = MC_STATE_INIT + private var last_event : SelectionKey = null + private val read_buf = ByteBuffer.allocate(MC_READ_BUF_LEN) + private val write_buf = ByteBuffer.allocate(MC_WRITE_BUF_LEN) + write_buf.order(ByteOrder.LITTLE_ENDIAN) + + private val sock = SocketChannel.open() + sock.configureBlocking(false) + + private var timer = TimeoutScheduler.schedule(1000, this) + + private var requests : List[CacheRequest] = null + + // buffer for the current value to be received + private var cur_buf : ElasticBuffer = null + + // length in bytes of the currently received value still to be processed. + private var cur_len = 0 + + /** + * @brief Asynchronously establishes connection to the Memcached server. + * + * @see ready(event: SelectionKey) + */ + def connect() : Unit = { + Statistics.incr('memcache_connections_open) + + val addr = new InetSocketAddress(hostname, port) + sock.connect(addr) + state = MC_STATE_CONN + + timer.start() + + last_event = sock.register(pool.loop, SelectionKey.OP_CONNECT) + last_event.attach(this) + } + + /** + * @brief Callback, invoked upon non-blocking connect() completion. + * + * Moves MemcacheConnection state from MC_STATE_CONN into MC_STATE_IDLE state. + */ + def ready(event: SelectionKey) : Unit = { + try { + sock.finishConnect + } catch { + case e: ConnectException => { + Logger.error("[Memcache] connection failed: " + e.toString, false) + return close(e) + } + } + + idle() + } + + /** + * @brief Puts connection into ready state and then back into the idle pool. + */ + private def idle() : Unit = { + timer.cancel() + state = MC_STATE_IDLE + last_event.interestOps(0) + requests = null + pool.ready(this) + } + + /** + * @brief Retrieves values for multiple keys. + * + * @param keys list of keys to retrieve + * @param _requests list of CacheRequest objects to store the values to + */ + def execute_mget(keys: List[String], _requests: List[CacheRequest]) : Unit = { + Logger.debug("[Memcache] mget: " + keys.mkString(", ")) + + requests = _requests + + if (state != MC_STATE_IDLE) + throw new ExecutionException("memcache connection busy") + + timer.start() + + write_buf.clear + write_buf.put("get".getBytes) + + for (key <- keys) { + write_buf.put(SP) + write_buf.put(key.getBytes("UTF-8")) + } + + write_buf.put(CR) + write_buf.put(LF) + write_buf.flip + + state = MC_STATE_CMD_MGET + last_event.interestOps(SelectionKey.OP_WRITE) + } + + def execute_set(key: String, request: CacheStoreRequest) : Unit = { + Logger.debug("[Memcache] store: " + key) + + val buf = request.buffer.buffer + val len = buf.position + + request.ready() + + if (len > MC_WRITE_BUF_LEN - 512) { + // some hacky safe margin + // minus 512 because of the first command line + return; + } + + buf.position(0) + buf.limit(len) + + if (state != MC_STATE_IDLE) + throw new ExecutionException("memcache connection busy") + + timer.start() + + // "set $key 0 $expiry $len\r\n$buf\r\n" + write_buf.clear + write_buf.put("set".getBytes) + write_buf.put(SP) + write_buf.put(key.getBytes("UTF-8")) + write_buf.put(SP) + write_buf.put('0'.toByte) + write_buf.put(SP) + write_buf.put(request.expire.toString.getBytes("UTF-8")) + write_buf.put(SP) + write_buf.put(len.toString.getBytes("UTF-8")) + write_buf.put(CR) + write_buf.put(LF) + write_buf.put(buf) + write_buf.put(CR) + write_buf.put(LF) + write_buf.flip + + state = MC_STATE_CMD_SET + last_event.interestOps(SelectionKey.OP_WRITE) + } + + def execute_delete(key: String) : Unit = { + Logger.debug("[Memcache] delete: " + key) + + if (state != MC_STATE_IDLE) + throw new ExecutionException("memcache connection busy") + + timer.start() + + write_buf.clear + write_buf.put("delete".getBytes) + write_buf.put(SP) + write_buf.put(key.getBytes("UTF-8")) + write_buf.put(CR) + write_buf.put(LF) + write_buf.flip + + state = MC_STATE_CMD_DELETE + last_event.interestOps(SelectionKey.OP_WRITE) + } + + /** + * @brief Callback, invoked when underlying socket is non-blocking readable. + * + * Processes any incoming data, i.e. the response from the underlying + * memcached server. + */ + def read(event: SelectionKey) : Unit = { + val chunk = sock.read(read_buf) + + if (chunk <= 0) { + Logger.error("[Memcache] read end of file ", false) + close(new ExecutionException("memcache connection closed")) + return + } + + while (read_buf.position > 0) { + if (state == MC_STATE_READ) { + // process response body chunk, from pos to min(maxpos, pos + cur_len) + val cur_chunk_len = math.min(read_buf.position, cur_len) + + cur_len -= cur_chunk_len + cur_buf.write(read_buf.array, 0, cur_chunk_len) + + // GET-value response chunk fully consumed? + if (cur_len == 0) { + cur_buf.retrieve.limit(cur_buf.retrieve.limit() - 2) + cur_buf.buffer.flip() + state = MC_STATE_CMD_MGET + } + + read_buf.limit(read_buf.position) + read_buf.position(cur_chunk_len) + read_buf.compact() + } else { + var found = false; + var i = 0; + + while (!found && i < read_buf.position) { + if (read_buf.get(i) == LF) { + val headline = new String(read_buf.array, 0, i - 1, "UTF-8") + read_buf.limit(read_buf.position) + read_buf.position(i + 1) + read_buf.compact() + next(headline) + found = true; + } + i = i + 1; + } + + if (!found) { + return; + } + } + } + } + + /** + * @brief Callback, invoked when underlying connection is non-blocking + * writable. + * + * When all data has been flushed out to the memcached server, + * we will stop watching for WRITE events and switch back to READ. + */ + def write(event: SelectionKey) : Unit = { + try { + sock.write(write_buf) + } catch { + case e: Exception => { + Logger.error("[Memcache] conn error: " + e.toString, false) + return close(e) + } + } + + if (write_buf.remaining == 0) { + write_buf.clear + last_event.interestOps(SelectionKey.OP_READ) + } + } + + /** + * @brief Closes this connection and notifies the owning pool about the close. + * + * @param err The exception that potentially caused the close. + */ + def close(err: Throwable = null) : Unit = { + if (state == MC_STATE_CLOSE) + return + + try { + if (requests != null) { + requests.foreach(_.ready()) + } + } catch { + case e: Exception => { + Logger.exception(e, false) + } + } + + state = MC_STATE_CLOSE + + pool.close(this) + sock.close() + Statistics.decr('memcache_connections_open) + } + + /** @brief Callback, invoked upon I/O completion timeout. + * + * Closes the memcache connection. + */ + def timeout() : Unit = { + Logger.error("[Memcache] connection timed out...", false) + close() + } + + /** @brief Retrieves the corresponding CacheRequest to the given @p key. + * + * @return never null but the CacheRequest object. + */ + private def get_request_by_key(key: String) : CacheRequest = { + requests.find(r => r.key == key && r.buffer == null) match { + case Some(r) => r + case None => throw new ExecutionException("[Memcache] invalid response key: " + key) + } + } + + /** + * @brief Processes a command response. + * + * @param the first line of the response + * + * Usually commands have a response of only one command, thus, they'll + * directly transition the connection to the idle-state. + * + * Other commands (such as GET) may require reading more data. + */ + private def next(cmd: String) : Unit = { + state match { + case MC_STATE_CMD_DELETE => cmd match { + case "DELETED" => idle() + case "NOT_FOUND" => idle() + } + case MC_STATE_CMD_SET => cmd match { + case "STORED" => idle() + case "NOT_STORED" => idle() + } + case MC_STATE_CMD_MGET => { + val parts = cmd.split(" ") + + if (parts.length == 1 && parts.head == "END") { + requests.foreach(_.ready()) + return idle() + } + + // expect ["VALUE", key, 0, dataLength] + if (parts.length != 4) { + throw new ExecutionException("[Memcache] protocol error: " + cmd) + } + + val req = get_request_by_key(parts(1)) + cur_len = parts(3).toInt + 2 + cur_buf = new ElasticBuffer(65535 * 8) // FIXME why not of size cur_len? + req.buffer = cur_buf + + state = MC_STATE_READ + } + case _ => { + throw new ExecutionException("unexpected token " + cmd + + " (" + state.toString + ")") + } + } + } +} diff --git a/src/com/paulasmuth/sqltap/MemcacheConnectionPool.scala b/src/main/scala/com/paulasmuth/sqltap/MemcacheConnectionPool.scala similarity index 84% rename from src/com/paulasmuth/sqltap/MemcacheConnectionPool.scala rename to src/main/scala/com/paulasmuth/sqltap/MemcacheConnectionPool.scala index 6c45c8f..0f42f4e 100644 --- a/src/com/paulasmuth/sqltap/MemcacheConnectionPool.scala +++ b/src/main/scala/com/paulasmuth/sqltap/MemcacheConnectionPool.scala @@ -28,8 +28,7 @@ class MemcacheConnectionPool extends CacheBackend { if (queue.length >= max_queue_len) { requests.foreach(_.ready()) - Logger.exception( - new TemporaryException("memcache queue is full"), false) + Logger.exception(new TemporaryException("memcache queue is full"), false) return } @@ -91,7 +90,6 @@ class MemcacheConnectionPool extends CacheBackend { batch += req } - Logger.debug("[Memcache] mget: " + keys.mkString(", ")) conn.execute_mget(keys.toList, batch.toList) execute_next() @@ -108,7 +106,9 @@ class MemcacheConnectionPool extends CacheBackend { } private def connect() : Unit = { - val conn = new MemcacheConnection(this) + val port: Int = Config.get('memcache_port).toInt + val host: String = Config.get('memcache_host) + val conn = new MemcacheConnection(this, host, port) conn.connect() connections += conn @@ -116,18 +116,8 @@ class MemcacheConnectionPool extends CacheBackend { private def execute(connection: MemcacheConnection, req: CacheRequest) = { req match { - - case set: CacheStoreRequest => { - Logger.debug("[Memcache] store: " + req.key) - connection.execute_set(req.key, set) - } - - case purge: CachePurgeRequest => { - Logger.debug("[Memcache] delete: " + req.key) - connection.execute_delete(purge.key) - } - + case set: CacheStoreRequest => connection.execute_set(req.key, set) + case purge: CachePurgeRequest => connection.execute_delete(purge.key) } } - } diff --git a/src/com/paulasmuth/sqltap/NoopCacheBackend.scala b/src/main/scala/com/paulasmuth/sqltap/NoopCacheBackend.scala similarity index 100% rename from src/com/paulasmuth/sqltap/NoopCacheBackend.scala rename to src/main/scala/com/paulasmuth/sqltap/NoopCacheBackend.scala diff --git a/src/com/paulasmuth/sqltap/NoopExpirationHandler.scala b/src/main/scala/com/paulasmuth/sqltap/NoopExpirationHandler.scala similarity index 100% rename from src/com/paulasmuth/sqltap/NoopExpirationHandler.scala rename to src/main/scala/com/paulasmuth/sqltap/NoopExpirationHandler.scala diff --git a/src/com/paulasmuth/sqltap/PhiInstruction.scala b/src/main/scala/com/paulasmuth/sqltap/PhiInstruction.scala similarity index 100% rename from src/com/paulasmuth/sqltap/PhiInstruction.scala rename to src/main/scala/com/paulasmuth/sqltap/PhiInstruction.scala diff --git a/src/com/paulasmuth/sqltap/PurgeExpirationHandler.scala b/src/main/scala/com/paulasmuth/sqltap/PurgeExpirationHandler.scala similarity index 100% rename from src/com/paulasmuth/sqltap/PurgeExpirationHandler.scala rename to src/main/scala/com/paulasmuth/sqltap/PurgeExpirationHandler.scala diff --git a/src/com/paulasmuth/sqltap/Query.scala b/src/main/scala/com/paulasmuth/sqltap/Query.scala similarity index 100% rename from src/com/paulasmuth/sqltap/Query.scala rename to src/main/scala/com/paulasmuth/sqltap/Query.scala diff --git a/src/com/paulasmuth/sqltap/QueryParser.scala b/src/main/scala/com/paulasmuth/sqltap/QueryParser.scala similarity index 97% rename from src/com/paulasmuth/sqltap/QueryParser.scala rename to src/main/scala/com/paulasmuth/sqltap/QueryParser.scala index 2fe6edc..b56459e 100644 --- a/src/com/paulasmuth/sqltap/QueryParser.scala +++ b/src/main/scala/com/paulasmuth/sqltap/QueryParser.scala @@ -18,6 +18,11 @@ object QueryParser { private val PARSER_STATE_BODY = 5 def parse(stack: InstructionStack, qry: String) : Unit = { + + if (Config.get('log_queries).equals("true")) { + Logger.info("[QueryParser] parse: " + qry) + } + var args = new ListBuffer[String]() var state = PARSER_STATE_NEXT diff --git a/src/com/paulasmuth/sqltap/RawSQLInstruction.scala b/src/main/scala/com/paulasmuth/sqltap/RawSQLInstruction.scala similarity index 100% rename from src/com/paulasmuth/sqltap/RawSQLInstruction.scala rename to src/main/scala/com/paulasmuth/sqltap/RawSQLInstruction.scala diff --git a/src/com/paulasmuth/sqltap/ReadyCallback.scala b/src/main/scala/com/paulasmuth/sqltap/ReadyCallback.scala similarity index 100% rename from src/com/paulasmuth/sqltap/ReadyCallback.scala rename to src/main/scala/com/paulasmuth/sqltap/ReadyCallback.scala diff --git a/src/com/paulasmuth/sqltap/Record.scala b/src/main/scala/com/paulasmuth/sqltap/Record.scala similarity index 93% rename from src/com/paulasmuth/sqltap/Record.scala rename to src/main/scala/com/paulasmuth/sqltap/Record.scala index b655062..6f094c4 100644 --- a/src/com/paulasmuth/sqltap/Record.scala +++ b/src/main/scala/com/paulasmuth/sqltap/Record.scala @@ -15,16 +15,16 @@ class Record(_resource: ResourceManifest) { var fields = ListBuffer[String]() var data = ListBuffer[String]() - def id() : Int = { - get(resource.id_field).toInt + def id() : Long = { + get(resource.id_field).toLong } - def set_id(id: Int) : Unit = { + def set_id(id: Long) : Unit = { set(resource.id_field, id.toString) } def set_id(id: String) : Unit = { - set_id(id.toInt) + set_id(id.toLong) } def has_id() : Boolean = { diff --git a/src/com/paulasmuth/sqltap/RecordLookupJob.scala b/src/main/scala/com/paulasmuth/sqltap/RecordLookupJob.scala similarity index 96% rename from src/com/paulasmuth/sqltap/RecordLookupJob.scala rename to src/main/scala/com/paulasmuth/sqltap/RecordLookupJob.scala index ca4030a..900cfa1 100644 --- a/src/com/paulasmuth/sqltap/RecordLookupJob.scala +++ b/src/main/scala/com/paulasmuth/sqltap/RecordLookupJob.scala @@ -14,7 +14,7 @@ class RecordLookupJob(worker: Worker, resource: ResourceManifest) extends ReadyC private val callbacks = new ListBuffer[ReadyCallback[Record]]() - def execute(record_id: Int) : Unit = { + def execute(record_id: Long) : Unit = { if (callbacks.length == 0) { return // RecordLookupJob is a noop without callbacks } diff --git a/src/com/paulasmuth/sqltap/RelationTrace.scala b/src/main/scala/com/paulasmuth/sqltap/RelationTrace.scala similarity index 100% rename from src/com/paulasmuth/sqltap/RelationTrace.scala rename to src/main/scala/com/paulasmuth/sqltap/RelationTrace.scala diff --git a/src/com/paulasmuth/sqltap/ReplicationFeed.scala b/src/main/scala/com/paulasmuth/sqltap/ReplicationFeed.scala similarity index 96% rename from src/com/paulasmuth/sqltap/ReplicationFeed.scala rename to src/main/scala/com/paulasmuth/sqltap/ReplicationFeed.scala index 6f76440..0ac2ac5 100644 --- a/src/com/paulasmuth/sqltap/ReplicationFeed.scala +++ b/src/main/scala/com/paulasmuth/sqltap/ReplicationFeed.scala @@ -29,7 +29,7 @@ object ReplicationFeed extends Worker with AbstractSQLConnectionPool { CTreeCache.expire(this, Manifest.resource_name_for_table(evt.table_name), - evt.primary_key.toInt) + evt.primary_key.toLong) } } @@ -42,7 +42,7 @@ object ReplicationFeed extends Worker with AbstractSQLConnectionPool { } else { val row = query.rows.last val position = row.last.toInt - val filename = row.first + val filename = row.head conn.start_binlog(filename, position) } diff --git a/src/com/paulasmuth/sqltap/Request.scala b/src/main/scala/com/paulasmuth/sqltap/Request.scala similarity index 100% rename from src/com/paulasmuth/sqltap/Request.scala rename to src/main/scala/com/paulasmuth/sqltap/Request.scala diff --git a/src/com/paulasmuth/sqltap/ResourceManifest.scala b/src/main/scala/com/paulasmuth/sqltap/ResourceManifest.scala similarity index 96% rename from src/com/paulasmuth/sqltap/ResourceManifest.scala rename to src/main/scala/com/paulasmuth/sqltap/ResourceManifest.scala index 6471c98..af2df00 100644 --- a/src/com/paulasmuth/sqltap/ResourceManifest.scala +++ b/src/main/scala/com/paulasmuth/sqltap/ResourceManifest.scala @@ -31,7 +31,7 @@ class ResourceManifest(doc: xml.Node) { elem.attr("id_field", false, "id") val default_order : String = - elem.attr("default_order", false, id_field + " DESC") + elem.attr("default_order", false, "`" + id_field + "` DESC") val relations = ((List[ResourceRelation]() /: (doc \ "relation")) diff --git a/src/com/paulasmuth/sqltap/ResourceRelation.scala b/src/main/scala/com/paulasmuth/sqltap/ResourceRelation.scala similarity index 100% rename from src/com/paulasmuth/sqltap/ResourceRelation.scala rename to src/main/scala/com/paulasmuth/sqltap/ResourceRelation.scala diff --git a/src/com/paulasmuth/sqltap/SQLBuilder.scala b/src/main/scala/com/paulasmuth/sqltap/SQLBuilder.scala similarity index 98% rename from src/com/paulasmuth/sqltap/SQLBuilder.scala rename to src/main/scala/com/paulasmuth/sqltap/SQLBuilder.scala index c074a3f..7ff6da7 100644 --- a/src/com/paulasmuth/sqltap/SQLBuilder.scala +++ b/src/main/scala/com/paulasmuth/sqltap/SQLBuilder.scala @@ -12,7 +12,7 @@ object SQLBuilder { def select( res: ResourceManifest, id_field: String, - id: Int, + id: Long, fields: List[String], cond: String, order: String, @@ -45,7 +45,7 @@ object SQLBuilder { def count( res: ResourceManifest, id_field: String, - id: Int, + id: Long, cond: String ) : String = ( diff --git a/src/com/paulasmuth/sqltap/SQLHelper.scala b/src/main/scala/com/paulasmuth/sqltap/SQLHelper.scala similarity index 100% rename from src/com/paulasmuth/sqltap/SQLHelper.scala rename to src/main/scala/com/paulasmuth/sqltap/SQLHelper.scala diff --git a/src/com/paulasmuth/sqltap/SQLInstruction.scala b/src/main/scala/com/paulasmuth/sqltap/SQLInstruction.scala similarity index 100% rename from src/com/paulasmuth/sqltap/SQLInstruction.scala rename to src/main/scala/com/paulasmuth/sqltap/SQLInstruction.scala diff --git a/src/com/paulasmuth/sqltap/SQLTap.scala b/src/main/scala/com/paulasmuth/sqltap/SQLTap.scala similarity index 91% rename from src/com/paulasmuth/sqltap/SQLTap.scala rename to src/main/scala/com/paulasmuth/sqltap/SQLTap.scala index 8f6477b..4227dbd 100644 --- a/src/com/paulasmuth/sqltap/SQLTap.scala +++ b/src/main/scala/com/paulasmuth/sqltap/SQLTap.scala @@ -19,7 +19,7 @@ import java.io.File object SQLTap{ - val VERSION = "v0.7.21" + val VERSION = "v0.8.6" def main(args: Array[String]) : Unit = { var n = 0 @@ -56,9 +56,6 @@ object SQLTap{ else if (args(n) == "--cache-backend") { Config.set('cache_backend, args(n+1)); n += 2 } - else if (args(n) == "--memcache-hosts") - { Config.set('memcache_hosts, args(n+1)); n += 2 } - else if (args(n) == "--memcache-mode") { Config.set('memcache_mode, args(n+1)); n += 2 } @@ -74,9 +71,18 @@ object SQLTap{ else if (args(n) == "--log-slow-queries") { Config.set('log_slow_queries, args(n+1)); n += 2 } + else if (args(n) == "--memcache-host") + { Config.set('memcache_host, args(n+1)); n += 2 } + + else if (args(n) == "--memcache-port") + { Config.set('memcache_port, args(n+1)); n += 2 } + else if ((args(n) == "-t") || (args(n) == "--threads")) { Config.set('threads, args(n+1)); n += 2 } + else if (args(n) == "--log-queries") + { Config.set('log_queries, "true"); n += 1 } + else if ((args(n) == "-c") || (args(n) == "--config")) { Config.set('config_base, args(n+1)); n += 2 } @@ -135,7 +141,8 @@ object SQLTap{ println(" --mysql-numconns max number of mysql connections per worker ") println(" --expiration-handler expiration handler (noop, purge, refresh) ") println(" --cache-backend cache backend (memcache) ") - println(" --memcache-hosts comma-seperated memcache servers (host:port) ") + println(" --memcache-host memcache server host ") + println(" --memcache-port memcache server port ") println(" --memcache-queuelen max mysql queue size per worker ") println(" --memcache-numconns max number of mysql connections per worker ") println(" --memcache-mode replication mode (copy, shard) ") diff --git a/src/com/paulasmuth/sqltap/Server.scala b/src/main/scala/com/paulasmuth/sqltap/Server.scala similarity index 100% rename from src/com/paulasmuth/sqltap/Server.scala rename to src/main/scala/com/paulasmuth/sqltap/Server.scala diff --git a/src/com/paulasmuth/sqltap/Statistic.scala b/src/main/scala/com/paulasmuth/sqltap/Statistic.scala similarity index 100% rename from src/com/paulasmuth/sqltap/Statistic.scala rename to src/main/scala/com/paulasmuth/sqltap/Statistic.scala diff --git a/src/com/paulasmuth/sqltap/Statistics.scala b/src/main/scala/com/paulasmuth/sqltap/Statistics.scala similarity index 91% rename from src/com/paulasmuth/sqltap/Statistics.scala rename to src/main/scala/com/paulasmuth/sqltap/Statistics.scala index f943ef8..f795129 100644 --- a/src/com/paulasmuth/sqltap/Statistics.scala +++ b/src/main/scala/com/paulasmuth/sqltap/Statistics.scala @@ -21,6 +21,9 @@ object Statistics { 'sql_requests_total -> new IntegralStatistic, 'sql_requests_per_second -> new DeltaStatistic, 'sql_request_time_mean -> new MeanStatistic, + 'sql_slow_queries_total -> new IntegralStatistic, + 'sql_slow_queries_per_second -> new DeltaStatistic, + 'sql_slow_queries_time_mean -> new MeanStatistic, 'memcache_requests_total -> new IntegralStatistic, 'memcache_requests_per_second -> new DeltaStatistic, 'memcache_connections_open -> new IntegralStatistic diff --git a/src/com/paulasmuth/sqltap/Timeout.scala b/src/main/scala/com/paulasmuth/sqltap/Timeout.scala similarity index 100% rename from src/com/paulasmuth/sqltap/Timeout.scala rename to src/main/scala/com/paulasmuth/sqltap/Timeout.scala diff --git a/src/com/paulasmuth/sqltap/TimeoutCallback.scala b/src/main/scala/com/paulasmuth/sqltap/TimeoutCallback.scala similarity index 100% rename from src/com/paulasmuth/sqltap/TimeoutCallback.scala rename to src/main/scala/com/paulasmuth/sqltap/TimeoutCallback.scala diff --git a/src/com/paulasmuth/sqltap/TimeoutScheduler.scala b/src/main/scala/com/paulasmuth/sqltap/TimeoutScheduler.scala similarity index 100% rename from src/com/paulasmuth/sqltap/TimeoutScheduler.scala rename to src/main/scala/com/paulasmuth/sqltap/TimeoutScheduler.scala diff --git a/src/com/paulasmuth/sqltap/Watchdog.scala b/src/main/scala/com/paulasmuth/sqltap/Watchdog.scala similarity index 100% rename from src/com/paulasmuth/sqltap/Watchdog.scala rename to src/main/scala/com/paulasmuth/sqltap/Watchdog.scala diff --git a/src/com/paulasmuth/sqltap/Worker.scala b/src/main/scala/com/paulasmuth/sqltap/Worker.scala similarity index 94% rename from src/com/paulasmuth/sqltap/Worker.scala rename to src/main/scala/com/paulasmuth/sqltap/Worker.scala index 3b7a97b..40ae809 100644 --- a/src/com/paulasmuth/sqltap/Worker.scala +++ b/src/main/scala/com/paulasmuth/sqltap/Worker.scala @@ -80,6 +80,11 @@ class Worker() extends Thread { conn.write(event) } catch { + case e: NotFoundException => { + // but do not log it, as it's a client side error we do not care + // about in the server side log + conn.close(e) + } case e: Exception => { Logger.error("[SQL] exception: " + e.toString, false) Logger.exception(e, false) diff --git a/src/com/paulasmuth/sqltap/WrappedBuffer.scala b/src/main/scala/com/paulasmuth/sqltap/WrappedBuffer.scala similarity index 100% rename from src/com/paulasmuth/sqltap/WrappedBuffer.scala rename to src/main/scala/com/paulasmuth/sqltap/WrappedBuffer.scala diff --git a/src/com/paulasmuth/sqltap/XMLHelper.scala b/src/main/scala/com/paulasmuth/sqltap/XMLHelper.scala similarity index 100% rename from src/com/paulasmuth/sqltap/XMLHelper.scala rename to src/main/scala/com/paulasmuth/sqltap/XMLHelper.scala diff --git a/src/com/paulasmuth/sqltap/mysql/AbstractSQLConnectionPool.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/AbstractSQLConnectionPool.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/AbstractSQLConnectionPool.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/AbstractSQLConnectionPool.scala diff --git a/src/com/paulasmuth/sqltap/mysql/AuthSwitchResponsePacket.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/AuthSwitchResponsePacket.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/AuthSwitchResponsePacket.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/AuthSwitchResponsePacket.scala diff --git a/src/com/paulasmuth/sqltap/mysql/BinaryInteger.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/BinaryInteger.scala similarity index 73% rename from src/com/paulasmuth/sqltap/mysql/BinaryInteger.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/BinaryInteger.scala index c3a9108..d67c571 100644 --- a/src/com/paulasmuth/sqltap/mysql/BinaryInteger.scala +++ b/src/main/scala/com/paulasmuth/sqltap/mysql/BinaryInteger.scala @@ -18,4 +18,13 @@ object BinaryInteger { return value } + def readLong(data: Array[Byte], pos: Int, len: Int) : Long = { + var value : Long = 0 + + for (n <- (0 until len)) + value += (data(pos + n) & 0x000000ff) << (8*n) + + return value + } + } diff --git a/src/com/paulasmuth/sqltap/mysql/BinaryString.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/BinaryString.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/BinaryString.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/BinaryString.scala diff --git a/src/com/paulasmuth/sqltap/mysql/BinlogDumpPacket.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/BinlogDumpPacket.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/BinlogDumpPacket.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/BinlogDumpPacket.scala diff --git a/src/com/paulasmuth/sqltap/mysql/BinlogEvent.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/BinlogEvent.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/BinlogEvent.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/BinlogEvent.scala diff --git a/src/com/paulasmuth/sqltap/mysql/BinlogEventPacket.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/BinlogEventPacket.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/BinlogEventPacket.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/BinlogEventPacket.scala diff --git a/src/com/paulasmuth/sqltap/mysql/ColumnDefinition.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/ColumnDefinition.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/ColumnDefinition.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/ColumnDefinition.scala diff --git a/src/com/paulasmuth/sqltap/mysql/FormatDescriptionBinlogEvent.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/FormatDescriptionBinlogEvent.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/FormatDescriptionBinlogEvent.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/FormatDescriptionBinlogEvent.scala diff --git a/src/com/paulasmuth/sqltap/mysql/HandshakePacket.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/HandshakePacket.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/HandshakePacket.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/HandshakePacket.scala diff --git a/src/com/paulasmuth/sqltap/mysql/HandshakeResponsePacket.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/HandshakeResponsePacket.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/HandshakeResponsePacket.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/HandshakeResponsePacket.scala diff --git a/src/com/paulasmuth/sqltap/mysql/LengthEncodedInteger.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/LengthEncodedInteger.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/LengthEncodedInteger.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/LengthEncodedInteger.scala diff --git a/src/com/paulasmuth/sqltap/mysql/LengthEncodedString.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/LengthEncodedString.scala similarity index 87% rename from src/com/paulasmuth/sqltap/mysql/LengthEncodedString.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/LengthEncodedString.scala index 9d0fafa..bfd753e 100644 --- a/src/com/paulasmuth/sqltap/mysql/LengthEncodedString.scala +++ b/src/main/scala/com/paulasmuth/sqltap/mysql/LengthEncodedString.scala @@ -24,7 +24,7 @@ object LengthEncodedString { offset += 3 } - else if ((data(0) & 0x000000ff) == 0xfd) { + else if ((data(pos) & 0x000000ff) == 0xfd) { length += (data(pos + 1) & 0x000000ff) length += (data(pos + 2) & 0x000000ff) << 8 length += (data(pos + 3) & 0x000000ff) << 16 @@ -35,7 +35,7 @@ object LengthEncodedString { throw new SQLProtocolError("length encoded string too large!") else - throw new SQLProtocolError("invalid length encoded string") + throw new SQLProtocolError("invalid length encoded string. pos: " + pos + ", type: " + (data(pos) & 0x000000ff)) val string = new String(data, offset, length, "UTF-8") diff --git a/src/com/paulasmuth/sqltap/mysql/OldPasswordAuthentication.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/OldPasswordAuthentication.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/OldPasswordAuthentication.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/OldPasswordAuthentication.scala diff --git a/src/com/paulasmuth/sqltap/mysql/PingPacket.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/PingPacket.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/PingPacket.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/PingPacket.scala diff --git a/src/com/paulasmuth/sqltap/mysql/RowsBinlogEvent.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/RowsBinlogEvent.scala similarity index 97% rename from src/com/paulasmuth/sqltap/mysql/RowsBinlogEvent.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/RowsBinlogEvent.scala index f4a954b..e9e6cc5 100644 --- a/src/com/paulasmuth/sqltap/mysql/RowsBinlogEvent.scala +++ b/src/main/scala/com/paulasmuth/sqltap/mysql/RowsBinlogEvent.scala @@ -65,8 +65,8 @@ trait RowsBinlogEvent extends BinlogEvent { num._1 } - def read_int(bytes: Int) : Int = { - val num = BinaryInteger.read(data, cur, bytes) + def read_int(bytes: Int) : Long = { + val num = BinaryInteger.readLong(data, cur, bytes) cur += bytes num } diff --git a/src/com/paulasmuth/sqltap/mysql/SQLConnection.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/SQLConnection.scala similarity index 99% rename from src/com/paulasmuth/sqltap/mysql/SQLConnection.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/SQLConnection.scala index 56131bb..77aae01 100644 --- a/src/com/paulasmuth/sqltap/mysql/SQLConnection.scala +++ b/src/main/scala/com/paulasmuth/sqltap/mysql/SQLConnection.scala @@ -177,6 +177,8 @@ class SQLConnection(pool: AbstractSQLConnectionPool) extends TimeoutCallback { if (cur_qry != null) cur_qry.error(err) + else + Logger.exception(err, false) state = SQL_STATE_CLOSE heartbeat.cancel() diff --git a/src/com/paulasmuth/sqltap/mysql/SQLConnectionPool.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/SQLConnectionPool.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/SQLConnectionPool.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/SQLConnectionPool.scala diff --git a/src/com/paulasmuth/sqltap/mysql/SQLPacket.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/SQLPacket.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/SQLPacket.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/SQLPacket.scala diff --git a/src/com/paulasmuth/sqltap/mysql/SQLQuery.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/SQLQuery.scala similarity index 92% rename from src/com/paulasmuth/sqltap/mysql/SQLQuery.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/SQLQuery.scala index deeac0e..7c896b3 100644 --- a/src/com/paulasmuth/sqltap/mysql/SQLQuery.scala +++ b/src/main/scala/com/paulasmuth/sqltap/mysql/SQLQuery.scala @@ -49,6 +49,10 @@ class SQLQuery(query_str: String) extends TimeoutCallback { if (Config.has_key('log_slow_queries) && runtime_millis >= Config.get('log_slow_queries).toInt) { + Statistics.incr('sql_slow_queries_total) + Statistics.incr('sql_slow_queries_per_second) + Statistics.incr('sql_slow_queries_time_mean, runtime_millis) + Logger.log("[SQL] [Slow Query] (" + runtime_millis + "ms): " + query) } } diff --git a/src/com/paulasmuth/sqltap/mysql/SecurePasswordAuthentication.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/SecurePasswordAuthentication.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/SecurePasswordAuthentication.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/SecurePasswordAuthentication.scala diff --git a/src/com/paulasmuth/sqltap/mysql/TableMapBinlogEvent.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/TableMapBinlogEvent.scala similarity index 98% rename from src/com/paulasmuth/sqltap/mysql/TableMapBinlogEvent.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/TableMapBinlogEvent.scala index 4c1f0c0..214bb45 100644 --- a/src/com/paulasmuth/sqltap/mysql/TableMapBinlogEvent.scala +++ b/src/main/scala/com/paulasmuth/sqltap/mysql/TableMapBinlogEvent.scala @@ -46,6 +46,7 @@ class TableMapBinlogEvent(data: Array[Byte], ts: Long, fmt: FormatDescriptionBin case 0x0c => 0 // 0x0c DATETIME case 0x0d => 0 // 0x0d YEAR case 0x0f => read_int(2) // 0x0f VARCHAR + case 0x11 => 0 // 0x11 TIMESTAMP2 case 0x12 => 0 // 0x12 DATETIME2 case 0xf6 => read_int(2) // 0xf6 NEWDECIMAL case 0xfc => read_int(1) // 0xfc BLOB diff --git a/src/com/paulasmuth/sqltap/mysql/UnknownBinlogEvent.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/UnknownBinlogEvent.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/UnknownBinlogEvent.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/UnknownBinlogEvent.scala diff --git a/src/com/paulasmuth/sqltap/mysql/UpdateRowsBinlogEvent.scala b/src/main/scala/com/paulasmuth/sqltap/mysql/UpdateRowsBinlogEvent.scala similarity index 100% rename from src/com/paulasmuth/sqltap/mysql/UpdateRowsBinlogEvent.scala rename to src/main/scala/com/paulasmuth/sqltap/mysql/UpdateRowsBinlogEvent.scala diff --git a/src/test/scala/com/paulasmuth/sqltap/JSONWriterSpec.scala b/src/test/scala/com/paulasmuth/sqltap/JSONWriterSpec.scala new file mode 100644 index 0000000..0e2866b --- /dev/null +++ b/src/test/scala/com/paulasmuth/sqltap/JSONWriterSpec.scala @@ -0,0 +1,48 @@ +package com.paulasmuth.sqltap + +import java.nio.ByteBuffer + +class JSONWriterSpec extends UnitSpec { + + describe("write_escaped") { + it("0x20 -> ' '") { + val str = write_escaped(Array(0x20.toByte)) + assert(str == " ") + } + + it("0xA -> '\\n'") { + val str = write_escaped(Array(0xA.toByte)) + assert(str == "\\n") + } + + it("0x22 -> '\\\"'") { + val str = write_escaped(Array(0x22.toByte)) + assert(str == "\\\"") + } + + it("0x5C -> '\\\\'") { + val str = write_escaped(Array(0x5C.toByte)) + assert(str == "\\\\") + } + + it("0xB -> '\\u000b'") { + val str = write_escaped(Array(0xB.toByte)) + assert(str == "\\u000b") + } + } + + def buffer_to_string(buf: ByteBuffer) : String = { + val bytes = new Array[Byte](buf.position()) + buf.rewind() + buf.get(bytes) + new String(bytes, "UTF-8") + } + + def write_escaped(in_bytes: Array[Byte]) : String = { + val buf = ByteBuffer.allocate(10) + val writer = new JSONWriter(new WrappedBuffer(buf)) + writer.write_escaped(new String(in_bytes, "UTF-8")) + buffer_to_string(buf) + } + +} diff --git a/src/test/scala/com/paulasmuth/sqltap/UnitSpec.scala b/src/test/scala/com/paulasmuth/sqltap/UnitSpec.scala new file mode 100644 index 0000000..e2bc883 --- /dev/null +++ b/src/test/scala/com/paulasmuth/sqltap/UnitSpec.scala @@ -0,0 +1,7 @@ +package com.paulasmuth.sqltap + +import org.scalatest.FunSpec +import org.scalatest.BeforeAndAfter +import org.scalatest.Assertions._ + +abstract class UnitSpec extends FunSpec with BeforeAndAfter