Module Distributed

Jan Burse, created Aug 11. 2017

/**
* This module provides meta-predicates to distribute work over multiple
* threads. The simplest meta-predicate horde/[1,2] collects the results
* of the spawned threads and leaves the work distribution to the spawned
* threads itself. The other predicates distribute work items among the
* spawned threads, but do this only on a local scale.
*
* Example:
* ?- balance(X^(between(1,10,X), Y is X*X)).
* Y = 1 ;
* Y = 9 ;
* Etc..
*
* The meta-predicates balance/[2,3] and setup_balance/[3,4] allow work
* distribution of a generate and test. The meta-predicates might change
* the order of the result set. If the meta-predicates are cancelled by
* a cut such as in a surrounding \+/1 or once/1 they will automatically
* cancel each spawned thread.
*
* The meta-predicates balance/[2,3] and setup_balance/[3,4] assume a
* side effect free interaction between the generate and test. The only
* channels are the variables in the intersection of the generate and
* test and the instantiations are copied. Currently the copying
* doesn't support attribute variables.
*
* The setup in the meta-predicates setup_balance/[3,4] is executed once
* per spawned thread. The setup is executed before the test and attribute
* variables can communicate between the setup and the test. The setup
* can for example be used to build a CLP(FD) model and the model will
* be available in the test for labelling.
*
* Warranty & Liability
* To the extent permitted by applicable law and unless explicitly
* otherwise agreed upon, XLOG Technologies GmbH makes no warranties
* regarding the provided information. XLOG Technologies GmbH assumes
* no liability that any problems might be solved with the information
* provided by XLOG Technologies GmbH.
*
* Rights & License
* All industrial property rights regarding the information - copyright
* and patent rights in particular - are the sole property of XLOG
* Technologies GmbH. If the company was not the originator of some
* excerpts, XLOG Technologies GmbH has at least obtained the right to
* reproduce, change and translate the information.
*
* Reproduction is restricted to the whole unaltered document. Reproduction
* of the information is only allowed for non-commercial uses. Selling,
* giving away or letting of the execution of the library is prohibited.
* The library can be distributed as part of your applications and libraries
* for execution provided this comment remains unchanged.
*
* Trademarks
* Jekejeke is a registered trademark of XLOG Technologies GmbH.
*/
:- package(library(jekpro/reference/runtime)).
:- module(distributed, []).
:- use_module(library(advanced/arith)).
:- use_module(library(misc/pipe)).
:- use_module(library(misc/clean)).
/**
* horde(V1^..Vn^T):
* horde(V1^..Vn^T, N):
* The predicate succeeds whenever T succeeds. The predicate
* spawns threads over the available processors running
* copies of T. The binary predicate allows specifying the
* number N of requested threads.
*/
% horde(+Quant)
:- public horde/1.
:- meta_predicate horde(0).
current_prolog_flag(sys_cpu_count, N),
horde(T, N).
% horde(+Quant, +Integer)
:- public horde/2.
:- meta_predicate horde(0,?).
horde(T, N) :-
horde2(J, S, N).
% horde2(+List, +Goal, +Integer)
:- private horde2/3.
:- meta_predicate horde2(?,0,?).
horde2(J, S, N) :-
pipe_new(N, B),
sys_take_all(J, B, N).
/**
* balance(V1^..Vn^(G, T)):
* balance(V1^..Vn^(G, T), N):
* The predicate succeeds whenever G, T succeeds. The predicate
* distributes the work generated by G over the available processors
* running copies of T. The ternary predicate allows specifying the
* number N of requested threads.
*/
% balance(+Quant)
:- public balance/1.
:- meta_predicate balance(0).
current_prolog_flag(sys_cpu_count, N),
balance(P, N).
% balance(+Quant, +Integer)
:- public balance/2.
:- meta_predicate balance(0,?).
balance(P, N) :-
sys_goal_kernel(P, (G,T)),
pipe_new(N, F),
horde2(J, ( sys_take_all(I, F, 1), T), N).
/**
* setup_balance(V1^..Vn^(S, G, T)):
* setup_balance(V1^..Vn^(S, G, T), N):
* The predicate succeeds whenever S, G, T succeeds. The predicate
* distributes the work generated by G over the available processors
* running copies of S, T. The ternary predicate allows specifying the
* number N of requested threads.
*/
% setup_balance(+Quant)
:- public setup_balance/1.
:- meta_predicate setup_balance(0).
current_prolog_flag(sys_cpu_count, N),
% setup_balance(+Quant, +Integer)
:- public setup_balance/2.
:- meta_predicate setup_balance(0,?).
sys_goal_kernel(Q, (S,G,T)),
pipe_new(N, F),
horde2(J, ( S,
sys_take_all(I, F, 1), T), N).
/**********************************************************/
/* Pipe Utilities */
/**********************************************************/
/**
* sys_take_all(T, Q, N):
* The predicate succeeds for all elements T of the queue Q.
*/
% sys_take_all(+Term, +Queue, +Integer)
:- private sys_take_all/3.
sys_take_all(T, Q, N) :-
between(1, N, _),
% sys_take_all2(+Term, +Queue)
:- private sys_take_all2/2.
pipe_take(Q, A),
( A = the(S)
-> S = T
; A = ball(E)
/**
* sys_put_all(T, G, Q, N):
* The predicate succeeds with all elements T in the queue Q that satisfy G.
*/
% sys_put_all(+Term, +Goal, +Queue, +Integer)
:- private sys_put_all/4.
:- meta_predicate sys_put_all(?,0,?,?).
sys_put_all(T, G, Q, N) :-
sys_trap(sys_put_all2(T, G, Q, N),
E,
( E = error(system_error(user_close),_)
; pipe_put(Q, ball(E)))).
% sys_put_all2(+Term, +Goal, +Queue, +Integer)
:- private sys_put_all2/4.
:- meta_predicate sys_put_all2(?,0,?,?).
sys_put_all2(T, G, Q, _) :- G,
pipe_put(Q, the(T)), fail.
sys_put_all2(_, _, Q, N) :-
between(1, N, _),
pipe_put(Q, no), fail.
sys_put_all2(_, _, _, _).
Add 

Comments