1 % (c) 2014-2017 Lehrstuhl fuer Softwaretechnik und Programmiersprachen,
2 % Heinrich Heine Universitaet Duesseldorf
3 % This software is licenced under EPL 1.0 (http://www.eclipse.org/org/documents/epl-v10.html)
4 :- module(worker, [start_worker/5,load/1]).
5
6 :- use_module(library(fastrw),[ fast_buf_read/2,fast_buf_write/3 ]).
7 :- use_module(library(lists)).
8
9 :- use_module(library(process)).
10
11 :- use_module('../../../src/module_information.pl').
12 :- use_module(probsrc(error_manager),[add_error/3]).
13 :- use_module('../zmq.pl', [init_zmq/0,
14 setup_worker/5, work_reactor/1, teardown_worker/0,
15 new_results_message/1, start_timer/0, stop_timer/0,
16 msg_put_str/2, put_succ/4, add_stats/1, send_and_destroy_msg/1]).
17
18 :- module_info(group,experimental).
19 :- module_info(description,'This is the interface to C code for distributed model checking.').
20 :- module_info(revision,'$Rev: 9609 $').
21 :- module_info(lastchanged,'$LastChangedDate: 2011-11-18 10:43:02 +0100 (Fr, 18 Nov 2011) $').
22
23
% load(+Init)
% The worker receives some initialisation data, e.g., a B model and the
% instruction what to check. Init is decoded with fast_buf_read/2 and the
% resulting term is handed to the callback registered in callback_load/2
% (stored there by start_worker/5). If the callback has no solution an
% error is recorded via add_error/3 and load/1 fails.
load(Init) :-
    fast_buf_read(Spec, Init),
    callback_load(CbModule, CbPred),
    if( call(CbModule:CbPred, Spec),
        true,
        ( add_error(zmq_worker, 'Loading spec failed: ', CbModule:CbPred),
          fail
        )
      ).
30
31
% Facts consulted by the assertion-checking clauses of process/3 and by
% check_assertion/2. Nothing in this file asserts them.
:- dynamic assertion_state/1, assertion_task/2.
34
35
% The worker gets a work package. It must produce a list of computation results
% (which are forwarded to the master) and a list of successor work packages.
% The successor work packages are passed to this predicate at a later point in time.
% There is no guarantee that they will be sent to this same process, so each one must
% contain all the information required to process it.
% It can be assumed that every process that might receive a work package has executed
% the same initialisation code.
43 %process(_,[some_result(4), another_result(x)], [foo(baz), doh(dude)]).
44
45
46 :- use_module(library(process)).
47 :- use_module(probsrc(translate)).
:- public process/3.
% process(+Workpackage, -Result, -Successors)
%
% Dispatches on the kind of work package:
%   assertion    - no result; one check(N) successor per assertion_task/2 fact
%   check(N)     - evaluates assertion task N with check_assertion/2, no successors
%   root         - handled exactly like state(root)
%   state(State) - tests the invariant (only if the state corresponds to an
%                  initialised B machine) and collects all successor states
%   max_reached  - Result is the bare atom true or false, no successors
%   anything else is printed as unhandled(X) and yields no successors
%
% The first three clauses end with a cut. Without it, backtracking into
% process/3 (e.g. when a later goal of the caller fails) would reach the
% catch-all clause at the bottom, print a misleading unhandled(...) line and
% return an unbound Result for a package that was in fact handled. The cut is
% placed at the END of the body so that a failing body still falls through to
% the catch-all clause as before.
process(assertion, result([], []), S) :-
    findall(check(N), assertion_task(N,_), S),
    !.

process(check(N), R, []) :- %process_id(PID),
    assertion_task(N, A),
    check_assertion(A, R),
    !.

process(root, R, S) :-
    process(state(root), R, S),
    !.
%process(cbc_test_case(Depth,State,Trace),R,S) :- ....
process(state(State), result(ResString, StateAtom), Successors) :- !,
    % will perform unpacking of constants just once, for example
    specfile:prepare_state_for_specfile_trans(State, PreparedState),
    % in debug mode every visited state is recorded as state(Counter,State)
    ( debug_mode -> count(X), assert(state(X,State)), inc ; true ),
    ( specfile:state_corresponds_to_initialised_b_machine(PreparedState, S2) ->
        bmachine:b_get_invariant_from_machine(Invariant),
        ( b_interpreter:b_test_boolean_expression_for_ground_state(Invariant, [], S2)
          % TO DO: add time_out, specialized invariants, assertions, ...
        ->  ResString = []
        ;   ResString = ['invariant_violated'],
            translate:translate_bstate(State, StateAtom)
        )
    ;   % not (yet) initialised: nothing to check
        ResString = []
    ),
    % NOTE: StateAtom stays unbound whenever ResString = [].
    findall(state(NewState),
            compute_successors(PreparedState, _ActionName, NewState),
            S),
    ( debug_mode -> dump_trans(PreparedState) ; true ),
    Successors = S.
% NOTE(review): the result here is true/false rather than result(_,_);
% zmq_check/3 unifies the result with result(_,_), which cannot succeed for
% this clause - confirm whether max_reached packages are ever sent.
process(max_reached, Res, []) :- !,
    ( succeed_max:max_reached -> Res = true ; Res = false ).
process(X, _, S) :- print(unhandled(X)), nl, S = [].
90
91
% check_assertion(+Assertion, -Result)
% Evaluates Assertion in the state stored in assertion_state/1.
% Result is result(Violations, StateAtom):
%   - assertion holds:    result([], [])
%   - assertion violated: result([Text], StateAtom), where Text is the
%     translated assertion and StateAtom the translated state.
%
% Violations is always a LIST. The result is passed on to put_results/2,
% which only has clauses for [] and [H|T]; returning the bare translation
% (as this predicate used to do) made put_results/2 - and with it
% zmq_check/3 - fail, so a violated assertion was never reported. The
% state(_) clause of process/3 already uses the list form.
% (Assumes mk_assertion_violation/2 yields a single atomic text, as its
% argument name Atom suggests.)
check_assertion(A, result(R, StateAtom)) :-
    assertion_state(State),
    (   b_interpreter:b_test_boolean_expression_for_ground_state(A, [], State)
    ->  R = [],
        StateAtom = []
    ;   mk_assertion_violation(A, Violation),
        R = [Violation],
        translate:translate_bstate(State, StateAtom)
    ).
97
98
% mk_assertion_violation(+Assertion, -Text)
% Text is Assertion as rendered by translate:translate_bexpression/2.
mk_assertion_violation(Assertion, Text) :-
    translate:translate_bexpression(Assertion, Text).
101
% compute_successors(+CurState, ?ActionName, -NewState)
% Enumerates on backtracking: first a transition name that is possible in
% CurState, then every NewState reachable via a transition of that name.
compute_successors(CurState, ActionName, NewState) :-
    specfile:specfile_possible_trans_name(CurState, ActionName),
    specfile:specfile_trans(CurState, ActionName, _Action, NewState,
                            _TransInfo, _Residue).
   % TO DO: check Residue, add time-out, deadlock, ...
106
% dump transitions for debugging
% dump_trans(+State): records every outgoing transition of State as a
% trans(ActionName, State, Successor) fact.
dump_trans(State) :-
    findall(trans(Name, Succ),
            compute_successors(State, Name, Succ),
            Outgoing),
    dump_trans(Outgoing, State).

% dump_trans(+Transitions, +Source): asserts one trans/3 fact per list element.
dump_trans([], _Source).
dump_trans([trans(Name, Target)|Rest], Source) :-
    assert(trans(Name, Source, Target)),
    dump_trans(Rest, Source).
113
% Mutable worker state:
%   callback_load(Module, Pred) - load callback registered by start_worker/5
%   count(N)                    - counter advanced by inc/0
%   state(Id, State)            - states recorded in debug mode
%   trans(Action, From, To)     - transitions recorded in debug mode
%   debug_mode                  - present iff set_debug/0 was called
:- dynamic
       callback_load/2,
       count/1,
       state/2,
       trans/3,
       debug_mode/0.
%:- dynamic is_initialised/0.
%:- dynamic local_mode/0.
122
:- public set_debug/0.
% set_debug: adds the debug_mode fact, which makes process/3 record the
% states it visits and their outgoing transitions.
set_debug :-
    assertz(debug_mode).
125 %set_local :- assert(local_mode).
126
% count(N): current counter value, initially 0.
count(0).

% inc: replaces the stored counter value by its successor.
inc :-
    count(Current),
    retractall(count(_)),
    Next is Current + 1,
    assert(count(Next)).
129
130 %:- dynamic ids/1.
131 %ids(1).
132 %next_id :- ids(X), retractall(ids(_)), X1 is X + 1, assert(ids(X1)).
133
134
135 :- use_module(probsrc(state_space_exploration_modes),[depth_breadth_first_mode/1]).
:- meta_predicate start_worker(+,+,+,+,1).
% start_worker(+Port, +ProxyID, +Logfile, +TmpDir, :LoadCallback)
% Initialises ZeroMQ, registers LoadCallback (called by load/1 with one extra
% argument), sets up the worker and runs the reactor loop until it ends, then
% tears the worker down.
% Host is not needed since we connect to a proxy at localhost.
start_worker(Port, ProxyID, Logfile, TmpDir, CbModule:CbPred) :-
    init_zmq,
    retractall(callback_load(_, _)),
    assert(callback_load(CbModule, CbPred)),
    depth_breadth_first_mode(Mode),
    % NOTE(review): Level is computed but not passed on to setup_worker/5.
    (   debug:debug_mode(off)
    ->  Level = 20
    ;   debug:debug_level(Level)
    ),
    setup_worker(ProxyID, Port, Mode, Logfile, TmpDir), !,
    my_reactor_loop,
    teardown_worker.
147
% my_reactor_loop: calls work_reactor/1 repeatedly for as long as it returns
% 0; the first non-zero return value is printed and ends the loop.
my_reactor_loop :-
    work_reactor(RetVal), !,
    (   RetVal =:= 0
    ->  my_reactor_loop
    ;   print(work_reactor_retval(RetVal)), nl,
        flush_output
    ).
152
153
154 %:- dynamic queue/2.
155 %:- public local_zmq_check/1.
156 %local_zmq_check(N) :-
157 % queue(N, Workpackage),
158 % process(Workpackage, R, S),
159 % add_result(R),
160 % add_successor(S).
161
:- public zmq_check/3.
% zmq_check(+WPRead, +HashMsg, +FD_Logfile)
% Handles one work package: decodes it from WPRead, runs process/3 between
% start_timer/stop_timer, adds all successors to HashMsg, sends the results
% (if any) in a message of their own, then adds the statistics to HashMsg
% and sends it.
zmq_check(WPRead, HashMsg, FD_Logfile) :-
    start_timer,
    fast_buf_read(Workpackage, WPRead),
    process(Workpackage, Outcome, Successors),
    stop_timer,

    % only result/2 outcomes can be forwarded
    Outcome = result(Findings, StateAtom),
    put_successors(Successors, HashMsg, FD_Logfile),
    put_results(Findings, StateAtom),

    add_stats(HashMsg),
    send_and_destroy_msg(HashMsg).
180
% put_results(+Results, +StateAtom)
% Sends the list Results to the master in ONE results message, followed by
% StateAtom as the last string of that message. An empty list sends nothing.
put_results([], _StateAtom). % no results -> don't create a message
put_results([H|T], StateAtom) :-
    new_results_message(Msg),
    put_results2([H|T], Msg),
    msg_put_str(Msg, StateAtom),
    send_and_destroy_msg(Msg).

% put_results2(+Results, +Msg)
% Appends every element of Results to Msg as a string.
% The recursive call must stay within put_results2/2: it used to call
% put_results/2, which for two or more results opened a fresh message per
% remaining element and sent the outer message handle as the state string.
put_results2([], _Msg).
put_results2([H|T], Msg) :-
    msg_put_str(Msg, H),
    put_results2(T, Msg).
191
/* TODO: fix symmetry reduction (see below for details) */
% put_successors(+Successors, +Msg, +FD_Logfile)
% Serialises each successor with fast_buf_write/3 and adds the resulting
% buffer (address and length) to Msg via put_succ/4.
put_successors([], _Msg, _FD_Logfile).
put_successors([Succ|Rest], Msg, FD_Logfile) :-
    fast_buf_write(Succ, Size, Buffer),
    put_succ(Msg, Buffer, Size, FD_Logfile), % TODO: additional information?
    put_successors(Rest, Msg, FD_Logfile).
198
199 :- use_module(probsrc(symmetry_marker),[compute_marker_for_state/2]).
200 /* old code that supported symmetry reduction
201 add_successor([]).
202 add_successor([H|T]) :-
203 (debug_mode->inc_transitions; true),
204 % (local_mode->ids(X), next_id, assert(queue(X, H)) ; X = 0),
205 H = state(S),
206 (preferences:get_preference(symmetry_mode,hash) ->
207 symmetry_marker:compute_marker_for_state(S,M)
208 ; M = H),
209 fast_buf_write(M,Len2,Addr2),
210 hashchar(Addr2, Len2, Sha),
211
212 fast_buf_write(H,Len1,Addr1),
213 put_successor(Addr1,Len1, Sha, []),
214 add_successor(T).
215
216 :- dynamic count_transitions/1.
217 count_transitions(0).
218 inc_transitions :- count_transitions(X), retractall(count_transitions(_)), X1 is X + 1, assert(count_transitions(X1)).
219 */
220
% print_substat(+Key)
% Looks up statistics(Key, Value) and prints ':', Key, ' ', Value, ' ' in
% that order. Each ~p directive prints its argument with print/1.
print_substat(Key) :-
    statistics(Key, Value),
    format('~p~p~p~p~p', [':', Key, ' ', Value, ' ']).
224
:- public print_stats/1.
% print_stats(+WorkerId)
% Runs a garbage collection and then prints one brace-delimited line with
% the worker id, the OS process id and a fixed set of statistics/2 values.
print_stats(WorkerId) :-
    garbage_collect,
    print('{'),
    print(':workerid '), print(WorkerId), print(' '),
    process_id(PID),
    print(':PID '), print(PID), print(' '),
    print_substats([ memory_used,
                     memory_free,
                     choice_used,
                     gc_count,
                     agc_count,
                     global_stack_used,
                     local_stack_used,
                     trail_used,
                     atoms_used ]),
    print('}'),
    nl,
    flush_output.

% print_substats(+Keys): print_substat/1 for every key, in list order.
print_substats([]).
print_substats([Key|Keys]) :-
    print_substat(Key),
    print_substats(Keys).
243
244 :- use_module(probsrc(tools_printing),[print_dynamic_pred/3]).
:- public dump/0.
% dump: prints the state space recorded in debug mode as Prolog text:
% a start(0) fact, a marker comment if succeed_max:max_reached holds, a
% prop/2 rule over the recorded states, and then all trans/3 and state/2
% facts of this module.
dump :-
    portray_clause(start(0)),
    print('.'), nl,
    (   succeed_max:max_reached
    ->  print('/* max_reached */'), nl
    ;   true
    ),
    portray_clause(( prop(Id, Name = Value) :-
                         state(Id, Bindings),
                         member(bind(Name, Value), Bindings) )),
    print('.'), nl,
    print_dynamic_pred(worker, trans, 3),
    print_dynamic_pred(worker, state, 2).