Hello.

There is a program that he himself wrote to create and test a thread pool. The only thing left to do is to add the ability to cancel the task by id, which is executed. My version for some reason does not cancel anything. Tell me, please, I will be grateful. Here is the pool code:

import java.util.*; import java.util.concurrent.*; public interface CustomQueue<E>{ public void enqueue(E e); public E dequeue(); } public class FreeThread extends Thread { private MyQueue<Callable> myQueue; public int task_count; public long seconds; public FreeThread(Worker worker,MyQueue<Callable> myQueue, String name) { this.myQueue = myQueue; } @SuppressWarnings("deprecation") @Override public void run() { while(true){ try{ Callable r = myQueue.dequeue(); // print the dequeued item Date start=new Date(); System.out.println(" Задачу принял поток : " + Thread.currentThread().getName() ); task_count++; r.call(); System.out.println(" Задачу выполнил поток : " + Thread.currentThread().getName()); Date end=new Date(); this.seconds = (end.getSeconds()-start.getSeconds()); }catch(Exception ex){} } } } public class MyQueue<E> implements CustomQueue<E>{ // queue backed by a linkedlist private Queue<E> queue = new LinkedList<E>(); /** * Enqueue will add an object to this queue, and will notify any waiting * threads that now there is an object available. * * In enqueue method we just adding the elements not caring of size, * we can even introduce some check of size here also. */ @Override public synchronized void enqueue(E e) { queue.add(e); // Wake up anyone waiting on the queue to put some item. notifyAll(); } /** * Make a blocking call so that we will only return when the queue has * something on it, otherwise wait until something is put on it. */ @Override public synchronized E dequeue(){ E e = null; while(queue.isEmpty()){ try { wait(); } catch (InterruptedException e1) { return e; } } e = queue.remove(); return e; } } public class MyTimerTask extends TimerTask { private FreeThread thread; private int timelife; public MyTimerTask(FreeThread thread,int timelife) { this.thread=thread; this.timelife=timelife; } @SuppressWarnings("deprecation") @Override public void run() { if (thread.task_count<1){ try { thread.stop(); } catch (Exception e) {} System.out.println(" Свободный поток : "+thread.getName()+" удален из пула в : "+new Date()); } else{ thread.task_count=0; MyTimerTask timerTask = new MyTimerTask(thread,timelife); Timer timer = new Timer(true); timer.schedule(timerTask,thread.seconds*1000);} } } public class Task implements Callable { public Future<Void> future; public int id; public int time ; Task(int timeout) { time = timeout;Random rn=new Random(); this.id= rn.nextInt((2000 - 1) + 1) + 1;} public void setFuture(Future<Void> future) { this.future = future; } public Integer call() throws Exception { System.out.println("Начало задачи id:"+id); Thread.sleep(time*1000); System.out.println("Конец задачи id:"+id); return this.id; } } public class Worker implements Runnable{ private volatile boolean stop = false; private MyQueue<Callable> myQueue; private String name; public Worker(MyQueue<Callable> myQueue, String name){ this.myQueue = myQueue; this.name = name; } public void requestStop() { stop = true; } @Override public void run() { while(true){ try{ Callable r = myQueue.dequeue(); System.out.println(" Задачу принял поток : Thread - " + this.name ); r.call(); System.out.println(" Задачу выполнил поток: Thread - " + this.name); }catch(Exception ex){} } } } public class ThreadPoolManager { private final int THREADPOOL_CAPACITY; // Queue<String> greeting = new QueueLinkedList<>(); MyQueue<Callable> myQueue = new MyQueue<Callable>(); private final int timelife; public ThreadPoolManager(int capacity,int timeout){ this.THREADPOOL_CAPACITY = capacity; this.timelife=timeout; initAllHotConsumers(); initAllFreeConsumers(); } private void initAllHotConsumers(){ for(Integer i = 0; i < THREADPOOL_CAPACITY; i++){ Thread thread = new Thread(new Worker(myQueue, i.toString())); thread.setName("Thread - "+i.toString()); thread.start(); } } private void initAllFreeConsumers(){ for(Integer i = THREADPOOL_CAPACITY; i < THREADPOOL_CAPACITY*2; i++){ FreeThread thread = new FreeThread(new Worker(myQueue, i.toString()), myQueue, null); thread.setName("Thread - "+i.toString()); thread.start(); Date start=new Date(); TimerTask timerTask = new MyTimerTask(thread,timelife); //running timer task as daemon thread Timer timer = new Timer(true); timer.schedule(timerTask, timelife*1000); } } public Future<String> submitTask(Callable r){ myQueue.enqueue(r); return null; } } 

And this is a testing class:

 public class TestPool { public static void main(String[] args) throws InterruptedException, ExecutionException { ArrayList<Future<String>> results = new ArrayList<Future<String>>(); Set<Future<Integer>> set = new HashSet<Future<Integer>>(); boolean quit = false; int thread_num=Integer.parseInt(args[0]) ; int timeout=Integer.parseInt(args[1]) ; StringBuilder resultStr = new StringBuilder(); ThreadPoolManager poolManager = new ThreadPoolManager(thread_num,timeout); Scanner sc = new Scanner(System.in); System.out.println("Выберите: добавить задачу в пул - 1 \n удалить задачу по id - 2\n выход - 3"); // возвращает истинну если с потока ввода можно считать целое число do{ int choice = sc.nextInt(); if (choice==1) { System.out.println("Введите длительность задачи"); int time = sc.nextInt(); Callable task=new Task(time); poolManager.submitTask(task); FutureTask<String> task1 = new FutureTask<String>(task); results.add(task1); } if (choice==2) { System.out.println("Введите id задачи"); int id = sc.nextInt(); results.get(id).cancel(true); } if (choice ==3) {quit=true;} }while(!quit); } } 
  • I do not see that the interrupt () method was used to interrupt the task. But I see deprecated methods in the code section where you work with threads. Read, please, about correct work with threads. habrahabr.ru/post/133413 - an interesting article about interrupting and stopping threads. In general, it would be nice to interrupt the thread, and then try to do something evil with it. Properly written tasks for threads must respond correctly to interrupts. - DimXenon

0