Parallel procedures/ru

From Lazarus wiki

Deutsch (de) English (en) 日本語 (ja) русский (ru)

Общие сведения

На этой странице описывается параллельное выполнение отдельных процедур с помощью модуля MTProcs, что упрощает параллельное выполнение процедур и реализацию параллельных алгоритмов.

Параллельные процедуры и методы часто встречаются в параллельных алгоритмах, и некоторые языки программирования предоставляют встроенную поддержку для них (например, OpenMP в gcc). См. "поддержка OpenMP" для ознакомления с планами добавления таких языковых функций в FPC. Эти возможности, будучи встроенными в язык, могут сэкономить время на ввод кода и позволяют компилятору создавать код с меньшими затратами. С другой стороны, существует множество способов преобразования однопоточного фрагмента кода в параллельный. Однопоточный подход к программированию часто замедляет код. Для получения хороших результатов необходимо указать некоторые параметры, которые компилятор не может определить самостоятельно. Для ознакомления с примерами, посетите следующие ресурсы: / OpenMP и / OpenCL. Если вам нужны параллельные алгоритмы, то MTProcs поможет в их реализации.

Добавление MTProcs

Модуль mtprocs.pas является частью пакета multithreadprocslaz.lpk. Он не зависим от других пакетов и требует лишь наличия FPC >= 2.6.0.

Вы можете найти его исходники на sourceforge:

svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs

Или в Lazarus components/multithreadprocs.

Как обычно, откройте пакет multithreadprocslaz.lpk в IDE один раз, что бы он узнал путь. Для использования пакета в вашем проекте, выберите: Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project

Простой пример

Ниже приведён пример, который ничего не делает, но демонстрирует, как выглядит параллельная процедура и как она вызывается:

program Test;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}
  cthreads, cmem,
  {$ENDIF}
  MTProcs;

// простая параллельная процедура
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
  i: Integer;
begin
  writeln(Index);
  for i:=1 to Index*1000000 do ; // делаем некую работу
end;

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // адрес, стартовый индекс, конечный индекс, дополнительные данные
end.

На выходе будет что-то вроде этого:

2
3
1
4
5

Тут следует дать несколько коротких заметок. Подробности позже.

  • Многопоточность в UNIX требует модуль cthreads как объяснено в учебнике по многопоточным Приложениям.
  • Для повышения скорости cmem рекомендуется использовать диспетчер кучи, хотя в этом примере это не имеет значения.
  • Параллельная процедура DoSomethingParallel получает некие фиксированные и предопределенные параметры.
  • Index определяет, какую часть работы должен сделать этот вызов.
  • Data - это указатель, который был дан в ProcThreadPool.DoParallel как четвертый параметр. Это необязательно и вы можете использовать его для чего угодно.
  • Item может использоваться для доступа к некоторым более сложным функциям пула потоков.
  • ProcThreadPool.DoParallel работает как обычный вызов процедуры. Он завершится, когда он полностью будет выполнен - это означает, что все потоки должны завершить свою работу.
  • Выходные данные показывают типичное многопоточное поведение: порядок вызовов не определен. Несколько запусков могут привести к разным порядкам чисел.

Особенности

Выполнение процедуры параллельно означает:

  • процедура или метод выполняется с индексом, идущим от произвольного StartIndex к произвольному EndIndex.
  • Один или несколько потоков выполняют эти индексы параллельно. Например, если Индекс выполняется от 1 до 10 и в пуле доступно 3 потока, то 3 потока будут одновременно выполнять три разных индекса. Каждый раз, когда поток завершает один вызов (один индекс), он выделяет следующий индекс и выполняет его. Результатом может быть: поток 1 выполняет индекс 3,5,7, поток 2 выполняет 1,6,8,9 и поток 3 работает 2,4,10.
  • Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком.
  • Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через:
ProcThreadPool.MaxThreadCount := 8;
  • Вы можете задать максимальное количество потоков для каждой процедуры.
  • Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы).
  • Потоки используются повторно, это означает, что они не уничтожаются и не создаются для каждого индекса, но существует глобальный пул потоков. На двухъядерном процессоре будет два потока, выполняющих всю работу-основной поток и один дополнительный поток в пуле.

Накладные расходы, замедление

