# Mbed Host Tests
# host_test_default.py
# (Listing taken from the generated documentation page for this file.)
"""
mbed SDK
Copyright (c) 2011-2016 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
"""
19
20
import re
import sys
import traceback
from time import time
from sre_compile import error
from multiprocessing import Process, Queue, Lock
if (sys.version_info > (3, 0)):
    from queue import Empty as QueueEmpty # Queue here refers to the module, not a class
else:
    from Queue import Empty as QueueEmpty

from mbed_host_tests import BaseHostTest
from mbed_host_tests import host_tests_plugins
from ..host_tests_registry import HostRegistry

# Host test supervisors
from mbed_host_tests.host_tests.echo import EchoTest
from mbed_host_tests.host_tests.rtc_auto import RTCTest
from mbed_host_tests.host_tests.hello_auto import HelloTest
from mbed_host_tests.host_tests.detect_auto import DetectPlatformTest
from mbed_host_tests.host_tests.wait_us_auto import WaitusTest
from mbed_host_tests.host_tests.default_auto import DefaultAuto
from mbed_host_tests.host_tests.dev_null_auto import DevNullTest

from mbed_host_tests.host_tests_logger import HtrunLogger
from mbed_host_tests.host_tests_conn_proxy import conn_process
from mbed_host_tests.host_tests_runner.host_test import DefaultTestSelectorBase
from mbed_host_tests.host_tests_toolbox.host_functional import handle_send_break_cmd
49
50
# NOTE(review): the enclosing `class` statement is not part of this extract;
# the three statements below are the class-level members that precede __init__.
"""! Select default host_test supervision (replaced after auto detection) """
# Values of the '__reset_dut' event that run_test() compares against to pick
# between a software reset and a hardware reset.
RESET_TYPE_SW_RST = "software_reset"
RESET_TYPE_HW_RST = "hardware_reset"
55
def __init__(self, options):
    """! ctor

    @details Creates the logger and the host test registry, registers the
             built-in host tests, handles the command line switches that
             print information and exit, and stores the compare-log and
             serial-output settings before calling
             DefaultTestSelectorBase.__init__().
    @param options Parsed command line options object. When it is falsy
                   no option handling is performed.
    """
    self.options = options

    self.logger = HtrunLogger('HTST')

    # NOTE(review): the registry has to exist before the register_host_test()
    # calls below; HostRegistry is imported by this file for that purpose.
    self.registry = HostRegistry()
    self.registry.register_host_test("echo", EchoTest())
    self.registry.register_host_test("default", DefaultAuto())
    self.registry.register_host_test("rtc_auto", RTCTest())
    self.registry.register_host_test("hello_auto", HelloTest())
    self.registry.register_host_test("detect_auto", DetectPlatformTest())
    self.registry.register_host_test("default_auto", DefaultAuto())
    self.registry.register_host_test("wait_us_auto", WaitusTest())
    self.registry.register_host_test("dev_null_auto", DevNullTest())

    # Defaults so that run_test() / match_log() can always read these
    # attributes, even when no options object was supplied.
    self.compare_log = None
    self.compare_log_idx = 0    # Index of the next compare log line to match
    self.serial_output_file = None

    # Handle extra command line options
    if options:
        if options.enum_host_tests:
            for path in options.enum_host_tests:
                self.registry.register_from_path(
                    path, verbose=options.verbose
                )

        if options.list_reg_hts:    # --list option
            print(self.registry.table(options.verbose))
            sys.exit(0)

        if options.list_plugins:    # --plugins option
            host_tests_plugins.print_plugin_info()
            sys.exit(0)

        if options.version:         # --version
            import pkg_resources    # part of setuptools
            version = pkg_resources.require("mbed-host-tests")[0].version
            print(version)
            sys.exit(0)

        if options.send_break_cmd:  # -b with -p PORT (and optional -r RESET_TYPE)
            handle_send_break_cmd(port=options.port,
                                  disk=options.disk,
                                  reset_type=options.forced_reset_type,
                                  baudrate=options.baud_rate,
                                  verbose=options.verbose)
            sys.exit(0)

        if options.global_resource_mgr or options.fast_model_connection:
            # If Global/Simulator Resource Mgr is working it will handle reset/flashing workflow
            # So local plugins are offline
            self.options.skip_reset = True
            self.options.skip_flashing = True

        if options.compare_log:
            # Expected serial output, one entry (plain text or regex) per line
            with open(options.compare_log, "r") as f:
                self.compare_log = f.read().splitlines()

        self.serial_output_file = options.serial_output_file

    DefaultTestSelectorBase.__init__(self, options)
