class MCollective::Aggregate

Attributes

action[RW]
ddl[RW]
failed[RW]
functions[RW]

Public Class Methods

new(ddl) click to toggle source
   # File lib/mcollective/aggregate.rb
 8 def initialize(ddl)
 9   @functions = []
10   @ddl = ddl
11   @action = ddl[:action]
12   @failed = []
13 
14   create_functions
15 end

Public Instance Methods

call_functions(reply) click to toggle source

Call all the appropriate functions with the reply data received from RPC::Client

   # File lib/mcollective/aggregate.rb
45 def call_functions(reply)
46   @functions.each do |function|
47     Log.debug("Calling aggregate function #{function} for result")
48     begin
49       function.process_result(reply[:data][function.output_name], reply)
50     rescue Exception => e
51       Log.error("Could not process aggregate function for '#{function.output_name}'. #{e.to_s}")
52       @failed << {:name => function.output_name, :type => :process_result}
53       @functions.delete(function)
54     end
55   end
56 end
contains_output?(output) click to toggle source

Check if the function param is defined as an output for the action in the ddl

   # File lib/mcollective/aggregate.rb
40 def contains_output?(output)
41   @ddl[:output].keys.include?(output)
42 end
create_functions() click to toggle source

Creates instances of the Aggregate functions and stores them in the function array. All aggregate call and summarize method calls operate on these function as a batch.

   # File lib/mcollective/aggregate.rb
19 def create_functions
20   @ddl[:aggregate].each_with_index do |agg, i|
21     output = agg[:args][0]
22 
23     if contains_output?(output)
24       arguments = agg[:args][1]
25       format = (arguments.delete(:format) if arguments) || nil
26       begin
27         @functions << load_function(agg[:function]).new(output, arguments, format, @action)
28       rescue Exception => e
29         Log.error("Cannot create aggregate function '#{output}'. #{e.to_s}")
30         @failed << {:name => output, :type => :startup}
31       end
32     else
33       Log.error("Cannot create aggregate function '#{output}'. '#{output}' has not been specified as a valid ddl output.")
34       @failed << {:name => output, :type => :create}
35     end
36   end
37 end
load_function(function_name) click to toggle source

Loads function from disk for use

   # File lib/mcollective/aggregate.rb
76 def load_function(function_name)
77   function_name = function_name.to_s.capitalize
78 
79   PluginManager.loadclass("MCollective::Aggregate::#{function_name}") unless Aggregate.const_defined?(function_name)
80   Aggregate.const_get(function_name)
81 rescue Exception
82   raise "Aggregate function file '#{function_name.downcase}.rb' cannot be loaded"
83 end
summarize() click to toggle source

Finalizes the function returning a result object

   # File lib/mcollective/aggregate.rb
59 def summarize
60   summary = @functions.map do |function|
61     begin
62       function.summarize
63     rescue Exception => e
64       Log.error("Could not summarize aggregate result for '#{function.output_name}'. #{e.to_s}")
65       @failed << {:name => function.output_name, :type => :summarize}
66       nil
67     end
68   end
69 
70   summary.reject{|x| x.nil?}.sort do |x,y|
71     x.result[:output] <=> y.result[:output]
72   end
73 end