diff --git a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java index 37417e21267d..7b413e2d3168 100644 --- a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java +++ b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; @@ -46,6 +47,7 @@ public class LogAggregator { private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); private final LogLevel minLogLevel; private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final Object bufferWait = new Object(); private final AtomicInteger logCount = new AtomicInteger(0); /** @@ -77,6 +79,7 @@ public void collectLog(LogEntry logEntry) { } buffer.offer(logEntry); + bufferWake(); if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) { flushBuffer(); @@ -106,15 +109,29 @@ private void flushBuffer() { } private void startBufferFlusher() { - executorService.execute(() -> { + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(() -> { while (!Thread.currentThread().isInterrupted()) { try { - Thread.sleep(5000); // Flush every 5 seconds. + synchronized (bufferWait) { + if (buffer.isEmpty()) { + bufferWait.wait(); + } + } flushBuffer(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } - }); + }, 5, 5, TimeUnit.SECONDS); } -} + + /** + * Wakes up buffer. + */ + public void bufferWake() { + synchronized (bufferWait) { + bufferWait.notifyAll(); + } + } +} \ No newline at end of file diff --git a/microservices-log-aggregation/src/test/java/com/iluwatar/logaggregation/LogAggregatorTest.java b/microservices-log-aggregation/src/test/java/com/iluwatar/logaggregation/LogAggregatorTest.java index 6c385f35c6c3..52832df79757 100644 --- a/microservices-log-aggregation/src/test/java/com/iluwatar/logaggregation/LogAggregatorTest.java +++ b/microservices-log-aggregation/src/test/java/com/iluwatar/logaggregation/LogAggregatorTest.java @@ -24,13 +24,16 @@ */ package com.iluwatar.logaggregation; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.time.LocalDateTime; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java index 7042ff7b79a2..587d1b28733b 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -88,7 +89,8 @@ public static void main(String[] args) { // Create e service which should process the submitted jobs. final var srvRunnable = new ServiceExecutor(msgQueue); - + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(srvRunnable, 1, 1, TimeUnit.SECONDS); // Create a ThreadPool of 2 threads and // submit all Runnable task for execution to executor executor = Executors.newFixedThreadPool(2); diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java index 1d28faa54ca5..c42723703882 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/MessageQueue.java @@ -36,6 +36,7 @@ public class MessageQueue { private final BlockingQueue blkQueue; + public final Object serviceExecutorWait = new Object(); // Default constructor when called creates Blocking Queue object. public MessageQueue() { @@ -50,6 +51,9 @@ public void submitMsg(Message msg) { try { if (null != msg) { blkQueue.add(msg); + synchronized (serviceExecutorWait) { + serviceExecutorWait.notifyAll(); + } } } catch (Exception e) { LOGGER.error(e.getMessage()); diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java index 02530042b370..92b5379d6eea 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java @@ -32,9 +32,7 @@ */ @Slf4j public class ServiceExecutor implements Runnable { - private final MessageQueue msgQueue; - public ServiceExecutor(MessageQueue msgQueue) { this.msgQueue = msgQueue; } @@ -51,9 +49,10 @@ public void run() { LOGGER.info(msg + " is served."); } else { LOGGER.info("Service Executor: Waiting for Messages to serve .. "); + synchronized (msgQueue.serviceExecutorWait) { + msgQueue.serviceExecutorWait.wait(); + } } - - Thread.sleep(1000); } } catch (Exception e) { LOGGER.error(e.getMessage()); diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java index 9b1407277a27..de280a518005 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/TaskGenerator.java @@ -66,9 +66,8 @@ public void run() { try { while (count > 0) { var statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName(); - this.submit(new Message(statusMsg)); - LOGGER.info(statusMsg); + this.submit(new Message(statusMsg)); // reduce the message count. count--; diff --git a/queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java b/queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java index 0a03bc560a1d..2407ca2116a1 100644 --- a/queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java +++ b/queue-based-load-leveling/src/test/java/com/iluwatar/queue/load/leveling/TaskGenSrvExeTest.java @@ -24,14 +24,19 @@ */ package com.iluwatar.queue.load.leveling; +import static java.util.concurrent.CompletableFuture.anyOf; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; /** * Test case for submitting Message to Blocking Queue by TaskGenerator and retrieve the message by * ServiceExecutor. */ +@Slf4j class TaskGenSrvExeTest { @Test @@ -53,4 +58,37 @@ void taskGeneratorTest() { assertNotNull(srvExeThr); } + /** + * Tests that service executor waits at start since no message is sent to execute upon. + * @throws InterruptedException + */ + @Test + void serviceExecutorStartStateTest() throws InterruptedException { + var msgQueue = new MessageQueue(); + var srvRunnable = new ServiceExecutor(msgQueue); + var srvExeThr = new Thread(srvRunnable); + srvExeThr.start(); + Thread.sleep(200); // sleep a little until service executor thread waits + LOGGER.info("Current Service Executor State: " + srvExeThr.getState()); + assertEquals(srvExeThr.getState(), Thread.State.WAITING); + + } + + @Test + void serviceExecutorWakeStateTest() throws InterruptedException { + var msgQueue = new MessageQueue(); + var srvRunnable = new ServiceExecutor(msgQueue); + var srvExeThr = new Thread(srvRunnable); + srvExeThr.start(); + Thread.sleep(200); // sleep a little until service executor thread waits + synchronized (msgQueue.serviceExecutorWait){ + msgQueue.serviceExecutorWait.notifyAll(); + } + var srvExeState = srvExeThr.getState(); + LOGGER.info("Current Service Executor State: " + srvExeState); + // assert that state changes from waiting + assertTrue(srvExeState != Thread.State.WAITING); + + } + } diff --git a/server-session/README.md b/server-session/README.md index 4ce452b53f7d..a03444b6268d 100644 --- a/server-session/README.md +++ b/server-session/README.md @@ -47,26 +47,43 @@ The `main` application starts a server and assigns handlers to manage login and ```java public class App { - private static Map sessions = new HashMap<>(); - private static Map sessionCreationTimes = new HashMap<>(); - private static final long SESSION_EXPIRATION_TIME = 10000; + // Map to store session data (simulated using a HashMap) + private static Map sessions = new HashMap(); + private static Map sessionCreationTimes = new HashMap(); + private static final long SESSION_EXPIRATION_TIME = 10000; + private static Object sessionExpirationWait=new Object(); // used to make expiration task wait or work based on event (login request sent or not) public static void main(String[] args) throws IOException { + // Create HTTP server listening on port 8000 HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); + // Set up session management endpoints server.createContext("/login", new LoginHandler(sessions, sessionCreationTimes)); server.createContext("/logout", new LogoutHandler(sessions, sessionCreationTimes)); + // Start the server server.start(); + // Start background task to check for expired sessions sessionExpirationTask(); + + LOGGER.info("Server started. Listening on port 8080..."); } private static void sessionExpirationTask() { - new Thread(() -> { + new Thread(() -> { while (true) { try { - Thread.sleep(SESSION_EXPIRATION_TIME); + synchronized (sessions) + { + if(sessions.isEmpty()) + synchronized (sessionExpirationWait) + { + sessionExpirationWait.wait(); // Make Session expiration Checker wait until at least a single login request is sent. + } + } + LOGGER.info("Session expiration checker started..."); + Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time Instant currentTime = Instant.now(); synchronized (sessions) { synchronized (sessionCreationTimes) { @@ -75,18 +92,30 @@ public class App { while (iterator.hasNext()) { Map.Entry entry = iterator.next(); if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { + LOGGER.info("User " + entry.getValue() + " removed"); sessions.remove(entry.getKey()); iterator.remove(); } } } } + LOGGER.info("Session expiration checker finished!"); } catch (InterruptedException e) { + LOGGER.error("An error occurred: ", e); Thread.currentThread().interrupt(); } } }).start(); } + + public static void expirationTaskWake() //Wake up sleeping Expiration task thread + { + synchronized (sessionExpirationWait) + { + sessionExpirationWait.notify(); + } + } + } ``` diff --git a/server-session/pom.xml b/server-session/pom.xml index e7cdcf82c201..530140fccb30 100644 --- a/server-session/pom.xml +++ b/server-session/pom.xml @@ -49,5 +49,4 @@ test - \ No newline at end of file diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/App.java b/server-session/src/main/java/com/iluwatar/sessionserver/App.java index a3c66d3ff634..26dad15a8bc7 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/App.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/App.java @@ -31,8 +31,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; + /** * The server session pattern is a behavioral design pattern concerned with assigning the responsibility * of storing session data on the server side. Within the context of stateless protocols like HTTP all @@ -54,10 +58,12 @@ public class App { // Map to store session data (simulated using a HashMap) + private static Map sessions = new HashMap<>(); private static Map sessionCreationTimes = new HashMap<>(); private static final long SESSION_EXPIRATION_TIME = 10000; + /** * Main entry point. * @param args arguments @@ -75,37 +81,34 @@ public static void main(String[] args) throws IOException { server.start(); // Start background task to check for expired sessions - sessionExpirationTask(); LOGGER.info("Server started. Listening on port 8080..."); + + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate(App::sessionExpirationTask, SESSION_EXPIRATION_TIME, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); } private static void sessionExpirationTask() { - new Thread(() -> { - while (true) { - try { - LOGGER.info("Session expiration checker started..."); - Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time - Instant currentTime = Instant.now(); - synchronized (sessions) { - synchronized (sessionCreationTimes) { - Iterator> iterator = - sessionCreationTimes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { - sessions.remove(entry.getKey()); - iterator.remove(); - } - } + LOGGER.info("Session expiration checker started..."); + Instant currentTime = Instant.now(); + try { + synchronized (sessions) { + synchronized (sessionCreationTimes) { + Iterator> iterator = + sessionCreationTimes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { + LOGGER.info("User " + entry.getValue() + " removed"); + sessions.remove(entry.getKey()); + iterator.remove(); } } - LOGGER.info("Session expiration checker finished!"); - } catch (InterruptedException e) { - LOGGER.error("An error occurred: ", e); - Thread.currentThread().interrupt(); } } - }).start(); + } catch (Exception e) { + LOGGER.error("An error occurred: ", e); + } + LOGGER.info("Session expiration checker finished!"); } } \ No newline at end of file diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/LoginHandler.java b/server-session/src/main/java/com/iluwatar/sessionserver/LoginHandler.java index 1e36ac052570..d49a66a54b91 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/LoginHandler.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/LoginHandler.java @@ -42,6 +42,9 @@ public class LoginHandler implements HttpHandler { private Map sessions; private Map sessionCreationTimes; + /** + * Handles new login requests. + */ public LoginHandler(Map sessions, Map sessionCreationTimes) { this.sessions = sessions; this.sessionCreationTimes = sessionCreationTimes; @@ -60,7 +63,6 @@ public void handle(HttpExchange exchange) { // Set session ID as cookie exchange.getResponseHeaders().add("Set-Cookie", "sessionID=" + sessionId); - // Send response String response = "Login successful!\n" + "Session ID: " + sessionId; try { diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/LogoutHandler.java b/server-session/src/main/java/com/iluwatar/sessionserver/LogoutHandler.java index 5bea06f2f866..fd58735a6790 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/LogoutHandler.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/LogoutHandler.java @@ -41,6 +41,9 @@ public class LogoutHandler implements HttpHandler { private Map sessions; private Map sessionCreationTimes; + /** + * Handles logging out requests. + */ public LogoutHandler(Map sessions, Map sessionCreationTimes) { this.sessions = sessions; this.sessionCreationTimes = sessionCreationTimes; diff --git a/twin/README.md b/twin/README.md index b2913e0a963f..004b93b714dd 100644 --- a/twin/README.md +++ b/twin/README.md @@ -82,21 +82,38 @@ public class BallItem extends GameItem { ```java @Slf4j public class BallThread extends Thread { + @Setter private BallItem twin; + private volatile boolean isSuspended; + private volatile boolean isRunning = true; + private final Object lock=new Object(); + + /** + * Run the thread. + */ public void run() { + while (isRunning) { - if (!isSuspended) { + if (isSuspended) { + synchronized (lock) { + try { + lock.wait(); // Wait until resumed :: busy loop fix + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } else { twin.draw(); twin.move(); - } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); + try { + Thread.sleep(250); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } } @@ -109,11 +126,19 @@ public class BallThread extends Thread { public void resumeMe() { isSuspended = false; LOGGER.info("Begin to resume BallThread"); + synchronized (lock) + { + lock.notify(); + } } public void stopMe() { this.isRunning = false; this.isSuspended = true; + synchronized (lock) + { + lock.notify(); + } } } ``` diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index 9d4d9cf71a76..dfca68daca1e 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -42,37 +42,57 @@ public class BallThread extends Thread { private volatile boolean isRunning = true; + private final Object lock = new Object(); + /** * Run the thread. */ public void run() { - while (isRunning) { if (!isSuspended) { twin.draw(); twin.move(); - } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); + } else { + synchronized (lock) { + try { + lock.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } } } - + /** + * suspend the thread. + */ public void suspendMe() { isSuspended = true; LOGGER.info("Begin to suspend BallThread"); } + /** + * notify run to resume. + */ + public void resumeMe() { isSuspended = false; LOGGER.info("Begin to resume BallThread"); + + synchronized (lock) { + lock.notifyAll(); + } } + /** + * Stop running thread. + */ public void stopMe() { this.isRunning = false; this.isSuspended = true; + synchronized (lock) { + lock.notifyAll(); + } } } diff --git a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java index 26cf78509dcf..ad6afec114b7 100644 --- a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java +++ b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java @@ -27,6 +27,8 @@ import static java.lang.Thread.UncaughtExceptionHandler; import static java.lang.Thread.sleep; import static java.time.Duration.ofMillis; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertTimeout; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -35,12 +37,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; /** * BallThreadTest * */ +@Slf4j class BallThreadTest { /** @@ -59,13 +63,12 @@ void testSuspend() { verify(ballItem, atLeastOnce()).draw(); verify(ballItem, atLeastOnce()).move(); ballThread.suspendMe(); - sleep(1000); + LOGGER.info("Current ballThread State: "+ballThread.getState()); + assertEquals(ballThread.getState(), Thread.State.WAITING); ballThread.stopMe(); ballThread.join(); - - verifyNoMoreInteractions(ballItem); }); } @@ -86,16 +89,16 @@ void testResume() { sleep(1000); verifyNoMoreInteractions(ballItem); - ballThread.resumeMe(); - sleep(300); + sleep(250); + LOGGER.info("Current ballThread State: "+ballThread.getState()); + assertNotSame(ballThread.getState(), Thread.State.WAITING); verify(ballItem, atLeastOnce()).draw(); verify(ballItem, atLeastOnce()).move(); ballThread.stopMe(); ballThread.join(); - verifyNoMoreInteractions(ballItem); }); } @@ -110,11 +113,10 @@ void testInterrupt() { ballThread.setUncaughtExceptionHandler(exceptionHandler); ballThread.setTwin(mock(BallItem.class)); ballThread.start(); + ballThread.suspendMe(); ballThread.interrupt(); ballThread.join(); - verify(exceptionHandler).uncaughtException(eq(ballThread), any(RuntimeException.class)); - verifyNoMoreInteractions(exceptionHandler); }); } } \ No newline at end of file