118
def is_host_test_obj_compatible(self, obj_instance):
    """! Check if host test object loaded is actually host test class
         derived from 'mbed_host_tests.BaseHostTest()'
         Additionally if host test class implements custom ctor it should
         call BaseHostTest.__init__()
    @param obj_instance Instance of host test derived class
    @return True if obj_instance is derived from mbed_host_tests.BaseHostTest()
            and BaseHostTest.__init__() was called, else return False
    """
    if not obj_instance:
        return False

    self.logger.prn_inf("host test class: '%s'"% obj_instance.__class__)

    # Check if host test (obj_instance) is derived from mbed_host_tests.BaseHostTest()
    if not isinstance(obj_instance, BaseHostTest):
        # In theory we should always get host test objects inheriting from BaseHostTest()
        # because loader will only load those.
        self.logger.prn_err("host test must inherit from mbed_host_tests.BaseHostTest() class")
        # Stop here: an unrelated object need not provide
        # base_host_test_inited(), so calling it could raise AttributeError.
        return False

    # Check if BaseHostTest.__init__() was called when custom host test is created
    if not obj_instance.base_host_test_inited():
        self.logger.prn_err("custom host test __init__() must call BaseHostTest.__init__(self)")
        return False

    return True
146
def run_test(self):
    """! This function implements key-value protocol state-machine.
         Handling of all events and connector are handled here.

    @details A connection process is started and its events are read from a
             queue. Events are first handled in a "preamble" phase (until
             '__host_test_name' arrives and a host test is loaded) and then
             in the main phase, where they are dispatched to the callbacks
             registered by the host test. After the loop the connection
             process is asked to finish, leftover events are optionally
             consumed and the host test's teardown() is called.
    @return Test result: the value carried by '__notify_complete', the value
            of the host test's result() method, True when the compare log
            was fully matched, or one of the self.RESULT_* codes set here
            (RESULT_TIMEOUT, RESULT_ERROR, RESULT_SYNC_FAILED,
            RESULT_IO_SERIAL).
    """
    result = None
    timeout_duration = 10       # Default test case timeout
    coverage_idle_timeout = 10  # Default coverage idle timeout
    event_queue = Queue()       # Events from DUT to host
    dut_event_queue = Queue()   # Events from host to DUT {k;v}

    def callback__notify_prn(key, value, timestamp):
        """! Handles __notify_prn. Prints all lines in separate log line """
        for line in value.splitlines():
            self.logger.prn_inf(line)

    callbacks = {
        "__notify_prn" : callback__notify_prn
    }

    # if True we will allow host test to consume all events after test is finished
    callbacks_consume = True
    # Flag check if __exit event occurred
    callbacks__exit = False
    # Flag check if __exit_event_queue event occurred
    callbacks__exit_event_queue = False
    # Handle to dynamically loaded host test object
    self.test_supervisor = None
    # Version: greentea-client version from DUT
    self.client_version = None

    self.logger.prn_inf("starting host test process...")

    # Create device info here as it may change after restart.
    config = {
        "digest" : "serial",
        "port" : self.mbed.port,
        "baudrate" : self.mbed.serial_baud,
        "program_cycle_s" : self.options.program_cycle_s,
        "reset_type" : self.options.forced_reset_type,
        "target_id" : self.options.target_id,
        "disk" : self.options.disk,
        "polling_timeout" : self.options.polling_timeout,
        "forced_reset_timeout" : self.options.forced_reset_timeout,
        "sync_behavior" : self.options.sync_behavior,
        "platform_name" : self.options.micro,
        "image_path" : self.mbed.image_path,
        "skip_reset": self.options.skip_reset,
        "tags" : self.options.tag_filters,
        "sync_timeout": self.options.sync_timeout
    }

    if self.options.global_resource_mgr:
        grm_module, grm_host, grm_port = self.options.global_resource_mgr.split(':')

        config.update({
            "conn_resource" : 'grm',
            "grm_module" : grm_module,
            "grm_host" : grm_host,
            "grm_port" : grm_port,
        })

    if self.options.fast_model_connection:

        config.update({
            "conn_resource" : 'fmc',
            "fm_config" : self.options.fast_model_connection
        })

    def start_conn_process():
        """! Start the DUT-host communication process and return its handle """
        args = (event_queue, dut_event_queue, config)
        p = Process(target=conn_process, args=args)
        # The attribute is spelled 'daemon'; the previous 'deamon' spelling
        # only created an unused attribute and left the flag at its default.
        p.daemon = True
        p.start()
        return p

    def print_traceback():
        """! Print the traceback of the exception being handled, framed by log markers """
        self.logger.prn_inf("==== Traceback start ====")
        for line in traceback.format_exc().splitlines():
            print(line)
        self.logger.prn_inf("==== Traceback end ====")

    def process_code_coverage(key, value, timestamp):
        """! Process the found coverage key value and perform an idle
             loop checking for more, timing out if there is no response from
             the target within the idle timeout.
        @param key The key from the first coverage event
        @param value The value from the first coverage event
        @param timestamp The timestamp from the first coverage event
        @return The elapsed time taken by the processing of code coverage,
                and the (key, value, timestamp) of the next event, or None
                in its place when the idle timeout expired first
        """
        original_start_time = time()
        start_time = time()

        # Perform callback on first event
        callbacks[key](key, value, timestamp)

        # Start idle timeout loop looking for other events
        while (time() - start_time) < coverage_idle_timeout:
            try:
                (key, value, timestamp) = event_queue.get(timeout=1)
            except QueueEmpty:
                continue

            # If coverage detected use idle loop
            # Prevent breaking idle loop for __rxd_line (occurs between keys)
            if key == '__coverage_start' or key == '__rxd_line':
                start_time = time()

                # Perform callback
                callbacks[key](key, value, timestamp)
                continue

            # Any other event ends the coverage phase and is handed back
            return time() - original_start_time, (key, value, timestamp)

        # Idle timeout expired without a follow-up event. Return an explicit
        # "no event" marker so the caller never has to unpack None.
        return time() - original_start_time, None

    p = start_conn_process()
    conn_process_started = False
    try:
        # Wait for the start event. Process start timeout does not apply in
        # Global resource manager case as it may take a while for resource
        # to be available.
        (key, value, timestamp) = event_queue.get(
            timeout=None if self.options.global_resource_mgr else self.options.process_start_timeout)

        if key == '__conn_process_start':
            conn_process_started = True
        else:
            self.logger.prn_err("First expected event was '__conn_process_start', received '%s' instead"% key)

    except QueueEmpty:
        self.logger.prn_err("Conn process failed to start in %f sec"% self.options.process_start_timeout)

    if not conn_process_started:
        p.terminate()
        return self.RESULT_TIMEOUT

    start_time = time()

    try:
        consume_preamble_events = True

        while (time() - start_time) < timeout_duration:
            # Handle default events like timeout, host_test_name, ...
            try:
                (key, value, timestamp) = event_queue.get(timeout=1)
            except QueueEmpty:
                continue

            # Write serial output to the file if specified in options.
            if self.serial_output_file:
                if key == '__rxd_line':
                    with open(self.serial_output_file, "a") as f:
                        f.write("%s\n" % value)

            # In this mode we only check serial output against compare log.
            if self.compare_log:
                if key == '__rxd_line':
                    if self.match_log(value):
                        self.logger.prn_inf("Target log matches compare log!")
                        result = True
                        break

            if consume_preamble_events:
                if key == '__timeout':
                    # Override default timeout for this event queue
                    start_time = time()
                    timeout_duration = int(value)   # New timeout
                    self.logger.prn_inf("setting timeout to: %d sec"% int(value))
                elif key == '__version':
                    self.client_version = value
                    self.logger.prn_inf("DUT greentea-client version: " + self.client_version)
                elif key == '__host_test_name':
                    # Load dynamically requested host test
                    self.test_supervisor = self.registry.get_host_test(value)

                    # Check if host test object loaded is actually host test class
                    # derived from 'mbed_host_tests.BaseHostTest()'
                    # Additionally if host test class implements custom ctor it should
                    # call BaseHostTest.__init__()
                    # NOTE(review): this condition was missing from the extract and is
                    # restored from the comment above and the 'else' branch below.
                    if self.is_host_test_obj_compatible(self.test_supervisor):
                        # Pass communication queues and setup() host test
                        self.test_supervisor.setup_communication(event_queue, dut_event_queue, config)
                        try:
                            # After setup() user should already register all callbacks
                            # NOTE(review): the setup() call was missing from the extract.
                            self.test_supervisor.setup()
                        except (TypeError, ValueError):
                            # setup() can throw in normal circumstances TypeError and ValueError
                            self.logger.prn_err("host test setup() failed, reason:")
                            print_traceback()
                            result = self.RESULT_ERROR
                            event_queue.put(('__exit_event_queue', 0, time()))

                        self.logger.prn_inf("host test setup() call...")
                        if self.test_supervisor.get_callbacks():
                            callbacks.update(self.test_supervisor.get_callbacks())
                            self.logger.prn_inf("CALLBACKs updated")
                        else:
                            self.logger.prn_wrn("no CALLBACKs specified by host test")
                        self.logger.prn_inf("host test detected: %s"% value)
                    else:
                        self.logger.prn_err("host test not detected: %s"% value)
                        result = self.RESULT_ERROR
                        event_queue.put(('__exit_event_queue', 0, time()))

                    # Host test name ends the preamble, whether or not it loaded
                    consume_preamble_events = False
                elif key == '__sync':
                    # This is DUT-Host Test handshake event
                    self.logger.prn_inf("sync KV found, uuid=%s, timestamp=%f"% (str(value), timestamp))
                elif key == '__notify_sync_failed':
                    # This event is sent by conn_process, SYNC failed
                    self.logger.prn_err(value)
                    self.logger.prn_wrn("stopped to consume events due to %s event"% key)
                    callbacks_consume = False
                    result = self.RESULT_SYNC_FAILED
                    event_queue.put(('__exit_event_queue', 0, time()))
                elif key == '__notify_conn_lost':
                    # This event is sent by conn_process, DUT connection was lost
                    self.logger.prn_err(value)
                    self.logger.prn_wrn("stopped to consume events due to %s event"% key)
                    callbacks_consume = False
                    result = self.RESULT_IO_SERIAL
                    event_queue.put(('__exit_event_queue', 0, time()))
                elif key == '__exit_event_queue':
                    # This event is sent by the host test indicating no more events expected
                    self.logger.prn_inf("%s received"% (key))
                    callbacks__exit_event_queue = True
                    break
                elif key.startswith('__'):
                    # Consume other system level events
                    pass
                else:
                    self.logger.prn_err("orphan event in preamble phase: {{%s;%s}}, timestamp=%f"% (key, str(value), timestamp))
            else:
                # If coverage detected switch to idle loop
                if key == '__coverage_start':
                    self.logger.prn_inf("starting coverage idle timeout loop...")
                    elapsed_time, next_event = process_code_coverage(key, value, timestamp)

                    # Ignore the time taken by the code coverage
                    timeout_duration += elapsed_time
                    self.logger.prn_inf("exiting coverage idle timeout loop (elapsed_time: %.2f" % elapsed_time)

                    if next_event is None:
                        # Coverage stream went idle; there is no event to dispatch
                        continue
                    (key, value, timestamp) = next_event

                if key == '__notify_complete':
                    # This event is sent by Host Test, test result is in value
                    # or if value is None, value will be retrieved from HostTest.result() method
                    self.logger.prn_inf("%s(%s)" % (key, str(value)))
                    result = value
                    event_queue.put(('__exit_event_queue', 0, time()))
                elif key == '__reset':
                    # This event only resets the dut, not the host test
                    dut_event_queue.put(('__reset', True, time()))
                elif key == '__reset_dut':
                    # Disconnect to avoid connection lost event
                    dut_event_queue.put(('__host_test_finished', True, time()))
                    p.join()

                    if value == self.RESET_TYPE_SW_RST:
                        self.logger.prn_inf("Performing software reset.")
                        # Just disconnecting and re-connecting comm process will soft reset DUT
                    elif value == self.RESET_TYPE_HW_RST:
                        self.logger.prn_inf("Performing hard reset.")
                        # request hardware reset
                        self.mbed.hw_reset()
                    else:
                        self.logger.prn_err("Invalid reset type (%s). Supported types [%s]." %
                                            (value, ", ".join([self.RESET_TYPE_HW_RST,
                                                               self.RESET_TYPE_SW_RST])))
                        self.logger.prn_inf("Software reset will be performed.")

                    # connect to the device
                    p = start_conn_process()
                elif key == '__notify_conn_lost':
                    # This event is sent by conn_process, DUT connection was lost
                    self.logger.prn_err(value)
                    self.logger.prn_wrn("stopped to consume events due to %s event"% key)
                    callbacks_consume = False
                    result = self.RESULT_IO_SERIAL
                    event_queue.put(('__exit_event_queue', 0, time()))
                elif key == '__exit':
                    # This event is sent by DUT, test suite exited
                    self.logger.prn_inf("%s(%s)"% (key, str(value)))
                    callbacks__exit = True
                    event_queue.put(('__exit_event_queue', 0, time()))
                elif key == '__exit_event_queue':
                    # This event is sent by the host test indicating no more events expected
                    self.logger.prn_inf("%s received"% (key))
                    callbacks__exit_event_queue = True
                    break
                elif key == '__timeout_set':
                    # Dynamic timeout set
                    timeout_duration = int(value)   # New timeout
                    self.logger.prn_inf("setting timeout to: %d sec"% int(value))
                elif key == '__timeout_adjust':
                    # Dynamic timeout adjust
                    timeout_duration = timeout_duration + int(value)    # adjust time
                    self.logger.prn_inf("adjusting timeout with %d sec (now %d)" % (int(value), timeout_duration))
                elif key in callbacks:
                    # Handle callback
                    callbacks[key](key, value, timestamp)
                else:
                    self.logger.prn_err("orphan event in main phase: {{%s;%s}}, timestamp=%f"% (key, str(value), timestamp))
    except Exception:
        # Boundary handler: any failure inside the event loop becomes an
        # error result so the clean-up below still runs.
        self.logger.prn_err("something went wrong in event main loop!")
        print_traceback()
        result = self.RESULT_ERROR

    time_duration = time() - start_time
    self.logger.prn_inf("test suite run finished after %.2f sec..."% time_duration)

    if self.compare_log and result is None:
        if self.compare_log_idx < len(self.compare_log):
            self.logger.prn_err("Expected output [%s] not received in log." % self.compare_log[self.compare_log_idx])

    # Force conn_proxy process to return
    dut_event_queue.put(('__host_test_finished', True, time()))
    p.join()
    self.logger.prn_inf("CONN exited with code: %s"% str(p.exitcode))

    # Callbacks...
    self.logger.prn_inf("No events in queue" if event_queue.empty() else "Some events in queue")

    # If host test was used we will:
    # 1. Consume all existing events in queue if consume=True
    # 2. Check result from host test and call teardown()

    # NOTE: with the introduction of the '__exit_event_queue' event, there
    # should never be left events assuming the DUT has stopped sending data
    # over the serial data. Leaving this for now to catch anything that slips through.

    if callbacks_consume:
        # We are consuming all remaining events if requested
        while not event_queue.empty():
            try:
                (key, value, timestamp) = event_queue.get(timeout=1)
            except QueueEmpty:
                break

            if key == '__notify_complete':
                # This event is sent by Host Test, test result is in value
                # or if value is None, value will be retrieved from HostTest.result() method
                self.logger.prn_inf("%s(%s)"% (key, str(value)))
                result = value
            elif key.startswith('__'):
                # Consume other system level events
                pass
            elif key in callbacks:
                callbacks[key](key, value, timestamp)
            else:
                self.logger.prn_wrn(">>> orphan event: {{%s;%s}}, timestamp=%f"% (key, str(value), timestamp))
        self.logger.prn_inf("stopped consuming events")

    if result is not None:  # We must compare here against None!
        # Here for example we've received some error code like IOERR_COPY
        self.logger.prn_inf("host test result() call skipped, received: %s"% str(result))
    else:
        if self.test_supervisor:
            result = self.test_supervisor.result()
        self.logger.prn_inf("host test result(): %s"% str(result))

    if not callbacks__exit:
        self.logger.prn_wrn("missing __exit event from DUT")

    if not callbacks__exit_event_queue:
        self.logger.prn_wrn("missing __exit_event_queue event from host test")

    # Compare against None rather than truthiness: a falsy result such as
    # False is still a result and must not be turned into a timeout.
    if not callbacks__exit_event_queue and result is None:
        self.logger.prn_err("missing __exit_event_queue event from " + \
            "host test and no result from host test, timeout...")
        result = self.RESULT_TIMEOUT

    self.logger.prn_inf("calling blocking teardown()")
    if self.test_supervisor:
        self.test_supervisor.teardown()
    self.logger.prn_inf("teardown() finished")

    return result
