Helfer Swidistributed

Jan Burse, erstellt 15. Sep 2018
/**
* SWI-Prolog compatible module clean.
*
* Warranty & Liability
* To the extent permitted by applicable law and unless explicitly
* otherwise agreed upon, XLOG Technologies GmbH makes no warranties
* regarding the provided information. XLOG Technologies GmbH assumes
* no liability that any problems might be solved with the information
* provided by XLOG Technologies GmbH.
*
* Rights & License
* All industrial property rights regarding the information - copyright
* and patent rights in particular - are the sole property of XLOG
* Technologies GmbH. If the company was not the originator of some
* excerpts, XLOG Technologies GmbH has at least obtained the right to
* reproduce, change and translate the information.
*
* Reproduction is restricted to the whole unaltered document. Reproduction
* of the information is only allowed for non-commercial uses. Selling,
* giving away or letting of the execution of the library is prohibited.
* The library can be distributed as part of your applications and libraries
* for execution provided this comment remains unchanged.
*
* Restrictions
* Only to be distributed with programs that add significant and primary
* functionality to the library. Not to be distributed with additional
* software intended to replace any components of the library.
*
* Trademarks
* Jekejeke is a registered trademark of XLOG Technologies GmbH.
*/
:- module(distrbuted, [balance/3,
setup_balance/4,
horde/2]).
:- use_module(swiclean).
/**
* balance(G, T, N):
* The predicate succeeds whenever G, T succeeds. The predicate
* distributes the work generated by G over N processors
* running copies of T.
*/
% balance(+Goal, +Goal, +Integer)
:- meta_predicate balance(0,0,?).
balance(G, T, N) :-
term_variables(G, I),
pipe_new(N, F),
sys_clean_thread(sys_put_all(I, G, F, N)),
horde((sys_take_all(I, F, 1), T), N).
/**
* setup_balance(S, G, T, N):
* The predicate succeeds whenever S, G, T succeeds. The predicate
* distributes the work generated by G over N processors
* running copies of S, T.
*/
% setup_balance(+Goal, +Goal, +Goal, +Integer)
:- meta_predicate setup_balance(0,0,0,?).
setup_balance(S, G, T, N) :-
term_variables(G, I),
pipe_new(N, F),
sys_clean_thread(sys_put_all(I, G, F, N)),
horde((S, sys_take_all(I, F, 1), T), N).
/**
* horde(T, N):
* The predicate succeeds whenever T succeeds. The predicate
* distributes the work over N processors running
* copies of T.
*/
% horde(+Goal, +Integer)
:- meta_predicate horde(0, ?).
horde(T, N) :-
term_variables(T, J),
pipe_new(N, B),
sys_clean_threads(sys_put_all(J, T, B, 1), N),
sys_take_all(J, B, N).
/**********************************************************/
/* Pipe Utilities */
/**********************************************************/
% sys_take_all(+Term, +Queue, +Integer)
sys_take_all(T, Q, N) :-
between(1, N, _),
sys_take_all2(T, Q).
% sys_take_all2(+Term, +Queue)
sys_take_all2(T, Q) :-
repeat,
pipe_take(Q, A),
(A = the(S) -> S = T;
A = ball(E) -> throw(E);
!, fail).
% sys_put_all(+Term, +Goal, +Queue, +Integer)
:- meta_predicate sys_put_all(?,0,?,?).
sys_put_all(T, G, Q, N) :-
catch(sys_put_all2(T, G, Q, N),
E,
(E = error(system_error(user_close), _)
-> throw(E)
; pipe_put(Q, ball(E)))).
% sys_put_all2(+Term, +Goal, +Queue, +Integer)
:- meta_predicate sys_put_all2(?,0,?,?).
sys_put_all2(T, G, Q, _) :-
G,
pipe_put(Q, the(T)),
fail.
sys_put_all2(_, _, Q, N) :-
between(1, N, _),
pipe_put(Q, no),
fail.
sys_put_all2(_, _, _, _).
/**********************************************************/
/* Pipe API */
/**********************************************************/
% pipe_new(+Integer, -Queue)
pipe_new(N, Q) :-
message_queue_create(Q, [max_size(N)]).
% pipe_take(+Queue -Term)
pipe_take(Q, T) :-
thread_get_message(Q, T).
% pipe_put(+Queue +Term)
pipe_put(Q, T) :-
thread_send_message(Q, T).

Kommentare