Накладные расходы сильно зависят от системы (количества и типа ядер, тип общей памяти, скорость критических разделов, размер кэша). Вот некоторые общие советы:

  • Каждый блок работы (index) должен занимать не менее нескольких миллисекунд.
  • Накладные расходы не зависят от рекурсивных уровней параллельных процедур.

Накладные расходы на многопоточность, которые не зависят от модулей MTProcs, а просто являются результатом современных компьютерных архитектур:

  • Как только создается один поток, ваша программа становится многопоточной, и менеджеры памяти должны использовать критические разделы, что замедляет ее работу. Таким образом, даже если вы ничего не сделаете с потоком, ваша программа может стать медленнее.
  • Менеджер кучи cmem в некоторых системах намного быстрее при многопоточности. В моих тестах, особенно на системах Intel и особенно под OS X, разница в скорости может быть более 10 раз.
  • Строки и интерфейсы подсчитываются глобально. Для каждого случая доступа нужна критическая секция. Таким образом, обработка строк в нескольких потоках вряд ли увеличит скорость работы программы. Вместо них используйте PChars.
  • Каждый фрагмент работы (индекс) должен работать с дизъюнктивной частью памяти, чтобы избежать перекрестных обновлений кеша.
  • Не работайте с большим объемом памяти. В некоторых системах одного потока достаточно, чтобы заполнить скорость шины памяти. По достижении максимальной скорости шины памяти любой последующий поток будет замедляться, а не ускоряться.

Установка максимального числа потоков для процедуры

Вы можете указать максимальное число потоков для процедуры в пятом параметре.

begin
  ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // адрес, начальный_индекс, конечный_индекс, 
      // необязательный параметр: данные (в данном случае: nil), необязательный параметр: максимальное число потоков (в данном случае: 2)
end.

Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач.

Дождитесь индексации/выполнения по порядку

Иногда результат текущего индекса зависит от результата предыдущего индекса. Например, обычная задача - сначала вычислить блок 5, а затем объединить результат с результатом блока 3. Используйте для этого метод WaitForIndex:

procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
  ... вычисляем номер блока 'Index' ...
  if Index=5 then begin
    if not Item.WaitForIndex(3) then exit;
    ... вычисляем ...
  end;
end;

WaitForIndex принимает в качестве аргумента индекс, который меньше текущего индекса или диапазона. Если он вернет true, все работает, как ожидалось. Если он возвращает false, то в одном из других потоков произошло исключение.

Существует расширенная функция WaitForIndexRange, ожидающая весь диапазон индекса:

if not Item.WaitForIndexRange(3,5) then exit; // ожидаем для значений индекса 3,4 и 5

Исключения

Если в одном из потоков возникает исключение, то другие потоки завершают работу нормально, но не запускают новый индекс. Пул ожидает завершения всех потоков, а затем вызовет исключение. Вот почему вы можете использовать try..except как всегда:

try
  ...
  ProcThreadPool.DoParallel(...);
  ...
except
  On E: Exception do ...
end;

Если есть несколько исключений, будет вызвано только первое исключение. Чтобы обрабатывать все исключения, добавьте в свой параллельный метод команду try..except.

Синхронизация

Если вы хотите вызвать функцию в основном потоке, например, чтобы обновить какой-либо элемент графического интерфейса, вы можете использовать метод класса TThread.Synchronize. Он принимает в качестве аргументов текущий TThread и адрес метода. Начиная с версии 1.2 mtprocs предоставляет переменную потока CurrentThread, которая упрощает синхронизацию:

TThread.Synchronize(CurrentThread,@YourMethod);

Это разместит событие в основной очереди событий и дождется, пока основной поток выполнит ваш метод. Имейте в виду, что голая программа fpc не имеет очереди событий. Это есть в программе LCL или fpgui.

Если вы создаете своих собственных потомков TThread, вы должны установить переменную в своем методе Execute. Например:

procedure TYourThread.Execute;
begin
  CurrentThread:=Self;
  ...работаем...
end;

Пример: Параллельный цикл

В этом примере шаг за шагом объясняется, как преобразовать цикл в параллельную процедуру. В примере вычисляется максимальное количество целочисленного массива BigArray.

Исходный цикл

