Parallel procedures/ru
│
Deutsch (de) │
English (en) │
日本語 (ja) │
русский (ru) │
中文(中国大陆) (zh_CN) │
Общие сведения
На этой странице описывается параллельное выполнение отдельных процедур с помощью модуля MTProcs, что упрощает параллельное выполнение процедур и реализацию параллельных алгоритмов.
Параллельные процедуры и методы часто встречаются в параллельных алгоритмах, и некоторые языки программирования предоставляют встроенную поддержку для них (например, OpenMP в gcc). См. "поддержка OpenMP" для ознакомления с планами добавления таких языковых функций в FPC. Эти возможности, будучи встроенными в язык, могут сэкономить время на ввод кода и позволяют компилятору создавать код с меньшими затратами. С другой стороны, существует множество способов преобразования однопоточного фрагмента кода в параллельный. Однопоточный подход к программированию часто замедляет код. Для получения хороших результатов необходимо указать некоторые параметры, которые компилятор не может определить самостоятельно. Для ознакомления с примерами, посетите следующие ресурсы: / OpenMP и / OpenCL. Если вам нужны параллельные алгоритмы, то MTProcs поможет в их реализации.
Добавление MTProcs
Модуль mtprocs.pas является частью пакета multithreadprocslaz.lpk. Он не зависим от других пакетов и требует лишь наличия FPC >= 2.6.0.
Вы можете найти его исходники на sourceforge:
svn co https://lazarus-ccr.svn.sourceforge.net/svnroot/lazarus-ccr/components/multithreadprocs multithreadprocs
Или в Lazarus components/multithreadprocs.
Как обычно, откройте пакет multithreadprocslaz.lpk в IDE один раз, что бы он узнал путь. Для использования пакета в вашем проекте, выберите: Package / Open recent package / .../multithreadprocslaz.lpk / More / add to project
Простой пример
Ниже приведён пример, который ничего не делает, но демонстрирует, как выглядит параллельная процедура и как она вызывается:
program Test;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}
cthreads, cmem,
{$ENDIF}
MTProcs;
// простая параллельная процедура
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
i: Integer;
begin
writeln(Index);
for i:=1 to Index*1000000 do ; // делаем некую работу
end;
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // адрес, стартовый индекс, конечный индекс, дополнительные данные
end.
На выходе будет что-то вроде этого:
2 3 1 4 5
Тут следует дать несколько коротких заметок. Подробности позже.
- Многопоточность в UNIX требует модуль cthreads как объяснено в учебнике по многопоточным Приложениям.
- Для повышения скорости cmem рекомендуется использовать диспетчер кучи, хотя в этом примере это не имеет значения.
- Параллельная процедура DoSomethingParallel получает некие фиксированные и предопределенные параметры.
- Index определяет, какую часть работы должен сделать этот вызов.
- Data - это указатель, который был дан в ProcThreadPool.DoParallel как четвертый параметр. Это необязательно и вы можете использовать его для чего угодно.
- Item может использоваться для доступа к некоторым более сложным функциям пула потоков.
- ProcThreadPool.DoParallel работает как обычный вызов процедуры. Он завершится, когда он полностью будет выполнен - это означает, что все потоки должны завершить свою работу.
- Выходные данные показывают типичное многопоточное поведение: порядок вызовов не определен. Несколько запусков могут привести к разным порядкам чисел.
Особенности
Выполнение процедуры параллельно означает:
- процедура или метод выполняется с индексом, идущим от произвольного StartIndex к произвольному EndIndex.
- Один или несколько потоков выполняют эти индексы параллельно. Например, если Индекс выполняется от 1 до 10 и в пуле доступно 3 потока, то 3 потока будут одновременно выполнять три разных индекса. Каждый раз, когда поток завершает один вызов (один индекс), он выделяет следующий индекс и выполняет его. Результатом может быть: поток 1 выполняет индекс 3,5,7, поток 2 выполняет 1,6,8,9 и поток 3 работает 2,4,10.
- Количество потоков может меняться во время выполнения, и нет никакой гарантии минимума потоков. В худшем случае весь индекс будет выполняться одним потоком.
- Максимальное количество потоков инициализируется для хорошей работы с учётом текущей системы. Однако, его можно изменить вручную через:
ProcThreadPool.MaxThreadCount := 8;
- Вы можете задать максимальное количество потоков для каждой процедуры.
- Параллельная процедура (или метод) может вызывать рекурсивно параллельные процедуры (или методы).
- Потоки используются повторно, это означает, что они не уничтожаются и не создаются для каждого индекса, но существует глобальный пул потоков. На двухъядерном процессоре будет два потока, выполняющих всю работу-основной поток и один дополнительный поток в пуле.
Накладные расходы, замедление
Накладные расходы сильно зависят от системы (количества и типа ядер, тип общей памяти, скорость критических разделов, размер кэша). Вот некоторые общие советы:
- Каждый блок работы (index) должен занимать не менее нескольких миллисекунд.
- Накладные расходы не зависят от рекурсивных уровней параллельных процедур.
Накладные расходы на многопоточность, которые не зависят от модулей MTProcs, а просто являются результатом современных компьютерных архитектур:
- Как только создается один поток, ваша программа становится многопоточной, и менеджеры памяти должны использовать критические разделы, что замедляет ее работу. Таким образом, даже если вы ничего не сделаете с потоком, ваша программа может стать медленнее.
- Менеджер кучи
cmem
в некоторых системах намного быстрее при многопоточности. В моих тестах, особенно на системах Intel и особенно под OS X, разница в скорости может быть более 10 раз. - Строки и интерфейсы подсчитываются глобально. Для каждого случая доступа нужна критическая секция. Таким образом, обработка строк в нескольких потоках вряд ли увеличит скорость работы программы. Вместо них используйте PChars.
- Каждый фрагмент работы (индекс) должен работать с дизъюнктивной частью памяти, чтобы избежать перекрестных обновлений кеша.
- Не работайте с большим объемом памяти. В некоторых системах одного потока достаточно, чтобы заполнить скорость шины памяти. По достижении максимальной скорости шины памяти любой последующий поток будет замедляться, а не ускоряться.
Установка максимального числа потоков для процедуры
Вы можете указать максимальное число потоков для процедуры в пятом параметре.
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,10,nil,2); // адрес, начальный_индекс, конечный_индекс,
// необязательный параметр: данные (в данном случае: nil), необязательный параметр: максимальное число потоков (в данном случае: 2)
end.
Данная опция может быть полезна когда потоки работают с одними и теми же данными, и слишком большое количество потоков будет создавать много конфликтов с кэшем, что в итоге приведет к их замедлению или когда алгоритм использует много WaitForIndex, так что фактически могут работать только несколько потоков. Затем потоки могут использоваться для других задач.
Дождитесь индексации/выполнения по порядку
Иногда результат текущего индекса зависит от результата предыдущего индекса. Например, обычная задача - сначала вычислить блок 5, а затем объединить результат с результатом блока 3. Используйте для этого метод WaitForIndex:
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
... вычисляем номер блока 'Index' ...
if Index=5 then begin
if not Item.WaitForIndex(3) then exit;
... вычисляем ...
end;
end;
WaitForIndex принимает в качестве аргумента индекс, который меньше текущего индекса или диапазона. Если он вернет true
, все работает, как ожидалось. Если он возвращает false
, то в одном из других потоков произошло исключение.
Существует расширенная функция WaitForIndexRange, ожидающая весь диапазон индекса:
if not Item.WaitForIndexRange(3,5) then exit; // ожидаем для значений индекса 3,4 и 5
Исключения
Если в одном из потоков возникает исключение, то другие потоки завершают работу нормально, но не запускают новый индекс. Пул ожидает завершения всех потоков, а затем вызовет исключение. Вот почему вы можете использовать try..except
как всегда:
try
...
ProcThreadPool.DoParallel(...);
...
except
On E: Exception do ...
end;
Если есть несколько исключений, будет вызвано только первое исключение. Чтобы обрабатывать все исключения, добавьте в свой параллельный метод команду try..except
.
Синхронизация
Если вы хотите вызвать функцию в основном потоке, например, чтобы обновить какой-либо элемент графического интерфейса, вы можете использовать метод класса TThread.Synchronize
. Он принимает в качестве аргументов текущий TThread и адрес метода. Начиная с версии 1.2 mtprocs
предоставляет переменную потока CurrentThread
, которая упрощает синхронизацию:
TThread.Synchronize(CurrentThread,@YourMethod);
Это разместит событие в основной очереди событий и дождется, пока основной поток выполнит ваш метод. Имейте в виду, что голая программа fpc не имеет очереди событий. Это есть в программе LCL или fpgui.
Если вы создаете своих собственных потомков TThread
, вы должны установить переменную в своем методе Execute
. Например:
procedure TYourThread.Execute;
begin
CurrentThread:=Self;
...работаем...
end;
Пример: Параллельный цикл
В этом примере шаг за шагом объясняется, как преобразовать цикл в параллельную процедуру. В примере вычисляется максимальное количество целочисленного массива BigArray.
Исходный цикл
type
TArrayOfInteger = array of integer;
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
i: PtrInt;
begin
Result:=BigArray[0];
for i:=1 to length(BigArray)-1 do begin
if Result<BigArray[i] then Result:=BigArray[i];
end;
end;
Разделение работы
Работа должна быть равномерно распределена по n потокам. Для этого BigArray разбивается на блоки одинакового размера, и внешний цикл проходит по каждому блоку. Обычно n - это количество процессоров/ядер в системе. MTProcs
имеет несколько служебных функций для вычисления размера и количества блоков:
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
BlockCount, BlockSize: PtrInt;
i: PtrInt;
Index: PtrInt;
BlockStart, BlockEnd: PtrInt;
begin
Result:=BigArray[0];
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
for Index:=0 to BlockCount-1 do begin
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
for i:=BlockStart to BlockEnd do begin
if Result<BigArray[i] then Result:=BigArray[i];
end;
end;
end;
Добавленные строки можно использовать для любого цикла. Со временем можно будет написать инструмент для автоматизации этого.
Теперь работа разбита на более мелкие части. Теперь части должны стать более независимыми.
Локальные и общие переменные
Для каждой используемой переменной в цикле вы должны решить, является ли она общей переменной, используемой всеми потоками, или каждый поток использует свою собственную локальную переменную. Общие переменные BlockCount
и BlockSize
только читаются и не меняются, поэтому для них не требуется никакой работы. Но общая переменная, такая как Result
, будет изменяться всеми потоками. Это может быть достигнуто либо с помощью синхронизации (например, критической секции), которая выполняется медленно, либо каждый поток использует локальную копию, и эти локальные переменные позже объединяются.
Вот решение, заменяющее переменную Result
на массив, который в конце объединяется:
function FindMaximum(BigArray: TArrayOfInteger): integer;
var
// общие переменные
BlockCount, BlockSize: PtrInt;
BlockMax: PPtrInt;
// локальные переменные
i: PtrInt;
Index: PtrInt;
BlockStart, BlockEnd: PtrInt;
begin
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt)); // выделение памяти для локальных переменных
// вычисляем максимум для каждого блока
for Index:=0 to BlockCount-1 do begin
// вычисляем максимум блока
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
BlockMax[Index]:=BigArray[BlockStart];
for i:=BlockStart to BlockEnd do begin
if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
end;
end;
// вычисляем максимум всех блоков
// (это лучшее решение, если у вас сотни потоков)
Result:=BlockMax[0];
for Index:=1 to BlockCount-1 do
Result:=Max(Result,BlockMax[Index]);
FreeMem(BlockMax);
end;
Этот подход прост и может быть автоматизирован. Однако для этого процесса потребуются подсказки от программиста.
DoParallel
Последний шаг - переместить внутренний цикл в подпрограмму и заменить цикл вызовом DoParallelNested
.
...
{$ModeSwitch nestedprocvars}
uses mtprocs;
...
function TMainForm.FindMaximum(BigArray: TArrayOfInteger): integer;
var
BlockCount, BlockSize: PtrInt;
BlockMax: PPtrInt;
procedure FindMaximumParallel(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
var
i: integer;
BlockStart, BlockEnd: PtrInt;
begin
// вычисляем максимум блоков
Item.CalcBlock(Index,BlockSize,length(BigArray),BlockStart,BlockEnd);
BlockMax[Index]:=BigArray[BlockStart];
for i:=BlockStart to BlockEnd do
if BlockMax[Index]<BigArray[i] then BlockMax[Index]:=BigArray[i];
end;
var
Index: PtrInt;
begin
// разделяем работу на блоки одинакового размера
ProcThreadPool.CalcBlockSize(length(BigArray),BlockCount,BlockSize);
// выделяем память для локальных/потоковых переменных
BlockMax:=AllocMem(BlockCount*SizeOf(PtrInt));
// вычисляем максимум для каждого блока
ProcThreadPool.DoParallelNested(@FindMaximumParallel,0,BlockCount-1);
// вычисляем максимум всех блоков
Result:=BlockMax[0];
for Index:=1 to BlockCount-1 do
Result:=Max(Result,BlockMax[Index]);
FreeMem(BlockMax);
end;
В основном это было копирование и вставка, так что снова это можно было автоматизировать.
Пример: параллельная сортировка
Модуль mtputils
содержит функцию ParallelSortFPList
, которая использует mtprocs
для параллельной сортировки TFPList
. Должна быть указана функция сравнения.
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);
Эта функция использует параллельный алгоритм MergeSort
. Параметр MaxThreadCount
передается DoParallel
. 0 означает использование системы по умолчанию.
При желании вы можете предоставить свою собственную функцию сортировки (OnSortPart
) для сортировки части каждого отдельного потока. Например, вы можете сортировать блоки через QuickSort
, которые затем объединяются. Тогда у вас будет параллельный QuickSort. См. TFPList.Sort
для примера реализации QuickSort
.
Использование вложенных процедур
procedure DoSomething(Value: PtrInt);
var
p: array[1..2] of Pointer;
procedure SubProc(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
begin
p[Index]:=Pointer(Value); // возможен доступ к локальным переменным и параметрам!
end;
var
i: Integer;
begin
ProcThreadPool.DoParallelNested(@SubProc,1,2);
end;
Это может сохранить много времени на рефакторинг и сделать код параллельно работающих процедур более читаемым.