Erste Variante

Von der KlasseThread erben und die Methode run überschreiben.
 
public class Foo extends Thread
{
 @Override
 public void run()
 {
  // ...
 }
}
Jetzt einfach ein Objekt dieser Klasse erzeugen und die Methode start() und nicht <pre>run()</pre> aufrufen.
 
final int nrOfThreads=4;
Foo [] f=new Foo[nrOfThreads];
for(int i=0; i<nrOfThreads; i++)
{
 f[i]=new Foo();
 f[i].start();
}
 

Zweite Variante

Das Interface Runnable implementieren
 
public class Bar implements Runnable
{
 public void run()
 {
 // ...
 }
}

final int nrOfThreads=4;
Thread [] f=new Thread[nrOfThreads];
for(int i=0; i<nrOfThreads; i++)
{
 f[i]=new Thread(new Bar());
 f[i].start();
}

Auf das Ende eines Threads warten

try
{
 for(int i=0; i<nrOfThreads; i++) f[i].join();
}
catch (InterruptedException e) {};

Normalerweise läuft ein Programm so lange weiter, bis auch der letzte Thread sich beendet hat. Ruft man auf einem Thread die Methode

setDaemon(true);

auf, wird auf das Ende dieses Threads nicht mehr gewartet.

Threads von außen beenden

Um einen Thread von außen beenden zu können, ist es hilfreich, wenn dieser auf

!isInterrupted()

testet und

InterruptedException

Exceptions fängt und dann selbst

interrrupt()

aufruft. Z.B.

while (!isInterrupted())
 {
   try
   {

...

   }
   catch ( InterruptedException e )
   {
    interrupt();
   }
 }

Jetzt kann man den Thread von außen mit

interrupt()

beenden.

Hat man Runnable implementiert, kann man so testen, ob man unterbrochen wurde

while(!Thread.currentThread().isInterrupted())
{
    try
    {
     ...
    }
    catch (InterruptedException e)
    {
     ...
     Thread.currentThread().interrupt();
    }
}

Synchronized

Methode darf nur von einem Thread gleichzeitig betreten werden

public class Foo
{
 synchronized public static void bar()
 {
  // ...
 }
}

Pro Objekt darf Methode nur von einem Thread gleichzeitig betreten werden

public class Foo
{
 synchronized public void bar()
 {
  // ...
 }
}

Entspricht

public class Foo
{
 public void bar()
 {
  synchronized(this)
  {
   // ...
  }
 }
}

Alternative:

Lock l = ...;
l.lock();
try
{ ...}
finally
{
  l.unlock();
}

ThreadLocal

Variablen auf die nur der entsprechenden Thread Zugriff hat (auf private Attribute hätte alle Objekte der entsprechenden Klasse Zugriff).

ThreadLocal foo = new ThreadLocal<Integer>();
foo.set(42);
Integer i=foo.get();

Damit man nicht für jeden Thread auch einen Start Wert ablegen muss kann man

private static final ThreadLocal<SimpleFormatter> perThreadFormatter = new ThreadLocal<SimpleFormatter>() {
        @Override
        protected SimpleFormatter initialValue() {
                return new SimpleFormatter(4, true);
        }
};

Exceptions in Threads

Angenommen man hat 4 Threads plus den Thread in dem main() läuft:

public static void main(String[] args)
{
 Thread t1,t2,t3,t4;
 ...
 t1.start();
 t2.start();
 t3.start();
 t4.start();

 while(true)
 {
   foo();                      
 }
}

Eine nicht gefangene Exception in einem der Threads bricht nur genau diesen ab, alle anderen laufen ganz normal weiter. Selbst wenn der Thread in dem main läuft abbricht, laufen t1 bis t4 weiter.

Möchte man ein anderes Verhalten erreichen muss man manuell eingreifen. Z.B. kann man einen UncaughtExceptionHandler in der Klasse Thread eintragen:

Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());

Die Klasse muss die Methode

void uncaughtException(Thread t, Throwable e)

überschreiben, die bei jedem Thread aufgerufen wird, der durch eine unbehandelte Exception vor der Beendigung steht. In dieser Methode kann er dann auch andere Threads zum Beenden auffordern.

public class MyUncaughtExceptionHandler implements UncaughtExceptionHandler
{
        private HashMap<Thread, Set<Thread>> connectedThreads;

...            
        @Override
        public void uncaughtException(Thread t, Throwable e)
        {
                for(Thread connectedThread : connectedThreads.get(t))
                {
                        System.out.println("Thread "+t+" had an uncaught exception, stop also the connected thread "+connectedThread);
                        connectedThread.interrupt();
                }
        }
}

