class MCollective::Message

Container for a message, its headers, agent, collective and other metadata.

Constants

VALIDTYPES

Attributes

agent[RW]
collective[RW]
discovered_hosts[RW]
expected_msgid[R]
filter[RW]
headers[RW]
message[R]
msgtime[R]
options[RW]
payload[R]
reply_to[R]
request[R]
requestid[RW]
ttl[RW]
type[R]
validated[R]

Public Class Methods

new(payload, message, options = {}) click to toggle source

payload - the message body without headers etc, just the text
message - the original message received from the middleware
options[:base64] - whether the body is base64 encoded
options[:agent] - the agent the message is for/from
options[:collective] - the collective it is for/from
options[:headers] - the message headers
options[:type] - an indicator about the type of message, :message, :request, :direct_request or :reply
options[:request] - if this is a reply this should hold the message we are replying to
options[:filter] - for requests, the filter to encode into the message
options[:options] - the normal client options hash
options[:ttl] - the maximum amount of seconds this message can be valid for
options[:expected_msgid] - in the case of replies this is the msgid it is expecting in the replies
options[:requestid] - specific request id to use else one will be generated

   # File lib/mcollective/message.rb
   #
   # Builds a Message from a payload and the original middleware message.
   #
   # Caller-supplied options are merged over the defaults listed below.
   # When options[:request] is given, the agent and collective are copied
   # from that request and the type is forced to :reply, overriding
   # options[:type], options[:agent] and options[:collective].
   #
   # The constructor finishes by calling base64_decode!, so a payload
   # flagged with options[:base64] is decoded immediately.
   #
   # NOTE(review): @ttl is read from the nested client options hash
   # (options[:options][:ttl]) and falls back to Config.instance.ttl; the
   # top-level :ttl default of 60 is never consulted in this method.
   # Confirm that this is intended.
23 def initialize(payload, message, options = {})
24   options = {:base64 => false,
25              :agent => nil,
26              :headers => {},
27              :type => :message,
28              :request => nil,
29              :filter => Util.empty_filter,
30              :options => {},
31              :ttl => 60,
32              :expected_msgid => nil,
33              :requestid => nil,
34              :collective => nil}.merge(options)
35 
36   @payload = payload
37   @message = message
38   @requestid = options[:requestid]
39   @discovered_hosts = nil
40   @reply_to = nil
41 
42   @type = options[:type]
43   @headers = options[:headers]
44   @base64 = options[:base64]
45   @filter = options[:filter]
46   @expected_msgid = options[:expected_msgid]
47   @options = options[:options]
48 
49   @ttl = @options[:ttl] || Config.instance.ttl
50   @msgtime = 0
51 
52   @validated = false
53 
54   if options[:request]
55     @request = options[:request]
56     @agent = request.agent
57     @collective = request.collective
58     @type = :reply
59   else
60     @agent = options[:agent]
61     @collective = options[:collective]
62   end
63 
64   base64_decode!
65 end

Public Instance Methods

base64?() click to toggle source
    # File lib/mcollective/message.rb
    #
    # Returns the @base64 flag. base64_encode! sets it to true after
    # encoding the payload and base64_decode! resets it to false after
    # decoding, so it reflects whether the payload is currently encoded.
129 def base64?
130   @base64
131 end
base64_decode!() click to toggle source
    # File lib/mcollective/message.rb
    #
    # Replaces @payload with the result of SSL.base64_decode and clears the
    # @base64 flag. Returns early without touching the payload when the
    # flag is not set, so calling it repeatedly does not decode twice.
115 def base64_decode!
116   return unless @base64
117 
118   @payload = SSL.base64_decode(@payload)
119   @base64 = false
120 end
base64_encode!() click to toggle source
    # File lib/mcollective/message.rb
    #
    # Replaces @payload with the result of SSL.base64_encode and sets the
    # @base64 flag. Returns early without touching the payload when the
    # flag is already set, so calling it repeatedly does not encode twice.
122 def base64_encode!
123   return if @base64
124 
125   @payload = SSL.base64_encode(@payload)
126   @base64 = true
127 end
create_reqid() click to toggle source
    # File lib/mcollective/message.rb
    #
    # Returns a fresh request id: the string produced by SSL.uuid with all
    # "-" characters removed.
241 def create_reqid
242   # we gsub out the -s so that the format of the id does not
243   # change from previous versions, these should just be more
244   # unique than previous ones
245   SSL.uuid.gsub("-", "")
246 end
decode!() click to toggle source
    # File lib/mcollective/message.rb
    #
    # Decodes the payload through the security plugin's decodemsg and then
    # copies :collective, :agent, :filter, :requestid, :ttl and :msgtime
    # from the decoded payload onto this message, for each key the payload
    # includes.
    #
    # Raises a RuntimeError for any type other than :request or :reply.
    #
    # Decode failures are handled differently per type: for a :request the
    # exception is re-raised; otherwise a warning is logged and the method
    # returns with the payload and attributes left untouched.
    #
    # For a :request, a RuntimeError is also raised when the security
    # plugin's valid_callerid? rejects payload[:callerid].
    #
    # NOTE(review): `rescue Exception` also catches non-StandardError
    # exceptions (e.g. Interrupt, SystemExit); on the reply path those are
    # logged and swallowed. Confirm whether rescuing StandardError would be
    # sufficient here.
183 def decode!
184   raise "Cannot decode message type #{type}" unless [:request, :reply].include?(type)
185 
186   begin
187     @payload = PluginManager["security_plugin"].decodemsg(self)
188   rescue Exception => e
189     if type == :request
190       # If we're a server receiving a request, reraise
191       raise(e)
192     else
193       # We're in the client, log and carry on as best we can
194 
195       # Note: mc_sender is unverified.  The verified identity is in the
196       # payload we just failed to decode
197       Log.warn("Failed to decode a message from '#{headers["mc_sender"]}': #{e}")
198       return
199     end
200   end
201 
202   if type == :request
203     raise 'callerid in request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(payload[:callerid])
204   end
205 
206   [:collective, :agent, :filter, :requestid, :ttl, :msgtime].each do |prop|
207     instance_variable_set("@#{prop}", payload[prop]) if payload.include?(prop)
208   end
209 end
description() click to toggle source
    # File lib/mcollective/message.rb
    #
    # Returns a one-line summary of the message: its requestid, agent and
    # collective, followed by the sender. The sender is rendered as
    # "callerid@senderid" when the payload includes :callerid, otherwise
    # as just the senderid.
    #
    # NOTE(review): payload[:senderid] is concatenated unconditionally, so
    # a payload without it raises TypeError from String#+. Presumably the
    # payload is a decoded Hash that always carries :senderid - verify
    # against callers.
133 def description
134   cid = ""
135   cid += payload[:callerid] + "@" if payload.include?(:callerid)
136   cid += payload[:senderid]
137 
138   "#{requestid} for agent '#{agent}' in collective '#{collective}' from #{cid}"
139 end
encode!() click to toggle source
    # File lib/mcollective/message.rb
    #
    # Replaces @payload with its encoded form produced by the security
    # plugin, depending on the message type.
    #
    # :reply - requires an associated request whose callerid passes the
    # security plugin's valid_callerid?; otherwise a RuntimeError is
    # raised. The requestid is taken from the request's payload and the
    # payload is encoded with encodereply.
    #
    # :request / :direct_request - a non-empty compound filter is first
    # checked with validate_compound_filter, a requestid is generated with
    # create_reqid if none is set, and the payload is encoded with
    # encoderequest using Config.instance.identity as the sender.
    #
    # Any other type raises a RuntimeError.
    #
    # NOTE(review): @filter["compound"].empty? assumes the filter always
    # has a "compound" entry - presumably guaranteed by Util.empty_filter;
    # verify for caller-supplied filters.
141 def encode!
142   case type
143     when :reply
144       raise "Cannot encode a reply message if no request has been associated with it" unless request
145       raise 'callerid in original request is not valid, surpressing reply to potentially forged request' unless PluginManager["security_plugin"].valid_callerid?(request.payload[:callerid])
146 
147       @requestid = request.payload[:requestid]
148       @payload = PluginManager["security_plugin"].encodereply(agent, payload, requestid, request.payload[:callerid])
149     when :request, :direct_request
150       validate_compound_filter(@filter["compound"]) unless @filter["compound"].empty?
151 
152       @requestid = create_reqid unless @requestid
153       @payload = PluginManager["security_plugin"].encoderequest(Config.instance.identity, payload, requestid, filter, agent, collective, ttl)
154     else
155       raise "Cannot encode #{type} messages"
156   end
157 end
expected_msgid=(msgid) click to toggle source

In the case of reply messages we are expecting replies to a previously created message. This stores a hint to that previously sent message id, which other classes such as the security plugins can use to optimize their behavior, for example by ignoring messages not directed at us.

    # File lib/mcollective/message.rb
    #
    # Stores msgid in @expected_msgid. Raises a RuntimeError unless the
    # message type is :reply.
110 def expected_msgid=(msgid)
111   raise "Can only store the expected msgid for reply messages" unless @type == :reply
112   @expected_msgid = msgid
113 end
publish() click to toggle source

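Publish a message by creating a target name and sending it. When discovered hosts have been set and direct addressing applies, the message is sent as a direct request.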

    # File lib/mcollective/message.rb
    #
    # Hands this message to the connector plugin's publish method.
    #
    # Before publishing, the type is switched to :direct_request (through
    # the type= writer, so its checks apply) when all of the following
    # hold: @discovered_hosts is set, direct_addressing is enabled in the
    # config, and the number of discovered hosts does not exceed
    # direct_addressing_threshold.
228 def publish
229   # If we've been specificaly told about hosts that were discovered
230   # use that information to do P2P calls if appropriate else just
231   # send it as is.
232   config = Config.instance
233   if @discovered_hosts && config.direct_addressing && (@discovered_hosts.size <= config.direct_addressing_threshold)
234     self.type = :direct_request
235     Log.debug("Handling #{requestid} as a direct request")
236   end
237 
238   PluginManager['connector_plugin'].publish(self)
239 end
reply_to=(target) click to toggle source

Sets a custom reply-to target for requests. The connector plugin should inspect this when constructing requests and set this header, ensuring replies will go to the custom target; otherwise the connector should just do what it usually does.

    # File lib/mcollective/message.rb
    #
    # Stores target in @reply_to. Raises a RuntimeError unless the message
    # type is :request or :direct_request.
 99 def reply_to=(target)
100   raise "Custom reply targets can only be set on requests" unless [:request, :direct_request].include?(@type)
101 
102   @reply_to = target
103 end
type=(type) click to toggle source

Sets the message type to one of the known types. In the case of :direct_request the list of hosts to communicate with should have been set with discovered_hosts, else an exception will be raised. This is for extra security: we never want to accidentally send a direct request without a list of hosts, or something weird like that, as it might result in a filterless broadcast being sent.

Additionally, you simply cannot set :direct_request if direct_addressing was not enabled. This is to force a workflow that does not result in a mistake when someone assumes direct_addressing is enabled when it is not.

   # File lib/mcollective/message.rb
   #
   # Sets @type after checking it against VALIDTYPES; an unknown type
   # raises a RuntimeError.
   #
   # For :direct_request two further checks run before the type is
   # stored: Config.instance.direct_addressing must be enabled and
   # @discovered_hosts must be set and non-empty, otherwise a RuntimeError
   # is raised. The filter is then replaced with Util.empty_filter with
   # @agent appended to its "agent" entry.
76 def type=(type)
77   raise "Unknown message type #{type}" unless VALIDTYPES.include?(type)
78 
79   if type == :direct_request
80     raise "Direct requests is not enabled using the direct_addressing config option" unless Config.instance.direct_addressing
81 
82     unless @discovered_hosts && !@discovered_hosts.empty?
83       raise "Can only set type to :direct_request if discovered_hosts have been set"
84     end
85 
86     # clear out the filter, custom discovery sources might interpret the filters
87     # different than the remote mcollectived and in directed mode really the only
88     # filter that matters is the agent filter
89     @filter = Util.empty_filter
90     @filter["agent"] << @agent
91   end
92 
93   @type = type
94 end
validate() click to toggle source

Perform validation against the message by checking filters and ttl

    # File lib/mcollective/message.rb
    #
    # Validates a request message and sets @validated to true on success.
    #
    # Raises a RuntimeError unless the type is :request.
    #
    # The message age is the current UTC time in epoch seconds minus
    # msgtime. When the age exceeds ttl, ttlexpired is called on the
    # global_stats plugin and MsgTTLExpired is raised.
    #
    # NotTargettedAtUs is raised when the security plugin's
    # validate_filter? rejects payload[:filter].
212 def validate
213   raise "Can only validate request messages" unless type == :request
214 
215   msg_age = Time.now.utc.to_i - msgtime
216 
217   if msg_age > ttl
218     PluginManager["global_stats"].ttlexpired
219     raise(MsgTTLExpired, "Message #{description} created at #{msgtime} is #{msg_age} seconds old, TTL is #{ttl}. Rejecting message.")
220   end
221 
222   raise(NotTargettedAtUs, "Message #{description} does not pass filters. Ignoring message.") unless PluginManager["security_plugin"].validate_filter?(payload[:filter])
223 
224   @validated = true
225 end
validate_compound_filter(compound_filter) click to toggle source
    # File lib/mcollective/message.rb
    #
    # Checks every data-plugin function call in a compound filter against
    # that plugin's DDL.
    #
    # compound_filter is iterated as a collection of filters, each a
    # collection of statements. Only statements with an "fstatement" entry
    # are checked. For each of those:
    #   * the data plugin name is resolved from the function name and its
    #     DDL is loaded;
    #   * the statement's "params" are replaced in place with the result
    #     of Data.ddl_transform_input, then passed to Data.ddl_validate;
    #   * DDL.validation_fail! is called when "value" is missing or
    #     Data.ddl_has_output? reports that the DDL has no such output.
159 def validate_compound_filter(compound_filter)
160   compound_filter.each do |filter|
161     filter.each do |statement|
162       if statement["fstatement"]
163         functionname = statement["fstatement"]["name"]
164         pluginname = Data.pluginname(functionname)
165         value = statement["fstatement"]["value"]
166 
167         ddl = DDL.new(pluginname, :data)
168 
169         # parses numbers and booleans entered as strings into proper
170         # types of data so that DDL validation will pass
171         statement["fstatement"]["params"] = Data.ddl_transform_input(ddl, statement["fstatement"]["params"])
172 
173         Data.ddl_validate(ddl, statement["fstatement"]["params"])
174 
175         unless value && Data.ddl_has_output?(ddl, value)
176           DDL.validation_fail!(:PLMC41, "Data plugin '%{functionname}()' does not return a '%{value}' value", :error, {:functionname => functionname, :value => value})
177         end
178       end
179     end
180   end
181 end