type
  TArrayOfInteger = array of integer;

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  i: PtrInt;
begin
  Result:=BigArray[0];
  for i:=1 to length(BigArray)-1 do begin
    if Result<BigArray[i] then Result:=BigArray[i];
  end;
end;

Разделение работы

Работа должна быть равномерно распределена по n потокам. Для этого BigArray разбивается на блоки одинакового размера, и внешний цикл проходит по каждому блоку. Обычно n - это количество процессоров/ядер в системе. MTProcs имеет несколько служебных функций для вычисления размера и количества блоков:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  Result:=BigArray[0];
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  for Index:=0 to BlockCount-1 do begin
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    for i:=BlockStart to BlockEnd do begin
      if Result<BigArray[i] then Result:=BigArray[i];
    end;
  end;
end;

Добавленные строки можно использовать для любого цикла. Со временем можно будет написать инструмент для автоматизации этого.

Теперь работа разбита на более мелкие части. Теперь части должны стать более независимыми.

Локальные и общие переменные

Для каждой используемой переменной в цикле вы должны решить, является ли она общей переменной, используемой всеми потоками, или каждый поток использует свою собственную локальную переменную. Общие переменные BlockCount и BlockSize только читаются и не меняются, поэтому для них не требуется никакой работы. Но общая переменная, такая как Result, будет изменяться всеми потоками. Это может быть достигнуто либо с помощью синхронизации (например, критической секции), которая выполняется медленно, либо каждый поток использует локальную копию, и эти локальные переменные позже объединяются.

Вот решение, заменяющее переменную Result на массив, который в конце объединяется:

function FindMaximum(BigArray: TArrayOfInteger): integer;
var
  // общие переменные
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;
  // локальные переменные
  i: PtrInt;
  Index: PtrInt;
  BlockStart, BlockEnd: PtrInt;
begin
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // выделение памяти для локальных переменных
  // вычисляем максимум для каждого блока
  for Index:=0 to BlockCount-1 do begin
    // вычисляем максимум блока
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do begin
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
    end;
  end;
  // вычисляем максимум всех блоков
  // (это лучшее решение, если у вас сотни потоков)
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);

  FreeMem(BlockMax);
end;

Этот подход прост и может быть автоматизирован. Однако для этого процесса потребуются подсказки от программиста.

DoParallel

Последний шаг - переместить внутренний цикл в подпрограмму и заменить цикл вызовом DoParallelNested.

...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
  BlockCount, BlockSize: PtrInt;
  BlockMax: PPtrInt;

  procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
                                Item: TMultiThreadProcItem);
  var
    i: integer;
    BlockStart, BlockEnd: PtrInt;
  begin
    // вычисляем максимум блоков
    Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
    BlockMax[Index]:=BigArray[BlockStart];
    for i:=BlockStart to BlockEnd do
      if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
  end;
var
  Index: PtrInt;
begin
  // разделяем работу на блоки одинакового размера
  ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
  // выделяем память для локальных/потоковых переменных
  BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
  // вычисляем максимум для каждого блока
  ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
  // вычисляем максимум всех блоков
  Result:=BlockMax[0];
  for Index:=1 to BlockCount-1 do
    Result:=Max(Result,BlockMax[Index]);
  FreeMem(BlockMax);
end;

В основном это было копирование и вставка, так что снова это можно было автоматизировать.

Пример: параллельная сортировка

Модуль mtputils содержит функцию ParallelSortFPList, которая использует mtprocs для параллельной сортировки TFPList. Должна быть указана функция сравнения.

procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);

Эта функция использует параллельный алгоритм MergeSort. Параметр MaxThreadCount передается DoParallel. 0 означает использование системы по умолчанию.

При желании вы можете предоставить свою собственную функцию сортировки (OnSortPart) для сортировки части каждого отдельного потока. Например, вы можете сортировать блоки через QuickSort, которые затем объединяются. Тогда у вас будет параллельный QuickSort. См. TFPList.Sort для примера реализации QuickSort.

Использование вложенных процедур

procedure DoSomething(Value: PtrInt);
var
  p: array[1..2] of Pointer;

  procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
  begin
    p[Index]:=Pointer(Value); // возможен доступ к локальным переменным и параметрам!
  end;

var
  i: Integer;
begin
  ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;

Это может сохранить много времени на рефакторинг и сделать код параллельно работающих процедур более читаемым.

См. также