528
def execute(self):
    """! Test runner for host test.

    @details This function will start executing test and forward test result via serial port
             to test suite. This function is sensitive to work-flow flags such as --skip-flashing,
             --skip-reset etc.
             First function will flash device with binary, initialize serial port for communication,
             reset target. On serial port handshake with test case will be performed. It is when host
             test reads property data from serial port (sent over serial port).
             At the end of the procedure proper host test (defined in set properties) will be executed
             and test execution timeout will be measured.
    @return Value of get_test_result_int() for the final result, or -3 when
            the run is interrupted from the keyboard
    """
    # hello string with htrun version, for debug purposes
    self.logger.prn_inf(self.get_hello_string())

    try:
        # Copy image to device
        if self.options.skip_flashing:
            self.logger.prn_inf("copy image onto target... SKIPPED!")
        else:
            self.logger.prn_inf("copy image onto target...")
            if not self.mbed.copy_image():
                # Flashing failed, the test itself is never started
                return self.get_test_result_int(self.RESULT_IOERR_COPY)

        # Execute test if flashing was successful or skipped
        test_result = self.run_test()

        # Equality (not identity) is kept on purpose so that results which
        # compare equal to True / False are mapped the same way as before.
        if test_result == True:
            result = self.RESULT_SUCCESS
        elif test_result == False:
            result = self.RESULT_FAILURE
        elif test_result is None:
            result = self.RESULT_ERROR
        else:
            # Already a result code (e.g. a self.RESULT_* value)
            result = test_result

        # This will be captured by Greentea
        self.logger.prn_inf("{{result;%s}}"% result)
        return self.get_test_result_int(result)

    except KeyboardInterrupt:
        return(-3)  # Keyboard interrupt