Im Debugger funktioniert das möglicherweise nicht wie erwartet.

CountDownLatch

Die Klasse java.util.concurrent.CountDownLatch kann benutzt werden, um mehrere Threads zu koordinieren. Ein CountDownLatch hat einen Startwert, kann um 1 heruntergezählt werden und wenn 0 erreicht wird, kann eine Aktion ausgelöst werden.

Das ist hier ist ein normaler Thread, der zwei CountDownLatch Objekte beinhaltet. Über das eine Objekt teilen wir dem Thread mit, wann er starten darf, über das andere teilt er uns mit, wann er fertig ist.

import java.util.concurrent.CountDownLatch;

public class ThreadLatched extends Thread
{
    private final CountDownLatch startLatch;
    private final CountDownLatch stopLatch;

    /**
     * Constructor
     *
     * @param startLatch latch to signal the start
     * @param stopLatch  latch to signal the end
     */

    public ThreadLatched(CountDownLatch startLatch, CountDownLatch stopLatch)
    {
        this.startLatch = startLatch;
        this.stopLatch  = stopLatch;
    }

    /* (non-Javadoc)
     * @see java.lang.Thread#run()
     */

    public void run()
    {
      try
      {
          // wait for the startLatch to reach 0 to start
          startLatch.await();

          ...
      }
      catch (InterruptedException iex)
      {
       ...          
      }
      finally
      {
          // reduce the stopLatch by one to indicate that we are done
          stopLatch.countDown();
      }
    }
}

Und so kann man die CountDownLatch Objekte dann benutzen

final int nrOfThreads=5;

// this starts at 1, is passed to all threads and once it hits 0, everything starts
CountDownLatch startLatch = new CountDownLatch(1);

// this starts at number of threads, is passed to all threads, every thread that finishes reduces it by one, once it reaches 0 all threads are done
CountDownLatch stopLatch  = new CountDownLatch(nrOfThreads);

// init threads        
for(int i=1; i<=nrOfThreads; i++)
{
    Thread t = new ThreadLatched(startLatch, stopLatch);
     t.start();    
}

// this gives the start signal for all threads
startLatch.countDown();

// this waits for all threads to be finished
stopLatch.await();

Semaphore

Mit einer Semaphore kann man die Anzahl der Thread für eine Resource beschränken. Mit

semaphore.acquire();

kann eine Thread in den kritischen Bereich eintreten, ist der schon zu voll wird automatisch gewartet. Und mit

semaphore.release();

kann er sie beim Verlassen wieder freigeben und einer der Wartenden kann dafür eintreten.

Die Semaphore wird zentral erzeugt und dann allen interessierten Threads zugewiesen. Beim Erzeugen kann man noch festlegen, ob sie fair ist, d.h. der, der am längsten wartet, darf als erstes rein. Fair sein ist hier etwas langsamer, dafür kann es aber auch nicht passieren, dass ein Thread in der sehr Warteposition sehr lange hängen bleibt.

boolean fair=true;
Semaphore s=new Semaphore(3, fair);

MyTask t1=new MyTask(s);
MyTask t2=new MyTask(s);
...
 
import java.util.concurrent.Semaphore;

public class MyTask implements Runnable
{
    private Semaphore semaphore;

    public MyTask(Semaphore pSemaphore)
    {
       this.semaphore=pSemaphore;
    }

    @Override
    public void run()
    {
        try
        {
            this.semaphore.acquire();

            // do the work

            this.semaphore.release();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }

}

ThreadPoolExecutor

Wenn man in Java x Threads zu verarbeiten hat und davon immer y Threads parallel verarbeiten möchte, geht das so:

// number of CPUs in your computer
int cpus=Runtime.getRuntime().availableProcessors();

// what to run
Runnable r1, r2;

// run Runnables
ExecutorService executorService = Executors.newFixedThreadPool(cpus);
...
executorService.execute(r1);
executorService.execute(r2);
...
executorService.shutdown();
executorService.awaitTermination(15, TimeUnit.SECONDS);

Wenn man von den Tasks auch ein Ergebnis zurückgeliefert haben möchte, kann man statt Runable zu nutzen das Interface

Callable< TYPE >

implementieren.

public class MyTask implements Callable< Integer >
{

