Build a thread-safe, bounded, cancellable print spooler — the kind of software that accepts print jobs from many users and dispatches them to a limited number of printer workers (like a shared campus lab printer).
Total marks: 70, split across three grading areas:
| Area | Marks | What is tested |
|---|---|---|
| A) Timing sanity | 20 | Single-worker scenario checks that printing takes realistic time (prevents trivially marking jobs DONE). Uses System.nanoTime(). |
| B) Concurrency + cancellation + performance | 40 | B1 (20 pts): all jobs reach terminal state (DONE or CANCELLED), no deadlocks. B2 (5 pts): cancel while queued or printing is visible across threads. B3 (15 pts): throughput ratio vs. reference spooler run on the same machine. |
| C) Shutdown correctness | 10 | close() terminates dispatcher + thread pool promptly, no stuck threads. |
A template project was provided with the full API layer (interfaces, records, enum) already written. The student implements three classes: BoundedJobQueue, JobRegistry, and SpoolerImpl. PrintWorker and JobRecord were given and work once the TODOs are complete.
Allowed concurrency primitives:
- Thread, Runnable
- wait() / notifyAll()
- ExecutorService / Executors.newFixedThreadPool(...)
- AtomicLong (and similar atomics)
- ReentrantLock / tryLock

No external concurrency frameworks (Akka, Reactor, RxJava, etc.) are allowed.
    assignment4_print_spooler/
      pom.xml                        // Maven build file (Java 17)
      DESIGN.md                      // Your design rationale (submit this)
      scripts/
        run_tests.sh                 // Shortcut: compile + run JUnit tests
        run_bench.sh                 // Shortcut: compile + run benchmark
      src/
        main/java/edu/iitg/cs/concurrency/printspooler/
          api/                       // --- PROVIDED (do NOT modify) ---
            Spooler.java             // Interface: submitBlocking, trySubmit, cancel, status, metrics, close
            PrintJob.java            // Value class: owner, pages, pageMillis
            JobStatus.java           // Enum: QUEUED, PRINTING, DONE, CANCELLED
            SpoolerMetrics.java      // Record: totalSubmitted, totalCompleted, totalCancelled, maxQueueDepth
          impl/                      // --- STUDENT IMPLEMENTS ---
            BoundedJobQueue.java     // TODO 1: bounded blocking queue (synchronized + wait/notifyAll)
            JobRegistry.java         // TODO 2: thread-safe job state machine
            SpoolerImpl.java         // TODO 3: dispatcher, pool, submit, cancel, close
            PrintWorker.java         // Given: page-by-page printing with cancel check
            JobRecord.java           // Given: stores jobId, PrintJob, status, volatile cancelRequested
          runtime/
            SpoolerBench.java        // Benchmark runner (given)
        test/java/.../
          PublicSpoolerTest.java     // Public tests (must pass)
The api package defines the contract: the Spooler interface, the PrintJob value class, the JobStatus enum, and the SpoolerMetrics record. These are read-only — you never modify them. The impl package contains the working code. All classes there are package-private (final class, no public) except SpoolerImpl which is public because it implements the Spooler interface and is instantiated from the test and benchmark code.
    # Navigate to the project root
    cd cs331_a4_170101025/assignment4_print_spooler

    # Run all JUnit tests via the helper script
    bash scripts/run_tests.sh

    # Run the benchmark (default: 4 workers, 20 capacity, 8 producers, 50 jobs each)
    bash scripts/run_bench.sh

    # Or with custom arguments: workers capacity producers jobsPerProducer
    bash scripts/run_bench.sh 2 10 2 10

    # Run tests directly with Maven
    mvn -q clean test

    # Compile and run the benchmark manually
    mvn -q -DskipTests package && java -cp "target/classes" edu.iitg.cs.concurrency.printspooler.runtime.SpoolerBench

    # Benchmark with custom arguments
    mvn -q -DskipTests package && java -cp "target/classes" edu.iitg.cs.concurrency.printspooler.runtime.SpoolerBench 2 10 2 10
Prerequisite: Java 17 (java -version) and Apache Maven (mvn -version) must be installed. On Ubuntu: sudo apt install -y openjdk-17-jdk maven.
submitted=400 completed=400 cancelled=0 maxQueue=20
throughput=482.16 jobs/s
public enum JobStatus { QUEUED, PRINTING, DONE, CANCELLED }
Four possible states for every print job. A job is born QUEUED, transitions to PRINTING when a worker picks it up, then reaches a terminal state: DONE (printed successfully) or CANCELLED (cancelled by user or interrupted). Once terminal, the status never changes.
    public final class PrintJob {
        private final String owner;     // who submitted ("student-0")
        private final int pages;        // number of pages to print
        private final long pageMillis;  // simulated print time per page

        // constructor validates: pages > 0, pageMillis >= 0
        // accessor methods: owner(), pages(), pageMillis()
    }
An immutable value class. The benchmark creates jobs with 1–5 pages and 5–15 ms per page. The pageMillis field simulates real printer latency via Thread.sleep(pageMillis) inside PrintWorker.
    public interface Spooler extends AutoCloseable {
        long submitBlocking(PrintJob job) throws InterruptedException;
        long trySubmit(PrintJob job, long timeoutMs) throws InterruptedException;
        boolean cancel(long jobId);
        JobStatus status(long jobId);
        SpoolerMetrics metrics();
        @Override void close() throws Exception;
    }

| Method | Meaning |
|---|---|
| submitBlocking | Blocks until there is space in the bounded queue, then enqueues the job. Returns the unique job ID. |
| trySubmit | Same as above but gives up after timeoutMs milliseconds. Returns -1 on timeout. |
| cancel | Attempts to cancel a job. Returns true if status changed to CANCELLED. |
| status | Returns the current JobStatus for the given job ID. |
| metrics | Returns a snapshot of submitted/completed/cancelled counts and max queue depth. |
| close | Shuts down the spooler: stops the dispatcher, drains the thread pool. Spooler extends AutoCloseable, so you can use try-with-resources. |
    public record SpoolerMetrics(
        long totalSubmitted,   // jobs that entered the queue
        long totalCompleted,   // jobs that finished printing
        long totalCancelled,   // jobs that were cancelled
        int maxQueueDepth      // peak occupancy of the bounded queue
    ) {}
A Java record — immutable data carrier with auto-generated toString, equals, hashCode, and accessor methods. Used by the benchmark to print the final summary line.
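To make the record features concrete, here is a minimal sketch using a stand-in record with the same components. The name MetricsSketch and the summaryLine helper are assumptions made for illustration; the formatting code of the real benchmark is not shown in this document.

```java
// Stand-in for the provided SpoolerMetrics record (same components, hypothetical name).
record MetricsSketch(long totalSubmitted, long totalCompleted,
                     long totalCancelled, int maxQueueDepth) {

    // Hypothetical helper: builds a one-line summary in the style of the sample output.
    String summaryLine() {
        return "submitted=" + totalSubmitted
             + " completed=" + totalCompleted
             + " cancelled=" + totalCancelled
             + " maxQueue=" + maxQueueDepth;
    }
}

class MetricsSketchDemo {
    public static void main(String[] args) {
        MetricsSketch m = new MetricsSketch(400, 400, 0, 20);
        System.out.println(m.summaryLine());   // accessors are generated from the components
        System.out.println(m);                 // auto-generated toString
        // Records compare by value, not by identity.
        System.out.println(m.equals(new MetricsSketch(400, 400, 0, 20)));
    }
}
```

Because a record is immutable, the snapshot handed to the caller cannot change afterwards, even while the spooler keeps running.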
A classic bounded blocking queue of job IDs (longs). Producers block when the queue is full; the dispatcher blocks when it is empty. This is the textbook monitor pattern using synchronized + wait() + notifyAll().
    final class BoundedJobQueue {
        private final int capacity;
        private final Deque<Long> q = new ArrayDeque<>();   // underlying storage
        private int maxDepth = 0;                           // tracks peak occupancy

        BoundedJobQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException("capacity must be > 0");
            this.capacity = capacity;
        }

        int maxDepthObserved() {
            synchronized (this) { return maxDepth; }        // safe read for metrics
        }

        synchronized void putBlocking(long jobId) throws InterruptedException {
            while (q.size() >= capacity) wait();            // 1. WAIT while full
            q.addLast(jobId);                               // 2. Enqueue at tail
            if (q.size() > maxDepth) maxDepth = q.size();   // 3. Track peak
            notifyAll();                                    // 4. Wake blocked consumers
        }

        // takeBlocking, putWithTimeout and removeIfPresent are shown separately
    }
- After wait() returns, another producer may already have filled the queue again, or the wakeup may have been spurious. The while loop re-checks the condition.
- The peak queue size is recorded in maxDepth for the metrics snapshot.
- notifyAll() wakes any thread blocked in takeBlocking() waiting for items.

    long takeBlocking() throws InterruptedException {
        synchronized (this) {
            while (q.isEmpty()) wait();   // block while empty
            long id = q.pollFirst();      // dequeue from head (FIFO)
            notifyAll();                  // wake blocked producers
            return id;
        }
    }
Mirror image of putBlocking. Blocks when empty, dequeues when available, wakes producers who may be waiting for space.
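The put/take pair can be exercised on its own. The following is a minimal sketch, not the assignment's class: MiniBoundedQueue and its transfer driver are hypothetical names, and metrics tracking is left out to keep the monitor pattern visible.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal monitor-style bounded queue, mirroring putBlocking/takeBlocking.
final class MiniBoundedQueue {
    private final int capacity;
    private final Deque<Long> q = new ArrayDeque<>();

    MiniBoundedQueue(int capacity) { this.capacity = capacity; }

    synchronized void put(long id) throws InterruptedException {
        while (q.size() >= capacity) wait();   // re-check after every wakeup
        q.addLast(id);
        notifyAll();                           // wake consumers waiting for items
    }

    synchronized long take() throws InterruptedException {
        while (q.isEmpty()) wait();            // re-check after every wakeup
        long id = q.pollFirst();
        notifyAll();                           // wake producers waiting for space
        return id;
    }

    // Hypothetical driver: one producer pushes ids 1..n through the queue while
    // the calling thread consumes them; returns the sum of the consumed ids.
    static long transfer(int capacity, int n) {
        MiniBoundedQueue queue = new MiniBoundedQueue(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (long i = 1; i <= n; i++) queue.put(i);   // blocks whenever full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += queue.take();  // blocks whenever empty
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(2, 100));   // sum of 1..100
    }
}
```

With a capacity of 2 the producer blocks most of the time, so the run repeatedly goes through both wait loops.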
    boolean putWithTimeout(long jobId, long timeoutMs) throws InterruptedException {
        synchronized (this) {
            long deadline = System.currentTimeMillis() + timeoutMs;      // 1. absolute deadline
            while (q.size() >= capacity) {
                long remaining = deadline - System.currentTimeMillis();  // 2. how much time left?
                if (remaining <= 0) return false;                        // 3. timed out
                wait(remaining);                                         // 4. wait at most remaining ms
            }
            q.addLast(jobId);
            if (q.size() > maxDepth) maxDepth = q.size();
            notifyAll();
            return true;
        }
    }
- The deadline is computed once as an absolute time, rather than passing timeoutMs to wait() inside a loop, which would reset the timer after every spurious wakeup.
- If no time remains, the method gives up and returns false.
- wait(remaining) sleeps for at most the remaining time. On a spurious wakeup, the loop re-checks capacity.

    boolean removeIfPresent(long jobId) {
        synchronized (this) {
            boolean removed = q.remove(jobId);   // O(n) scan and remove
            if (removed) notifyAll();            // freed a slot; wake producers
            return removed;
        }
    }
Called by SpoolerImpl.cancel(). If the job is still in the queue (not yet picked up by the dispatcher), remove it and wake any producers waiting for space.
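The deadline arithmetic used by putWithTimeout can also be tried in isolation. The sketch below deliberately swaps System.currentTimeMillis() for System.nanoTime(), which is monotonic and therefore unaffected by wall-clock adjustments; that swap is an assumption of this example, not what the code above does. TimedGate and awaitQuietly are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Sketch: timed wait against an absolute deadline on a monotonic clock.
final class TimedGate {
    private boolean open = false;   // guarded by this

    synchronized void open() {
        open = true;
        notifyAll();
    }

    // Waits until the gate opens or timeoutMs elapses; returns whether it opened.
    synchronized boolean awaitOpen(long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!open) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) return false;   // time budget used up
            // wait(0) would mean "wait forever", so round up to at least 1 ms
            long remainingMs = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            wait(remainingMs);
        }
        return true;
    }

    // Hypothetical helper: turns interruption into an unchecked exception for callers.
    static boolean awaitQuietly(TimedGate gate, long timeoutMs) {
        try {
            return gate.awaitOpen(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

However many times the loop wakes up, the total wait is measured against the same deadline, so early wakeups cannot extend it.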
- Why while, not if? wait() can return due to a spurious wakeup (the Java specification permits this). Even without spurious wakeups, another thread may have changed the queue between the notifier's notifyAll() and the waiter's re-acquisition of the lock. The while loop always re-verifies the condition.
- Why notifyAll(), not notify()? notify() wakes only one thread. If that thread is a producer (waiting for space) but the change that just happened added an item (which only a consumer can use), the consumer stays stuck. notifyAll() wakes everyone; the while loop ensures only the right threads proceed.
- Timeout handling: compute remaining = deadline - now inside the loop. Never pass the original timeout to wait() inside a loop, because that restarts the full wait after each spurious wakeup.

A thread-safe registry that stores JobRecord objects and enforces the job state machine. Every state transition is atomic and guarded by synchronized.
    // Legal transitions:
    QUEUED → PRINTING → DONE
    QUEUED → CANCELLED
    PRINTING → CANCELLED
    // Illegal: DONE → anything (terminal), CANCELLED → anything (terminal)
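The transition rules can be written down as a lookup table. This is a minimal sketch under the assumption that a table-driven check is acceptable; the registry itself encodes the same rules with if statements. TransitionTable and its nested Status enum are hypothetical names (the enum is a local copy of JobStatus so the example compiles on its own).

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class TransitionTable {
    // Local copy of the four states so the sketch is self-contained.
    enum Status { QUEUED, PRINTING, DONE, CANCELLED }

    private static final Map<Status, Set<Status>> LEGAL = new EnumMap<>(Status.class);
    static {
        LEGAL.put(Status.QUEUED,    EnumSet.of(Status.PRINTING, Status.CANCELLED));
        LEGAL.put(Status.PRINTING,  EnumSet.of(Status.DONE, Status.CANCELLED));
        LEGAL.put(Status.DONE,      EnumSet.noneOf(Status.class));   // terminal
        LEGAL.put(Status.CANCELLED, EnumSet.noneOf(Status.class));   // terminal
    }

    // True if a job may move directly from 'from' to 'to'.
    static boolean isLegal(Status from, Status to) {
        return LEGAL.get(from).contains(to);
    }

    // A state is terminal when no transition leaves it.
    static boolean isTerminal(Status s) {
        return LEGAL.get(s).isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isLegal(Status.QUEUED, Status.PRINTING));
        System.out.println(isLegal(Status.DONE, Status.CANCELLED));
    }
}
```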
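    final class JobRecord {
        final long jobId;
        final PrintJob job;
        volatile boolean cancelRequested = false;   // cross-thread visibility
        JobStatus status = JobStatus.QUEUED;        // protected by JobRegistry sync

        // Constructor as used by JobRegistry.create(jobId, job)
        JobRecord(long jobId, PrintJob job) {
            this.jobId = jobId;
            this.job = job;
        }
    }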
cancelRequested is volatile so that when markCancelled() sets it to true, the PrintWorker running on a different thread sees the update immediately without needing to acquire the registry lock.
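A small stand-alone experiment shows the flag at work. This is a sketch under simplifying assumptions: CancelFlagDemo and runAndCancel are hypothetical names, and the worker only counts pages instead of going through a registry.

```java
final class CancelFlagDemo {
    private volatile boolean cancelRequested = false;   // written by one thread, read by another
    private int pagesPrinted = 0;                       // touched only by the worker; read after join()

    // Hypothetical driver: starts a worker that "prints" up to maxPages pages,
    // requests cancellation after cancelAfterMillis, and returns the number of
    // pages printed before the worker noticed the flag.
    static int runAndCancel(int maxPages, long pageMillis, long cancelAfterMillis) {
        CancelFlagDemo d = new CancelFlagDemo();
        Thread worker = new Thread(() -> {
            for (int p = 0; p < maxPages; p++) {
                if (d.cancelRequested) return;          // lock-free volatile read
                try {
                    Thread.sleep(pageMillis);           // simulate one page
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                d.pagesPrinted++;
            }
        });
        worker.start();
        try {
            Thread.sleep(cancelAfterMillis);
            d.cancelRequested = true;                   // volatile write
            worker.join();                              // after join(), pagesPrinted is safe to read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return d.pagesPrinted;
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel(1000, 5, 50) + " pages printed before cancel");
    }
}
```

The worker stops at the next page boundary after the write, long before the 1000 pages are done.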
    import java.util.HashMap;
    import java.util.Map;

    final class JobRegistry {
        private final Map<Long, JobRecord> jobs = new HashMap<>();

        synchronized JobRecord create(long jobId, PrintJob job) {
            JobRecord r = new JobRecord(jobId, job);          // status = QUEUED
            jobs.put(jobId, r);
            return r;
        }

        synchronized JobRecord get(long jobId) {
            return jobs.get(jobId);                           // safe read under lock
        }

        synchronized boolean markPrinting(long jobId) {
            JobRecord r = jobs.get(jobId);
            if (r == null) return false;
            if (r.status != JobStatus.QUEUED) return false;   // already printing/done/cancelled
            if (r.cancelRequested) {                          // CANCEL RACE CHECK
                r.status = JobStatus.CANCELLED;               // cancel arrived between dequeue and print start
                return false;
            }
            r.status = JobStatus.PRINTING;
            return true;
        }

        synchronized void markDone(long jobId) {
            JobRecord r = jobs.get(jobId);
            if (r != null && r.status == JobStatus.PRINTING)
                r.status = JobStatus.DONE;                    // only PRINTING → DONE is legal
        }

        synchronized boolean markCancelled(long jobId) {
            JobRecord r = jobs.get(jobId);
            if (r == null) return false;
            // Terminal states never change: a finished or already-cancelled job
            // cannot be cancelled again, so report "no change" for both.
            if (r.status == JobStatus.DONE || r.status == JobStatus.CANCELLED) return false;
            r.status = JobStatus.CANCELLED;
            r.cancelRequested = true;                         // volatile write: visible to PrintWorker
            return true;
        }

        synchronized JobStatus status(long jobId) {
            JobRecord r = jobs.get(jobId);
            return r == null ? null : r.status;
        }
    }
- Why is every method synchronized? HashMap is not thread-safe. Without synchronization, concurrent put/get calls can corrupt the internal table. Synchronizing every method makes the registry "linearizable": operations appear to happen one at a time in some serial order.
- The cancel race in markPrinting: between the moment the dispatcher dequeues a job ID and the moment PrintWorker calls markPrinting(), a cancel request may arrive. markPrinting refuses to start such a job: the status guard rejects anything that is no longer QUEUED, and the if (r.cancelRequested) check additionally moves a job whose flag is set straight to CANCELLED instead of PRINTING.
- Why volatile cancelRequested? PrintWorker.run() reads r.cancelRequested without holding the registry lock (it would be too expensive to lock on every page). volatile guarantees that the write in markCancelled is visible to the worker's next read of the flag. Without volatile, the worker could keep a stale false (for example in a CPU register) and never see the cancellation.

The main orchestrator. Wires together the queue, registry, a dispatcher thread, and a fixed thread pool. Implements the Spooler interface.
    public final class SpoolerImpl implements Spooler {
        private final AtomicLong idGen = new AtomicLong(1);       // unique job ID generator
        private final AtomicLong submitted = new AtomicLong(0);   // metric counters
        private final AtomicLong completed = new AtomicLong(0);
        private final AtomicLong cancelled = new AtomicLong(0);
        private final JobRegistry registry = new JobRegistry();
        private final BoundedJobQueue queue;
        private final ExecutorService pool;
        private final Thread dispatcher;
        private volatile boolean closed = false;                  // visibility flag for shutdown

        public SpoolerImpl(int capacity, int workers) {
            this.queue = new BoundedJobQueue(capacity);
            this.pool = Executors.newFixedThreadPool(workers);
            this.dispatcher = new Thread(() -> {
                while (!closed) {
                    try {
                        long jobId = queue.takeBlocking();        // blocks until job available
                        pool.submit(() -> {                       // hand to thread pool
                            new PrintWorker(jobId, registry).run();
                            JobStatus st = registry.status(jobId);
                            if (st == JobStatus.DONE) completed.incrementAndGet();
                            else if (st == JobStatus.CANCELLED) cancelled.incrementAndGet();
                        });
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();       // restore flag
                        break;                                    // exit loop on shutdown
                    } catch (RuntimeException e) {
                        // swallow to keep dispatcher alive
                    }
                }
            }, "spooler-dispatcher");
            this.dispatcher.start();
        }

        // submitBlocking, trySubmit, cancel, status, metrics and close are shown separately
    }
The dispatcher is a single dedicated thread that loops until the spooler is closed: take a job ID from the queue, submit a PrintWorker to the pool. The pool has a fixed number of threads (the workers argument), so up to that many jobs print in parallel. The catch (InterruptedException) is how close() cleanly stops the dispatcher.
    @Override
    public long submitBlocking(PrintJob job) throws InterruptedException {
        if (closed) throw new IllegalStateException("spooler closed");
        long id = idGen.getAndIncrement();   // 1. generate unique ID
        registry.create(id, job);            // 2. register (status = QUEUED)
        queue.putBlocking(id);               // 3. enqueue (blocks if full)
        submitted.incrementAndGet();         // 4. bump metric counter
        return id;
    }

    @Override
    public long trySubmit(PrintJob job, long timeoutMs) throws InterruptedException {
        if (closed) throw new IllegalStateException("spooler closed");
        long id = idGen.getAndIncrement();
        registry.create(id, job);
        boolean ok = queue.putWithTimeout(id, timeoutMs);
        if (!ok) {
            registry.markCancelled(id);      // timed out → mark cancelled
            return -1;
        }
        submitted.incrementAndGet();
        return id;
    }
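    @Override
    public boolean cancel(long jobId) {
        boolean removed = queue.removeIfPresent(jobId);   // 1. try to yank from queue
        boolean marked = registry.markCancelled(jobId);   // 2. set status to CANCELLED
        // 3. bump the metric only if the job never reached the pool; a job that was
        //    already dispatched is counted by the pool task once the worker exits
        if (marked && removed) cancelled.incrementAndGet();
        return marked;
    }

Two-step cancellation: first attempt to remove the job from the queue (if still queued), then mark it cancelled in the registry. If the job is already printing, removeIfPresent returns false (the job is not in the queue), but markCancelled still sets cancelRequested = true, which the worker checks between pages. The counter is bumped here only when the job was actually removed from the queue. A job that has already been dispatched is counted by the pool task when it sees the CANCELLED status, so incrementing in both places would count the same cancellation twice.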
    @Override
    public JobStatus status(long jobId) {
        JobStatus s = registry.status(jobId);
        return s == null ? JobStatus.CANCELLED : s;   // unknown ID → treat as cancelled
    }

    @Override
    public SpoolerMetrics metrics() {
        return new SpoolerMetrics(
            submitted.get(), completed.get(),
            cancelled.get(), queue.maxDepthObserved()
        );
    }
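The counters read by metrics() are updated from many threads at once. The sketch below is an illustration only; CounterDemo and countWithAtomic are hypothetical names. It shows that concurrent incrementAndGet() calls on an AtomicLong never lose an update.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CounterDemo {
    // Hypothetical driver: 'threads' threads each increment one shared
    // AtomicLong 'perThread' times; returns the final value.
    static long countWithAtomic(int threads, int perThread) {
        AtomicLong counter = new AtomicLong();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.incrementAndGet();   // atomic read-modify-write
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();                        // wait for every incrementer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomic(8, 100_000));   // threads * perThread
    }
}
```

Replacing the AtomicLong with a plain long field and ++ would make the result depend on thread timing, because ++ is a separate read, add, and write.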
    @Override
    public void close() throws Exception {
        closed = true;                  // 1. volatile write: reject new submits
        dispatcher.interrupt();         // 2. unblock takeBlocking() via InterruptedException
        dispatcher.join();              // 3. wait for dispatcher to die
        pool.shutdown();                // 4. stop accepting new tasks
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();         // 5. force-stop if workers hang
        }
    }
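The pool part of close() (shutdown, bounded wait, then shutdownNow) can be tried without the rest of the spooler. This is a sketch with hypothetical names (ShutdownDemo, shutdownWithGrace); the tasks only sleep, standing in for print workers.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownDemo {
    // Hypothetical driver: runs 'tasks' sleeping tasks on a pool of the same size,
    // shuts the pool down with a grace period, and returns how many tasks were
    // interrupted by shutdownNow().
    static int shutdownWithGrace(int tasks, long taskMillis, long graceMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        AtomicInteger interrupted = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(taskMillis);                 // stand-in for printing
                } catch (InterruptedException e) {
                    interrupted.incrementAndGet();
                    Thread.currentThread().interrupt();       // restore the flag
                }
            });
        }
        pool.shutdown();                                      // no new tasks; running ones continue
        try {
            if (!pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();                           // interrupt whatever is still running
                pool.awaitTermination(1, TimeUnit.SECONDS);   // let interrupted tasks exit
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(shutdownWithGrace(2, 20, 2000));   // tasks finish within the grace period
        System.out.println(shutdownWithGrace(2, 5000, 50));   // tasks outlive it and get interrupted
    }
}
```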
1. closed = true (volatile write). Any subsequent submitBlocking or trySubmit call throws IllegalStateException.
2. The dispatcher is normally blocked in queue.takeBlocking() → wait(), so the interrupt throws InterruptedException, which causes the dispatcher loop to break.
3. dispatcher.join() ensures the dispatcher has fully exited before we proceed.
4. pool.shutdown() stops accepting new tasks but lets already-submitted workers finish.
5. If the workers have not finished within 5 seconds, shutdownNow() interrupts them. Workers handle InterruptedException by marking the job CANCELLED.

- Why stop the dispatcher before the pool? The dispatcher submits tasks to the pool. If the pool were shut down first, the dispatcher's pool.submit() call would throw RejectedExecutionException.
- Why AtomicLong for counters? Multiple threads (producers calling submitBlocking, pool threads incrementing completed/cancelled) update these counters concurrently. AtomicLong uses CPU-level CAS (Compare-And-Swap) for lock-free, thread-safe incrementing without needing synchronized.
- Why volatile closed? The closed flag is written by the thread calling close() and read by the dispatcher loop and by submitBlocking. Without volatile, the dispatcher might keep a stale false (for example in a register) and never see the shutdown signal.

Simulates printing one job page by page. Checks for cancellation between pages and handles interruption during sleep. This class was given (not a TODO), but understanding it is critical for the viva because it shows how cancellation propagates.
    final class PrintWorker implements Runnable {
        private final long jobId;
        private final JobRegistry registry;

        PrintWorker(long jobId, JobRegistry registry) {
            this.jobId = jobId;
            this.registry = registry;
        }

        @Override
        public void run() {
            JobRecord r = registry.get(jobId);                // 1. look up the job record
            if (r == null) return;                            //    shouldn't happen
            if (!registry.markPrinting(jobId)) return;        // 2. try to transition QUEUED → PRINTING
                                                              //    fails if already cancelled
            PrintJob job = r.job;
            for (int p = 1; p <= job.pages(); p++) {          // 3. page-by-page loop
                if (r.cancelRequested) {                      // 4. check volatile flag before each page
                    registry.markCancelled(jobId);
                    return;
                }
                try {
                    Thread.sleep(job.pageMillis());           // 5. simulate printing one page
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();       // 6. restore interrupt flag
                    registry.markCancelled(jobId);            //    mark cancelled on shutdown
                    return;
                }
            }
            registry.markDone(jobId);                         // 7. all pages printed successfully
        }
    }
- The worker looks up the JobRecord to read the job's page count, page time, and the cancelRequested flag.
- markPrinting is atomic. If a cancel request arrived before this point, markPrinting returns false, the job stays CANCELLED, and the worker exits immediately.
- The loop runs once per page, from 1 to job.pages().
- The worker checks the cancelRequested flag before printing each page. This is how in-flight cancellation works: the worker discovers the flag and stops cooperatively.
- If shutdownNow() interrupts the worker thread during sleep, the worker marks the job cancelled and exits. It also restores the interrupt flag (Thread.currentThread().interrupt()) so that the thread pool framework knows this thread was interrupted.

- Why check cancelRequested before each page instead of once? A cancel request can arrive at any time while the worker is printing. Checking between pages gives a responsive cancellation window. Without it, a 100-page job would print to completion even after the user cancelled it.
- Why restore the interrupt flag? Thread.sleep() clears the interrupt flag when it throws InterruptedException. Calling Thread.currentThread().interrupt() re-sets it so that the ExecutorService framework can detect the interruption and take appropriate action (e.g., shutting down the thread).
- Why is cancelRequested volatile but status is not? status is always read/written under the registry's synchronized lock, which provides visibility. cancelRequested is read by the worker without locking (for performance), so it needs volatile to guarantee cross-thread visibility.

| Decision | Why | What happens if you don't |
|---|---|---|
| Use while(cond) wait() loops, never if | Guards against spurious wakeups (the Java specification allows wait() to return without a notify) and against other threads changing state between notifyAll and lock re-acquisition. | A producer may proceed even though the queue is still full, exceeding capacity. A consumer may proceed on an empty queue and get a null. |
| Call notifyAll() after every mutation | Producers wait for space; consumers wait for items. Only notifyAll() guarantees both types of waiters are woken. | Using notify() can deadlock: it may wake a producer when a consumer should have been woken (or vice versa), and no further notification arrives. |
| Compute absolute deadline for timeout | After each wakeup the loop recalculates remaining = deadline - now, so the total wait never exceeds the requested timeout. | Passing the original timeout to wait() inside the loop restarts the full wait after each spurious wakeup, potentially blocking forever. |
| All JobRegistry methods are synchronized | HashMap is not thread-safe. Synchronizing every method makes the registry linearizable. | Concurrent put/get can corrupt the HashMap's internal table, causing infinite loops or lost entries. |
| Check cancelRequested inside markPrinting | Catches the race where cancel arrives between dequeue and print start. | A cancelled job starts printing anyway, wasting resources and reporting the wrong final status. |
| cancelRequested is volatile | Read by PrintWorker without holding the registry lock. volatile ensures cross-thread visibility. | Worker keeps a stale false (for example in a CPU register) and never sees the cancellation, so the job prints to completion despite being cancelled. |
| Use AtomicLong for metric counters | Multiple threads increment counters concurrently. AtomicLong uses CPU CAS for lock-free thread safety. | Using a plain long with ++ is a read-modify-write, which is a data race. Counters become inaccurate (lost updates). |
| closed is volatile | Written by close() thread, read by dispatcher loop and submitBlocking. | Dispatcher may never see closed = true and loop forever. Producers may submit after shutdown. |
| Interrupt dispatcher before shutting down pool | Dispatcher submits tasks to the pool. If pool is shut down first, dispatcher gets RejectedExecutionException. | Unhandled exception in the dispatcher, or tasks lost between the two shutdowns. |
| One lock per monitor (queue vs. registry) | No thread ever holds both locks simultaneously, so no lock-ordering deadlock is possible. | If you nested locks (e.g., held queue lock while calling registry), thread A holding lock X waiting for lock Y while thread B holds Y waiting for X → deadlock. |
| Restore interrupt flag after catching InterruptedException | Thread.sleep() clears the flag when it throws. Re-setting it preserves the interrupt for the executor framework. | Executor may not realize the thread was interrupted, preventing clean pool shutdown. |
| Concept | Where Used | One-liner Explanation |
|---|---|---|
| Monitor pattern | BoundedJobQueue | synchronized + while(cond) wait() + notifyAll() — the classic producer-consumer coordination pattern. |
| Spurious wakeup | BoundedJobQueue | JVM allows wait() to return without a notify; the while loop re-checks the condition to handle this. |
| volatile | cancelRequested, closed | Guarantees that writes by one thread are immediately visible to reads by another thread, without needing a lock. |
| synchronized | BoundedJobQueue, JobRegistry | Mutual exclusion: only one thread at a time can execute a synchronized block on the same object. Also establishes happens-before memory ordering. |
| AtomicLong / CAS | SpoolerImpl counters, idGen | Lock-free thread-safe counter using CPU Compare-And-Swap instruction. No blocking, no context switch. |
| ExecutorService | SpoolerImpl pool | Thread pool that manages a fixed set of worker threads. submit() queues a task; the pool runs it when a thread is free. |
| Thread.interrupt() | close() → dispatcher | Cooperative interruption: sets a flag and unblocks wait()/sleep()/join() by throwing InterruptedException. |
| Graceful shutdown | SpoolerImpl.close() | shutdown() stops new tasks; awaitTermination() waits for running tasks; shutdownNow() interrupts if they hang. |
| State machine | JobRegistry | Each job follows QUEUED → PRINTING → DONE (or CANCELLED). Guarded transitions prevent illegal state changes. |
| Deadlock avoidance | Entire design | Only one lock is held at a time per thread (queue's monitor OR registry's monitor, never both). No lock-ordering deadlock possible. |
| Happens-before | All concurrency primitives | JMM guarantee: an unlock of a monitor happens-before the next lock of the same monitor. Volatile write happens-before subsequent volatile read. This is what makes shared state changes visible across threads. |
| Bounded queue | BoundedJobQueue | Prevents unbounded memory growth. Producers block when the queue is full, providing backpressure. |