575
def match_log(self, line):
    """
    Matches lines from compare log with the target serial output. Compare log lines are matched in seq using index
    self.compare_log_idx. Lines can be strings to be matched as is or regular expressions.

    At most one compare log entry is consumed per call.

    :param line: One line of serial output received from the target.
    :return: True once every compare log entry has been matched, otherwise False. False is also returned when
             the current entry is not found as plain text and is not a valid regular expression.
    """
    if self.compare_log_idx < len(self.compare_log):
        regex = self.compare_log[self.compare_log_idx]
        # Either the line is matched as is or it is checked as a regular expression.
        try:
            if regex in line or re.search(regex, line):
                self.compare_log_idx += 1
        except re.error:
            # May not be a regular expression
            return False
    return self.compare_log_idx == len(self.compare_log)
# Cross-references from the generated documentation page:
#   get_test_result_int(self, test_result_str)
#       Maps test result string to unique integer.
#       Definition host_test.py:76
#   setup(self)
#       Setup and check if configuration for test is correct.
#       Definition host_test.py:102
#   Class summary: Select default host_test supervision (replaced after auto detection)
#   is_host_test_obj_compatible(self, obj_instance)
#       Check if host test object loaded is actually host test class derived from 'mbed_host_tests....
#   run_test(self)
#       This function implements key-value protocol state-machine.