    @Override
    public Integer call() throws Exception
    {
      Integer result;
       ...
        return result;
    }
}

Und so stellt man neue Tasks ein (es wird nicht darauf gewartet, bis das Ergebnis wirklich vorliegt, man bekommt nur ein mehr oder weniger leeres Future Objekt):

Future<Integer> myResult= executorService.submit( new MyTask() );

So kann man testen ob das Ergebnis bereits vorliegt (es wird nicht darauf gewartet, dass ein Ergebnis vorliegt)

myResult.isDone();

Und so holt man sich das Ergebnis (falls es noch nicht fertig ist wird darauf gewartet)

myResult.get();

Wenn eine Menge von Tasks abgeben möchte und erst weiterlaufen will, wenn alle ein Ergebnis vorliegen geht das so:

List< Callable< TYPE > > myTasks;
results=executorService.invokeAll(myTasks);

Man erhält dann eine komplette Liste aller Ergebnisse, die auch bereits alle vorliegen.

If you would like to have one Thread doing some work in the background, this will call after 60 seconds your method, once it finished wait for 15 seconds and start it again

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> process(), 60, 15, TimeUnit.SECONDS);

private void process() {
  ...
}

ForkJoinPool

Ab Java 7 gibt es ein neues Framework für die Abarbeitung von Task. Hierbei gibt es nicht nur eine Warteschlange mit zu erledigenden Tasks, sondern jeder Task hat selbst auch eine lokale Warteschlange, in die er Tasks einstellen kann. Wenn ein Task mit seiner Warteschlange fertig ist, versucht er erst den anderen Tasks einen Task abzunehmen (zu "stehlen") und wenn alle fertig sind wird die Haupt Warteschlange abgearbeitet. In einigen Fällen kann das schneller sein als eine ThreadPoolExecutor Lösung die nur eine Warteschlange für alle Tasks hat. Ein Beispiel dafür ist, die Abarbeitung verschieden aufwendiger Tasks, die sich selbst in kleinere Subtasks aufteilen. Die Threads, die die leichteren Task bereits abgearbeitet haben, können hier die belasteten Threads Aufgaben stehlen und so die Last gleichmäßig verteilen. Eine andere Anwendung ist die Möglichkeit, dass ein Task auch ein Ergebnis zurückliefern kann. Wenn er seine Aufgabe in Subtasks verteilt, kann er selbst auf das Ergebnis der Subtask warten und wird während er wartet nicht mehr als aktiver Thread gezählt. Bei einer ThreadPoolExecutor Lösung ist es in diesem Fall schwieriger eine ungefähr konstante Anzahl an aktiven Threads zu haben und gleichzeit die Ergebnisse aller Threads optimal aufzusammeln. So kann man einen Pool erzeugen und ihn mit Aufgaben auffüllen. In diesem Beispiel berechnen die Tasks alle Primzahlen im vorgegeben bestimmten Bereich:

import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// create a pool of 4 parallel tasks
final int nrOfParallelTasks=4;
ForkJoinPool pool= new ForkJoinPool(nrOfParallelTasks);

// add some tasks to that pool
ForkJoinTask<Set<Integer>> task_A = new PrimeCalculator(1,   100000);
pool.invoke(task_A);

ForkJoinTask<Set<Integer>> task_B = new PrimeCalculator(100000,  200000);        
pool.invoke(task_B);

// collect the results of our tasks
Set<Integer> allPrimes=new HashSet<Integer>();
allPrimes.addAll(task_A.getRawResult());
allPrimes.addAll(task_B.getRawResult());

Und so sieht ein Task aus (die eigentlichen Methoden, die berechnen ob eine Zahl prim ist fehlen).

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.RecursiveTask;

public class PrimeCalculator extends RecursiveTask<Set<Integer> > {
    private final int min;
    private final int max;

    public PrimeCalculator(int pMin, int pMax)
    {
        this.min=pMin;
        this.max=pMax;
    }

    /* (non-Javadoc)
     * @see java.util.concurrent.RecursiveTask#compute()
     */

    @Override
    protected Set<Integer> compute()
    {        
        Set<Integer> result=new HashSet<Integer>();

        // decide if our task should be split in two smaller tasks
        if(notTooBigForOneTask())
        {
            // do the calculation
            System.out.println("Calculate "+this.min+ " to "+this.max);
            int currentNumber=this.min;
            while(currentNumber<=this.max)
            {
                if(isPrime(currentNumber))
                    result.add(currentNumber);

                currentNumber++;
            }            
        }
        else
        {
            // split the task into two subtasks
            int splitPos=splitPosition();

            System.out.println("Split task "+this.min+"-"+this.max+" into two subtasks "+this.min+"-"+splitPos+" and "+splitPos+"-"+this.max);

            PrimeCalculator a=new PrimeCalculator(this.min,   splitPos);
            PrimeCalculator b=new PrimeCalculator(splitPos+1, this.max);
            // add the new tasks to the local queue of this taks
            invokeAll(a, b);
            // collect their results
            result.addAll(a.getRawResult());
            result.addAll(b.getRawResult());
        }
        return result;
    }
}

Links ForkJoinPool