1 % (c) 2014-2021 Lehrstuhl fuer Softwaretechnik und Programmiersprachen,
2 % Heinrich Heine Universitaet Duesseldorf
3 % This software is licenced under EPL 1.0 (http://www.eclipse.org/org/documents/epl-v10.html)
4 :- module(worker, [start_worker/5, load/1]).
5
6 :- use_module(library(fastrw),[ fast_buf_read/2,fast_buf_write/3 ]).
7 :- use_module(library(lists)).
8
9 :- use_module(library(process)).
10
11 :- use_module(probsrc(module_information)).
12 :- use_module(probsrc(error_manager),[add_error/3]).
13 :- use_module(extension('zmq/zmq'), [init_zmq/0,
14 setup_worker/5, work_reactor/1, teardown_worker/0,
15 new_results_message/1, start_timer/0, stop_timer/0,
16 msg_put_str/2, put_succ/4, add_stats/1, send_and_destroy_msg/1]).
17
18 :- module_info(group,zmq).
19 :- module_info(description,'This is the interface to C code for distributed model checking.').
20
21
% load(+Init)
% The worker gets some initialisation code, e.g., a B model and the
% instruction what to check.
% Init is a fastrw buffer; it is decoded into a term and handed to the
% callback registered in callback_load/2 (called with the decoded term as an
% extra argument). If the callback has no solution, an error is recorded via
% add_error/3 and load/1 fails.
load(Init) :-
    fast_buf_read(InitTerm, Init),
    callback_load(CbModule, CbPred),
    if(call(CbModule:CbPred, InitTerm),
       true,
       report_load_failure(CbModule:CbPred)).

% Record the failed load callback in the error manager, then fail.
report_load_failure(Callback) :-
    add_error(zmq_worker, 'Loading spec failed: ', Callback),
    fail.
28
29
30 :-dynamic assertion_state/1.
31 :-dynamic assertion_task/2.
32
33
% The worker gets a Workpackage. It must produce a list of computation results
% (which are forwarded to the master) and a list of successor Workpackages.
% The successor workpackages are sent to this predicate at a later point in time.
% It is not guaranteed that they will be sent to this process, so they must contain all
% the information required to process the workpackage.
% It can be assumed that all processes that might receive a workpackage have executed
% the same initialisation code.
41 %process(_,[some_result(4), another_result(x)], [foo(baz), doh(dude)]).
42
43
44 :- use_module(library(process)).
45 :- use_module(probsrc(translate)).
46 :- use_module(probsrc(specfile),[state_corresponds_to_initialised_b_machine/2]).
47 :- public process/3.
48 :- use_module(probsrc(b_interpreter),[b_test_boolean_expression_for_ground_state/5]).
49
50
% process(+Workpackage, -Result, -Successors)
% Dispatches on the kind of workpackage:
%   assertion  - empty result; one check(N) successor per assertion_task/2 fact
%   check(N)   - evaluates assertion task N via check_assertion/2, no successors
%   root       - handled exactly like state(root)
%   state(S)   - checks the invariant (if S is an initialised B state) and
%                computes all successor states
%   max_reached- Result is true/false depending on succeed_max:max_reached
% Anything else is reported as unhandled(_) on the current output and yields
% no successors (Result is left unbound).
process(assertion, result([], []), Checks) :-
    findall(check(Nr), assertion_task(Nr, _), Checks).
process(check(Nr), Result, []) :-
    assertion_task(Nr, Assertion),
    check_assertion(Assertion, Result).
process(root, Result, Successors) :-
    process(state(root), Result, Successors).
%process(cbc_test_case(Depth,State,Trace),R,S) :- ....
process(state(State), result(ResString, StateAtom), Successors) :- !,
    % Performs e.g. unpacking of constants just once. root is passed as the
    % state id since no reference state id is available here (it is mainly
    % used for operation caching).
    specfile:prepare_state_for_specfile_trans(State, root, PreparedState),
    record_state_if_debugging(State),
    invariant_result(State, PreparedState, ResString, StateAtom),
    findall(state(NewState),
            compute_successors(PreparedState, _ActionName, NewState),
            Found),
    (   debug_mode
    ->  dump_trans(PreparedState)
    ;   true
    ),
    Successors = Found.
process(max_reached, Res, []) :- !,
    (   succeed_max:max_reached
    ->  Res = true
    ;   Res = false
    ).
process(Other, _, Successors) :-
    print(unhandled(Other)), nl,
    Successors = [].

% In debug mode, store the state under the current counter value and advance
% the counter; otherwise do nothing.
record_state_if_debugging(State) :-
    (   debug_mode
    ->  count(Index),
        assertz(state(Index, State)),
        inc
    ;   true
    ).

% Compute the result list for a state. ResString is [] when the state is not
% an initialised B state or when the invariant holds; otherwise it is
% ['invariant_violated'] and StateAtom is the translated state.
% StateAtom stays unbound in the non-violating cases.
% TO DO: get unproven invariant if this is not the initial state
% TO DO: add time_out, specialized invariants, assertions, ...
invariant_result(State, PreparedState, ResString, StateAtom) :-
    (   state_corresponds_to_initialised_b_machine(PreparedState, BState)
    ->  bmachine:b_get_invariant_from_machine(Invariant),
        (   b_test_boolean_expression_for_ground_state(Invariant,[],BState,'ParB worker',0)
        ->  ResString = []
        ;   ResString = ['invariant_violated'],
            translate:translate_bstate(State, StateAtom)
        )
    ;   ResString = []
    ).
90
91
% check_assertion(+Assertion, -Result)
% Evaluates Assertion in the state stored in assertion_state/1 and returns
% result(Violations, StateAtom):
%   - assertion holds:    result([], [])
%   - assertion violated: result([ViolationAtom], TranslatedState)
% The violation is wrapped in a list because put_results/2 only accepts a
% list of result strings ([] or [H|T]); returning the bare atom made
% put_results/2 fail, so the violation was never reported.
% Fails if no assertion_state/1 fact is present.
check_assertion(A, result(R, StateAtom)) :-
    assertion_state(State),
    (   b_test_boolean_expression_for_ground_state(A,[],State,'ParB worker ASSERTION',0)
    ->  R = [],
        StateAtom = []
    ;   mk_assertion_violation(A, Violation),
        R = [Violation],
        translate:translate_bstate(State, StateAtom)
    ).
97
98 :- use_module(probsrc(bmachine),[b_get_machine_searchscope/1]).
99 :- use_module(probsrc(eclipse_interface),[test_boolean_expression_in_node/2]).
100 :- use_module(probsrc(bsyntaxtree),[create_negation/2]).
% is_not_interesting(+State)
% Succeeds if State is an initialised B state that violates the SCOPE
% predicate provided by the user, i.e. the negated scope predicate holds in
% it. Requires the preference use_scope_predicate to be true.
% Currently we are always in B or CSP||B mode here.
is_not_interesting(State) :-
    state_corresponds_to_initialised_b_machine(State, BState),
    b_get_machine_searchscope(ScopePred),
    preferences:preference(use_scope_predicate, true),
    create_negation(ScopePred, NegatedScope),
    b_test_boolean_expression_for_ground_state(
        NegatedScope, [], BState,
        'ParB worker (not_interesting check)', 0).
109
110
% mk_assertion_violation(+Assertion, -Text)
% Text is the pretty-printed form of the violated assertion, as produced by
% translate:translate_bexpression/2.
mk_assertion_violation(Assertion, Text) :-
    translate:translate_bexpression(Assertion, Text).
113
% compute_successors(+CurState, ?ActionName, -NewState)
% Enumerates on backtracking the states reachable from CurState by one
% transition named ActionName, skipping states rejected by the SCOPE
% predicate (see is_not_interesting/1).
% Note: removing not_interesting nodes may cause deadlocks
%       but currently we do not check for deadlocks in parB
% TO DO: check Residue, add time-out, deadlock, ...
compute_successors(CurState, ActionName, NewState) :-
    specfile:specfile_possible_trans_name(CurState, ActionName),
    specfile:specfile_trans(CurState, ActionName, _Action, NewState, _Info, _Residue),
    \+ is_not_interesting(NewState).
121
% dump_trans(+State)
% Debugging aid: record every outgoing transition of State as a
% trans(ActionName, State, Successor) fact.
dump_trans(State) :-
    findall(trans(ActionName, Successor),
            compute_successors(State, ActionName, Successor),
            Transitions),
    dump_trans(Transitions, State).

% dump_trans(+Transitions, +Source)
% Assert one trans/3 fact per trans(Action, Target) entry of the list.
dump_trans([], _).
dump_trans([trans(ActionName, Target)|Rest], Source) :-
    assertz(trans(ActionName, Source, Target)),
    dump_trans(Rest, Source).
128
% Mutable worker state:
%   callback_load(Module, Pred) - load callback registered by start_worker/5
%   count(N)                    - running counter used to number stored states
%   state(N, State)             - states recorded in debug mode
%   trans(Action, From, To)     - transitions recorded in debug mode
%   debug_mode                  - flag, present iff debugging is switched on
:- dynamic callback_load/2, count/1, state/2, trans/3, debug_mode/0.
%:- dynamic is_initialised/0.
%:- dynamic local_mode/0.

% set_debug
% Switch on debug mode by adding a debug_mode fact.
:- public set_debug/0.
set_debug :-
    assertz(debug_mode).
%set_local :- assertz(local_mode).

% The counter starts at zero.
count(0).

% inc
% Replace the stored counter value by its successor.
inc :-
    count(Current),
    retractall(count(_)),
    Next is Current + 1,
    assertz(count(Next)).
144
145 %:- dynamic ids/1.
146 %ids(1).
147 %next_id :- ids(X), retractall(ids(_)), X1 is X + 1, assertz(ids(X1)).
148
149
150 :- use_module(probsrc(state_space_exploration_modes),[depth_breadth_first_mode/1]).
% start_worker(+Port, +ProxyID, +Logfile, +TmpDir, :LoadCallback)
% Initialises ZMQ, registers LoadCallback as the one load callback (any
% previous registration is dropped), sets up the worker with the current
% depth/breadth-first mode, runs the reactor loop until it stops, and then
% tears the worker down.
% Host is not needed since we connect to a proxy at localhost.
:- meta_predicate start_worker(+,+,+,+,1).
start_worker(Port, ProxyID, Logfile, TmpDir, CbModule:CbPred) :-
    init_zmq,
    register_load_callback(CbModule, CbPred),
    depth_breadth_first_mode(SearchMode),
    setup_worker(ProxyID, Port, SearchMode, Logfile, TmpDir), !,
    my_reactor_loop,
    teardown_worker.

% Make Module:Pred the only registered load callback.
register_load_callback(Module, Pred) :-
    retractall(callback_load(_, _)),
    assertz(callback_load(Module, Pred)).
160
% my_reactor_loop
% Keep calling work_reactor/1 for as long as it returns 0. The first
% non-zero return value is printed as work_reactor_retval(_) and ends the
% loop.
my_reactor_loop :-
    work_reactor(RetVal), !,
    (   RetVal =:= 0
    ->  my_reactor_loop
    ;   print(work_reactor_retval(RetVal)), nl, flush_output
    ).
165
166
167 %:- dynamic queue/2.
168 %:- public local_zmq_check/1.
169 %local_zmq_check(N) :-
170 % queue(N, Workpackage),
171 % process(Workpackage, R, S),
172 % add_result(R),
173 % add_successor(S).
174
% zmq_check(+WPRead, +HashMsg, +FD_Logfile)
% Handles one workpackage: decodes it from the fastrw buffer WPRead,
% processes it (timed via start_timer/stop_timer), appends the successor
% workpackages to HashMsg, sends a separate results message if there are any
% results, and finally adds the statistics to HashMsg and sends it.
:- public zmq_check/3.
zmq_check(WPRead, HashMsg, FD_Logfile) :-
    start_timer,
    fast_buf_read(Workpackage, WPRead),
    process(Workpackage, Outcome, Followers),
    stop_timer,
    % only outcomes of the form result(_,_) can be forwarded
    Outcome = result(Findings, StateText),
    put_successors(Followers, HashMsg, FD_Logfile),
    put_results(Findings, StateText),
    add_stats(HashMsg),
    send_and_destroy_msg(HashMsg).
193
% put_results(+Results, +StateAtom)
% Sends the list of result strings to the master in one results message,
% followed by StateAtom. An empty result list sends nothing at all.
put_results([], _StateAtom). % no results -> don't create a message
put_results([H|T], StateAtom) :-
    new_results_message(Msg),
    put_results2([H|T], Msg),
    msg_put_str(Msg, StateAtom),
    send_and_destroy_msg(Msg).

% put_results2(+Results, +Msg)
% Appends every result string to the already created message Msg.
% The recursion must stay in put_results2/2: calling put_results/2 here
% would open a second message for the tail and pass Msg as its state atom.
put_results2([], _).
put_results2([H|T], Msg) :-
    msg_put_str(Msg, H),
    put_results2(T, Msg).
204
/* TODO: fix symmetry reduction (see below for details) */
% put_successors(+Successors, +Msg, +FD_Logfile)
% Serialises each successor workpackage with fast_buf_write/3 and appends it
% to Msg via put_succ/4, in list order.
put_successors([], _Msg, _FD_Logfile).
put_successors([Workpackage|Rest], Msg, FD_Logfile) :-
    fast_buf_write(Workpackage, Size, Address),
    put_succ(Msg, Address, Size, FD_Logfile), % TODO: additional information?
    put_successors(Rest, Msg, FD_Logfile).
211
212 /* old code that supported symmetry reduction
213 :- use_module(probsrc(symmetry_marker),[compute_marker_for_state/2]).
214 add_successor([]).
215 add_successor([H|T]) :-
216 (debug_mode->inc_transitions; true),
217 % (local_mode->ids(X), next_id, assertz(queue(X, H)) ; X = 0),
218 H = state(S),
219 (preferences:get_preference(symmetry_mode,hash) ->
220 symmetry_marker:compute_marker_for_state(S,M)
221 ; M = H),
222 fast_buf_write(M,Len2,Addr2),
223 hashchar(Addr2, Len2, Sha),
224
225 fast_buf_write(H,Len1,Addr1),
226 put_successor(Addr1,Len1, Sha, []),
227 add_successor(T).
228
229 :- dynamic count_transitions/1.
230 count_transitions(0).
231 inc_transitions :- count_transitions(X), retractall(count_transitions(_)), X1 is X + 1, assertz(count_transitions(X1)).
232 */
233
% print_substat(+Key)
% Prints ":Key Value " for the statistics/2 entry named Key.
print_substat(Key) :-
    statistics(Key, Value),
    print(':'),
    print(Key),
    print(' '),
    print(Value),
    print(' ').
237
% print_stats(+WorkerId)
% Runs a garbage collection and then prints one line of the form
% {:workerid W :PID P :memory_used N ... } with selected statistics/2
% values, and flushes the output.
:- public print_stats/1.
print_stats(WorkerId) :-
    garbage_collect,
    print('{'),
    print(':workerid '), print(WorkerId), print(' '),
    process_id(Pid),
    print(':PID '), print(Pid), print(' '),
    print_substats([memory_used, memory_free, choice_used,
                    gc_count, agc_count,
                    global_stack_used, local_stack_used,
                    trail_used, atoms_used]),
    print('}'),
    nl, flush_output.

% Print the given statistics keys in list order.
print_substats([]).
print_substats([Key|Keys]) :-
    print_substat(Key),
    print_substats(Keys).
256
257 :- use_module(probsrc(tools_printing),[print_dynamic_pred/3]).
% dump
% Writes the recorded debug information to the current output: a start(0)
% fact, a marker comment if succeed_max:max_reached holds, a prop/2 rule that
% reads variable bindings out of state/2, and finally all trans/3 and
% state/2 facts of this module.
:- public dump/0.
dump :-
    portray_clause(start(0)), print('.'), nl,
    (   succeed_max:max_reached
    ->  print('/* max_reached */'), nl
    ;   true
    ),
    PropRule = ( prop(Id, '='(Name, Value)) :-
                     state(Id, Bindings),
                     member(bind(Name, Value), Bindings) ),
    portray_clause(PropRule), print('.'), nl,
    print_dynamic_pred(worker, trans, 3),
    print_dynamic_pred(worker, state, 2).