module Gearman
  # Low-level helpers for building, sending and parsing Gearman protocol
  # packets. Every helper is a singleton method on Gearman::Util.
  class Util
  end

  class << Gearman::Util
    # Protocol command table: command number => [direction, name].
    #
    # read_res_packet only accepts commands whose direction string contains
    # 'O'. Presumably 'I' marks packets sent *to* the job server and 'O'
    # packets sent *by* it (W = worker, C = client, J = job server, and [0]
    # a NUL separator between arguments) -- confirm against the protocol spec.
    @@cmd = {
      1  => [ 'I',  "can_do" ],           # from W: [FUNC]
      23 => [ 'I',  "can_do_timeout" ],   # from W: FUNC[0]TIMEOUT
      2  => [ 'I',  "cant_do" ],          # from W: [FUNC]
      3  => [ 'I',  "reset_abilities" ],  # from W: ---
      22 => [ 'I',  "set_client_id" ],    # W->J: [RANDOM_STRING_NO_WHITESPACE]
      4  => [ 'I',  "pre_sleep" ],        # from W: ---

      6  => [ 'O',  "noop" ],             # J->W ---
      7  => [ 'I',  "submit_job" ],       # C->J FUNC[0]UNIQ[0]ARGS
      21 => [ 'I',  "submit_job_high" ],  # C->J FUNC[0]UNIQ[0]ARGS
      18 => [ 'I',  "submit_job_bg" ],    # C->J FUNC[0]UNIQ[0]ARGS

      8  => [ 'O',  "job_created" ],      # J->C HANDLE
      9  => [ 'I',  "grab_job" ],         # W->J --
      10 => [ 'O',  "no_job" ],           # J->W --
      11 => [ 'O',  "job_assign" ],       # J->W HANDLE[0]FUNC[0]ARG

      12 => [ 'IO', "work_status" ],      # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
      13 => [ 'IO', "work_complete" ],    # W->J/C: HANDLE[0]RES
      14 => [ 'IO', "work_fail" ],        # W->J/C: HANDLE

      15 => [ 'I',  "get_status" ],       # C->J: HANDLE
      20 => [ 'O',  "status_res" ],       # J->C: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM

      16 => [ 'I',  "echo_req" ],         # ?->J TEXT
      17 => [ 'O',  "echo_res" ],         # J->? TEXT

      19 => [ 'O',  "error" ],            # J->? ERRCODE[0]ERR_TEXT
    }

    # Reverse index of the table above: command name -> command number.
    @@num = {}
    @@cmd.each_pair { |key, value| @@num[value[1]] = key }

    # Returns the command name for the numeric command +num+, or nil when the
    # number is not in the command table.
    def cmd_name(num)
      c = @@cmd[num]
      c ? c[1] : nil
    end

    # Builds a request packet: the "\0REQ" magic, the command number and the
    # argument size (both 32-bit big-endian), then the argument itself.
    #
    # type_arg:: command name (String or Symbol) or a numeric command code
    # arg::      packet payload; its size is measured in bytes
    def pack_req_command(type_arg, arg = "")
      type = @@num[type_arg.to_s] || type_arg
      body = binary_copy(arg)
      # bytesize, not length: the wire format counts bytes, and the two differ
      # for multibyte strings.
      "\0REQ" + [type, body.bytesize].pack("NN") + body
    end

    # Builds a response packet; same layout as pack_req_command but with the
    # "\0RES" magic. A +type_arg+ that is not a known command name is
    # converted with #to_i.
    def pack_res_command(type_arg, arg = "")
      type = @@num[type_arg.to_s] || type_arg.to_i
      body = binary_copy(arg)
      "\0RES" + [type, body.bytesize].pack("NN") + body
    end

    # Reads one response packet from +sock+.
    #
    # sock::    object responding to sysread(count)
    # err_ref:: optional String; on failure its contents are replaced with an
    #           error code
    #
    # Returns { :type => name, :len => body size, :blobref => body } on
    # success. On failure returns nil and stores one of these codes in
    # +err_ref+: "read_error", "eof", "malformed_header", "malformed_magic",
    # "short_body", "bogus_command", "bugus_command_type".
    def read_res_packet(sock, err_ref)
      header = nil
      body = nil
      len = nil
      type = nil

      begin
        # Header: 4 bytes of magic, then command number and body size.
        header = read_exactly(sock, 12)
        if header.empty?
          return report_error(err_ref, "eof")
        end
        unless header.bytesize == 12
          return report_error(err_ref, "malformed_header")
        end

        magic, type, len = header.unpack("a4NN")
        unless magic == "\0RES"
          return report_error(err_ref, "malformed_magic")
        end

        # A zero-length body needs no read at all; 0 is truthy in Ruby, so
        # the size has to be compared explicitly.
        body = len > 0 ? read_exactly(sock, len) : String.new
      rescue StandardError => e
        warn e
        return report_error(err_ref, "read_error")
      end

      unless body.bytesize == len
        return report_error(err_ref, "short_body")
      end

      command = @@cmd[type]
      return report_error(err_ref, "bogus_command") unless command
      # NOTE: "bugus" is a historical misspelling; the code is kept as-is
      # because callers may compare against it.
      unless command[0].include?("O")
        return report_error(err_ref, "bugus_command_type")
      end

      {
        :type    => command[1],
        :len     => len,
        :blobref => body,
      }
    end

    # Writes +req+ to +sock+ with a single syswrite.
    #
    # Returns 1 when every byte of the request was written; nil when +sock+
    # is nil, the write raised, or only part of the request was written.
    def send_req(sock, req)
      return nil unless sock
      begin
        written = sock.syswrite(req)
      rescue StandardError => e
        warn e
        return nil
      end
      # syswrite reports bytes, so compare against the byte size.
      written == req.bytesize ? 1 : nil
    end

    # Waits up to +timeout+ seconds for the IO object +jss+ to become
    # readable. Returns 1 if it did; nil if it did not, or if either argument
    # is nil/false.
    def wait_for_readability(jss, timeout)
      return nil unless jss && timeout
      IO.select([jss], nil, nil, timeout) ? 1 : nil
    end

    private

    # Stores +code+ in +err_ref+ (when one was supplied) and returns nil.
    def report_error(err_ref, code)
      err_ref.replace(code) if err_ref
      nil
    end

    # Returns a binary-encoded copy of +str+ so it can be appended to packed
    # header bytes without an encoding clash; on Rubies without string
    # encodings +str+ is returned unchanged.
    def binary_copy(str)
      str.respond_to?(:force_encoding) ? str.dup.force_encoding("BINARY") : str
    end

    # Reads from +sock+ until +count+ bytes have arrived or the stream ends.
    # A single sysread may return fewer bytes than requested, so it is
    # repeated. Returns the bytes read, which are fewer than +count+ only when
    # end-of-file was reached first.
    def read_exactly(sock, count)
      data = String.new
      while data.bytesize < count
        begin
          data << sock.sysread(count - data.bytesize)
        rescue EOFError
          break
        end
      end
      data
    end
  end
end