Parallel procedures

Overview

This page describes how to run single procedures in parallel using the MTProcs unit, which simplifies both running procedures in parallel and implementing parallel algorithms.

Parallel procedures and methods are often found in parallel algorithms and some programming languages provide built-in support for them (e.g. OpenMP in gcc). See OpenMP support for the plans to add such language features to FPC. Embedding these things into the language can save some typing and allows the compiler to generate code with less overhead. On the other hand there are many ways to convert a single threaded piece of code into parallel code. In fact, simple approaches often slow down the code. To get good results you must specify some parameters which a compiler cannot guess. For examples see the vast amount of settings and discussions around OpenMP and OpenCL. You need parallel algorithms, and MTProcs helps to implement them.

Getting MTProcs

The unit mtprocs.pas is part of the multithreadprocslaz.lpk package, which requires no other packages, only FPC >= 2.6.0.

You can find its sources either on SourceForge:

svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs

or in the Lazarus sources under components/multithreadprocs.

As always: open the package multithreadprocslaz.lpk in the IDE once, so that the IDE learns its path. To use the package in your current project, either use the IDE menu Package / Open recent package / .../multithreadprocslaz.lpk / More / Add to project, or add it via the Project Inspector.

Simple example

Here is a short example that does not do anything useful, but demonstrates what a parallel procedure looks like and how it is called:

program Test;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

// a simple parallel procedure
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  i: Integer;
begin
  writeln(Index);
  for i:=1 to Index*1000000 do ; // do some work
end;

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // address, startindex, endindex, optional data
end.

The output will be something like this:

2
3
1
4
5

Here are some short notes. The details follow later.

  • Under Unix, multithreading needs the unit cthreads, as explained in the Multithreaded Application Tutorial.
  • For speed reasons the cmem heap manager is recommended, although it does not make any difference in this example.
  • The parallel procedure DoSomethingParallel gets some fixed and predefined parameters.
  • Index defines which chunk of work should be done by this call.
  • Data is the pointer that was given to ProcThreadPool.DoParallel as the fourth parameter. It is optional and you are free to use it for anything (see the sketch after this list).
  • Item can be used to access some of the more sophisticated features of the thread pool.
  • ProcThreadPool.DoParallel works like a normal procedure call. It returns when it has run completely - that means all threads have finished their work.
  • The output shows the typical multithreading behavior: the order of the calls is not deterministic. Several runs can result in different orders.
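
For example, here is a minimal sketch of using the Data parameter to pass a pointer to shared data into the parallel procedure. The program and the names TSquares and ComputeSquare are made up for illustration; each Index writes only its own slot, so no locking is needed:

program DataDemo;
{$mode objfpc}{$H+}
uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

type
  TSquares = array[1..5] of PtrInt;
  PSquares = ^TSquares;

procedure ComputeSquare(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  Squares: PSquares absolute Data; // Data is the pointer passed to DoParallel
begin
  Squares^[Index]:=Index*Index; // each Index writes only its own slot
end;

var
  Squares: TSquares;
  i: Integer;
begin
  ProcThreadPool.DoParallel(@ComputeSquare,1,5,@Squares); // pass the shared data as 4th parameter
  for i:=1 to 5 do
    writeln(i,'*',i,'=',Squares[i]);
end.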

Features

Running a procedure in parallel means:

  • A procedure or method is executed with an Index running from an arbitrary StartIndex to an arbitrary EndIndex.
  • One or more threads execute these indices in parallel. For example, if the Index runs from 1 to 10 and there are 3 threads available in the pool, then 3 threads will run three different indices at the same time. Every time a thread finishes one call (one index) it allocates the next index and runs it. The result may be: thread 1 executes indices 3,5,7, thread 2 executes 1,6,8,9 and thread 3 runs 2,4,10.
  • The number of threads may vary during the run and there is no guarantee for a minimum number of threads. In the worst case all indices will be executed by one thread - the calling thread itself.
  • The maximum number of threads is initialized with a good guess for the current system. It can be manually set at any time via
ProcThreadPool.MaxThreadCount := 8;
  • You can set the maximum number of threads for each procedure.
  • A parallel procedure (or method) can recursively call parallel procedures (or methods) - see the sketch after this list.
  • Threads are reused, that means they are not destroyed and created for each index, but there is a global pool of threads. On a dual core processor there will be two threads doing all the work - the main thread and one extra thread in the pool.
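
As an illustration of the recursion point, here is a minimal sketch. The program and the procedure names DoOuter and DoInner are made up; only ProcThreadPool.DoParallel comes from MTProcs:

program RecursionDemo;
{$mode objfpc}{$H+}
uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

procedure DoInner(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  writeln('  inner index ',Index);
end;

procedure DoOuter(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  writeln('outer index ',Index);
  // a parallel procedure may itself start another parallel run
  ProcThreadPool.DoParallel(@DoInner,1,3,nil);
end;

begin
  ProcThreadPool.DoParallel(@DoOuter,1,4,nil);
end.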

Overhead, slow down

The overhead heavily depends on the system (number and types of cores, type of shared memory, speed of critical sections, cache size). Here are some general hints:

  • Each chunk of work (index) should take at least some milliseconds.
  • The overhead is independent of recursive levels of parallel procedures.

Multithreading overhead that is independent of the MTProcs unit and simply results from today's computer architectures:

  • As soon as one thread is created your program becomes multithreaded and the memory manager must use critical sections, which slows it down. So even if you do nothing with the thread your program might become slower.
  • The cmem heap manager is on some systems much faster for multithreading. In my benchmarks, especially on Intel systems and especially under OS X, the speed difference can be more than a factor of 10.
  • Strings and interfaces are globally reference counted. Each access needs a critical section. Processing strings in multiple threads will therefore hardly give any speed up. Use PChars instead.
  • Each chunk of work (index) should work on a disjoint part of memory to avoid cross cache updates.
  • Do not work on vast amounts of memory. On some systems a single thread is already fast enough to saturate the memory bus. Once the memory bus runs at its maximum speed, any further thread will slow things down instead of speeding them up.

Setting the maximum number of threads for a procedure

You can specify the maximum number of threads for a procedure as the fifth parameter.

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // address, startindex, endindex, 
      // optional: data (here: nil), optional: maximum number of threads (here: 2)
end.

This can be useful when the threads work on the same data and too many threads would create so many cache conflicts that they slow each other down, or when the algorithm uses a lot of WaitForIndex, so that only a few threads can actually work. The remaining threads can then be used for other tasks.

Wait for index / executing in order

Sometimes an Index depends on the result of an earlier Index. For example, a common task is to first compute chunk 5 and then combine the result with the result of chunk 3. Use the WaitForIndex method for that:

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  ... compute chunk number 'Index' ...
  if Index=5 then begin
    if not Item.WaitForIndex(3) then exit;
    ... compute ...
  end;
end;

WaitForIndex takes as an argument an Index that must be lower than the current Index. If it returns true, everything worked as expected. If it returns false, an exception happened in one of the other threads.

There is an extended function WaitForIndexRange that waits for a whole range of indices:

if not Item.WaitForIndexRange(3,5) then exit; // wait for 3,4 and 5
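
For example, a final combine step can be executed strictly in order by letting each chunk wait for the previous Index. A minimal sketch (the procedure name and the elided computation are only for illustration, assuming StartIndex is 1):

procedure DoChunkInOrder(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  // ... compute chunk 'Index' into a local buffer ...
  if Index>1 then
    if not Item.WaitForIndex(Index-1) then exit; // an earlier chunk raised an exception
  // ... append the local buffer to the shared result; this now happens in index order ...
end;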

Exceptions

If an exception occurs in one of the threads, the other threads will finish their current Index normally, but will not start a new one. The pool waits for all threads to finish and then raises the exception. That's why you can use try..except as usual:

try
  ...
  ProcThreadPool.DoParallel(...);
  ...
except
  On E: Exception do ...
end;

If there are multiple exceptions, only the first exception will be raised. To handle all exceptions, add a try..except inside your parallel method.
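
For example, here is a minimal sketch of a parallel procedure that catches its own exceptions, so that no exception from any Index is lost (it assumes SysUtils is in the uses clause; the writeln is just a placeholder for real handling):

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  try
    // ... compute chunk 'Index' ...
  except
    on E: Exception do
      writeln('index ',Index,' failed: ',E.Message); // handle or log per index
  end;
end;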

Synchronize

When you want to call a function in the main thread, for example to update some GUI element, you can use the class method TThread.Synchronize. It takes as arguments the current TThread and the address of a method. Since version 1.2, mtprocs provides a threadvar CurrentThread, which makes synchronizing simple:

TThread.Synchronize(CurrentThread,@YourMethod);

This will post an event on the main event queue and wait until the main thread has executed your method. Keep in mind that a bare FPC program does not have an event queue; an LCL or fpGUI program does.
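
For example, here is a minimal sketch of updating a label from inside a parallel method, assuming a form TMainForm with a TLabel named Label1 (the method names UpdateLabel and DoChunk are made up for illustration):

procedure TMainForm.UpdateLabel;
begin
  Label1.Caption:='one chunk finished'; // executed in the main thread
end;

procedure TMainForm.DoChunk(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  // ... work on chunk 'Index' ...
  TThread.Synchronize(CurrentThread,@UpdateLabel); // waits until the main thread has run UpdateLabel
end;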

If you create your own TThread descendants, you should set the variable in your Execute method. For example:

procedure TYourThread.Execute;
begin
  CurrentThread:=Self;
  ...work...
end;

Example: Parallel loop

This example explains step by step how to convert a loop into a parallel procedure. The example computes the maximum value of an integer array BigArray.

The original loop

type
  TArrayOfInteger = array of integer;

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  i: PtrInt;
begin
  Result:=BigArray[0];
  for i:=1 to length(BigArray)-1 do begin
    if Result<BigArray[i] then Result:=BigArray[i];
  end;
end;

Splitting the work

The work should be equally distributed over n threads, where n is typically the number of CPUs/cores in the system. For this, BigArray is split into equally sized blocks and an outer loop runs over every block. MTProcs provides ProcThreadPool.CalcBlockSize to compute a suitable block count and size:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  Result:=BigArray[0];
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  for Index:=0 to BlockCount-1 do begin
    BlockStart:=Index*BlockSize;
    BlockEnd:=Min(BlockStart+BlockSize,length(BigArray))-1; // Min is from the Math unit
    for i:=BlockStart to BlockEnd do begin
      if Result<BigArray[i] then Result:=BigArray[i];
    end;
  end;
end;

The added lines can be used for any loop. Eventually a tool could be written to automate this.

The work is now split into smaller pieces. Next, the pieces must be made more independent of each other.

Local and shared variables

For each variable used in the loop you must decide whether it is a shared variable used by all threads or whether each thread uses its own local copy. The shared variables BlockCount and BlockSize are only read and never change, so nothing needs to be done for them. But a shared variable like Result will be written by all threads. This can be handled either with synchronization (e.g. a critical section), which is slow, or by giving each thread a local copy and combining these local copies at the end.

Here is a solution replacing the Result variable with an array, which is combined at the end:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  // shared variables
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
  // local variables
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // allocate space for local variables
  // compute maximum for each block
  for Index:=0 to BlockCount-1 do begin
    // compute maximum of block
    BlockStart:=Index*BlockSize;
    BlockEnd:=Min(BlockStart+BlockSize,length(BigArray))-1; // Min and Max are from the Math unit
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do begin
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
    end;
  end;
  // compute maximum of all blocks
  // (if you have hundreds of threads there are better solutions)
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);

  FreeMem(BlockMax);
end;

This approach is straightforward and could be automated, although the process would need some hints from the programmer.

DoParallel

The final step is to move the inner loop into a sub procedure and replace the loop with a call to DoParallelNested.

...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;

  procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
                                Item: TMultiThreadProcItem);
  var
    i: integer;
    BlockStart, BlockEnd: PtrInt;
  begin
    // compute maximum of block
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
  end;
var
  Index: PtrInt;
begin
  // split work into equally sized blocks
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  // allocate local/thread variables
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
  // compute maximum for each block
  ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
  // compute maximum of all blocks
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
  FreeMem(BlockMax);
end;

This was mostly copy and paste, so again this could be automated.

Example: parallel sort

The unit mtputils contains the function ParallelSortFPList which uses mtprocs to sort a TFPList in parallel. A compare function must be given.

procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);

This function uses the parallel MergeSort algorithm. The parameter MaxThreadCount is passed to DoParallel; a value of 0 means to use the system default.

Optionally you can provide your own sort function (OnSortPart) to sort the part handled by each single thread. For example, you can sort the blocks via QuickSort and then merge them, which gives you a parallel QuickSort. See TFPList.Sort for an example implementation of QuickSort.
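
Here is a minimal usage sketch, assuming the list items store plain integers cast to pointers. The program and the compare function CompareInts are made up for illustration; ParallelSortFPList itself comes from mtputils:

program SortDemo;
{$mode objfpc}{$H+}
uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  Classes, MTPUtils;

function CompareInts(Item1, Item2: Pointer): Integer;
begin
  // the list stores integers cast to pointers
  if PtrInt(Item1)<PtrInt(Item2) then Result:=-1
  else if PtrInt(Item1)>PtrInt(Item2) then Result:=1
  else Result:=0;
end;

var
  List: TFPList;
  i: Integer;
begin
  List:=TFPList.Create;
  try
    for i:=1 to 1000 do
      List.Add(Pointer(PtrInt(Random(10000))));
    ParallelSortFPList(List,@CompareInts); // MaxThreadCount=0: use the system default
    writeln('smallest: ',PtrInt(List[0]),', largest: ',PtrInt(List[List.Count-1]));
  finally
    List.Free;
  end;
end.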

Using nested procedures

The parallel procedure can also be a nested procedure (using {$ModeSwitch nestedprocvars} and DoParallelNested, as in the previous example); it can then directly access local variables and parameters of the enclosing procedure:

procedure DoSomething(Value: PtrInt);
var
  p: array[1..2] of Pointer;

  procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
  begin
    p[Index]:=Pointer(Value); // accessing local variables and parameters is possible!
  end;

var
  i: Integer;
begin
  ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;

This can save a lot of refactoring and makes the parallel procedure much more readable.

See also

  • Multithreaded Application Tutorial
  • OpenCL