diff --git a/modules/tests/shared/src/test/scala/DescribeCacheTest.scala b/modules/tests/shared/src/test/scala/DescribeCacheTest.scala index c636e8a6..2780b489 100644 --- a/modules/tests/shared/src/test/scala/DescribeCacheTest.scala +++ b/modules/tests/shared/src/test/scala/DescribeCacheTest.scala @@ -4,16 +4,17 @@ package tests -import skunk.implicits._ -import skunk.codec.numeric.int4 -import cats.syntax.all._ import cats.effect.IO -import skunk.exception.PostgresErrorException import cats.effect.Resource -import skunk.Session +import cats.syntax.all._ import org.typelevel.otel4s.trace.Tracer +import skunk.codec.numeric.int4 +import skunk.codec.text +import skunk._ +import skunk.exception.PostgresErrorException +import skunk.implicits._ -class DescribeCacheTest extends SkunkTest { +class DescribeCacheTest extends SkunkTest(true) { implicit val tracer: Tracer[IO] = Tracer.noop @@ -82,6 +83,67 @@ class DescribeCacheTest extends SkunkTest { } yield "ok" } + val runs = 100 + // This should not fail + pooledTest("portal1 - concurrent portal with normal flatMap") { pool => + import scala.concurrent.duration._ + def cmd(idx: Int): Command[String *: Int *: EmptyTuple] = sql"INSERT INTO scalars VALUES (${text.varchar}, ${int4}) -- #${idx.toString}".command + 1.to(runs).toList.parTraverse{ idx => + pool.use { s => + s.transaction.use { _ => + s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).flatMap (_ => + IO.sleep(1.second) >> s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).void + ) + } + } + } + } + + // This should not fail + pooledTest("portal2 - concurrent portal with map / identity *inside tx*") { pool => + import scala.concurrent.duration._ + def cmd(idx: Int): Command[String *: Int *: EmptyTuple] = sql"INSERT INTO scalars VALUES (${text.varchar}, ${int4}) -- #${idx.toString}".command + 1.to(runs).toList.parTraverse{ idx => + pool.use { s => + s.transaction.use { _ => + s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).map (_ => + IO.sleep(1.second) >> s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).void + ).flatMap(identity) + } + } + } + } + + // This will fail if run enough + pooledTest("portal3 - concurrent portal with map / identity *outside tx*") { pool => + import scala.concurrent.duration._ + def cmd(idx: Int): Command[String *: Int *: EmptyTuple] = sql"INSERT INTO scalars VALUES (${text.varchar}, ${int4}) -- #${idx.toString}".command + 1.to(runs).toList.parTraverse{ idx => + pool.use { s => + s.transaction.use { _ => + s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).map (_ => + IO.sleep(1.second) >> s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).void + ) + } + }.flatMap(identity) + } + } + + // This will fail if run enough + pooledTest("portal4 - concurrent portal with flatten *outside tx*") { pool => + import scala.concurrent.duration._ + def cmd(idx: Int): Command[String *: Int *: EmptyTuple] = sql"INSERT INTO scalars VALUES (${text.varchar}, ${int4}) -- #${idx.toString}".command + 1.to(runs).toList.parTraverse{ idx => + pool.use { s => + s.transaction.use { _ => + s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).map (_ => + IO.sleep(1.second) >> s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).void + ) + } + }.flatten + } + } + sessionTest("command should not be cached after cache is cleared") { s => val cmd = sql"commit".command for { diff --git a/world/world.sql b/world/world.sql index ec93a04d..6a52692e 100644 --- a/world/world.sql +++ b/world/world.sql @@ -7,6 +7,11 @@ CREATE TYPE myenum AS ENUM ('foo', 'bar'); +CREATE TABLE IF NOT EXISTS scalars( + a_string varchar not null, + a_int integer not null +); + CREATE TABLE IF NOT EXISTS city ( id integer NOT NULL, name varchar NOT NULL,