require 'socket' require 'timeout' require 'gearman/util' module Gearman class Job attr_reader :argref attr_reader :handle def initialize(func, argref, handle, jss) @func = func @handle = handle @argref = argref @jss = jss end def set_status(nu, de) req = Gearman::Util::pack_req_command( "work_status", [@handle, nu, de].join("\0") ) Gearman::Util::send_req(@jss, req) or abort("work_status write failed") return 1 end def arg @argref end end class Worker attr_accessor :debug attr_accessor :prefix def initialize(*args) opts = args.shift || {} @job_servers = [] @js_count = 0 @sock_cache = {}; @last_connect_fail = {} @down_since = {} @can = {} @timeouts = {} @client_id = (1..30).map{ (rand(26) + 97).chr }.join("") @prefix = '' if opts["debug"] debug(opt["debug"]) end if opts["prefix"] prefix(opt["prefix"]) end if(opts["job_servers"]) job_servers(*opts["job_servers"]) end end def _get_js_sock(ipport) warn "getting job server socket: #{ipport}" if debug if ipport.kind_of?(IO) if sock = @sock_cache[ipport] return $sock else abort "Gearman server disappeared in STDIO mode.\n" end end if sock = @sock_cache[ipport] begin if sock.getpeername return sock end rescue warn $! @sock_cache.delete(ipport) end @sock_cache.delete(ipport) end now = Time.now down_since = @down_since[ipport] if down_since warn "job server down since #{down_since}" if debug down_for = now - down_since; retry_period = down_for > 60 ? 30 : ((down_for / 2).to_i + 1) if (@last_connect_fail[ipport] > now - retry_period) return nil end end warn "connecting to '#{ipport}'" if debug; (ip, port) = ipport.split(":") timeout(1){ sock = TCPSocket.open(ip, port) sock.binmode } unless sock @down_since[ipport] = now unless @down_since[ipport].nil? @last_connect_fail[ipport] = now return nil end @last_connect_fail.delete(ipport) @down_since.delete(ipport) # $sock->autoflush(1); # sock.sync = true # setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; # print [Socket::IPPROTO_TCP, Socket::TCP_NODELAY, [1].pack("l")] sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, [1].pack("l")) or abort @sock_cache[ipport] = sock unless _on_connect(sock) @sock_cache.delete(ipport) return nil end return sock end # Housekeeping things to do on connection to a server. Method call # with one argument being the 'socket' we're going to take care of. # returns true on success, false on failure. def _on_connect(sock) cid_req = Gearman::Util::pack_req_command("set_client_id", @client_id) return nil unless Gearman::Util::send_req(sock, cid_req) # get this socket's state caught-up @can.each_key{|func| timeout = @timeouts[func] unless _set_ability(sock, func, timeout) return nil end } return 1 end def _set_ability(sock, func, timeout) # $func = join "\t", $self->prefix, $func if $self->prefix; if (!timeout.nil?) req = Gearman::Util::pack_req_command("can_do_timeout", "#{func}\0#{timeout}") else req = Gearman::Util::pack_req_command("can_do", func) end return Gearman::Util::send_req(sock, req) end # tell all the jobservers that this worker can't do anything def reset_abilities req = Gearman::Util::pack_req_command("reset_abilities") @job_servers.each {|js| jss = _get_js_sock(js) or next unless(Gearman::Util::send_req(jss, req)) @sock_cache.delete(js) end } @can = {} @timeouts = {} end # does one job and returns. no return value. def work(*args) opts = args.shift || {} stop_if = opts.delete("stop_if") || Proc.new { nil } complete_cb = opts.delete("on_complete") fail_cb = opts.delete("on_fail") start_cb = opts.delete("on_start") abort "Unknown opts" unless opts.empty? grab_req = Gearman::Util::pack_req_command("grab_job") presleep_req = Gearman::Util::pack_req_command("pre_sleep") fd_map = {} while 1 do js_socks = [] need_sleep = 1 @job_servers.each do |js| jss = _get_js_sock(js) or next # TODO: add an optional sleep in here for the test suite # to test gearmand server going away here. (SIGPIPE on # send_req, etc) this testing has been done manually, at # least. aa = Gearman::Util::send_req(jss, grab_req) bb = Gearman::Util::wait_for_readability(jss, 0.50) unless (aa && bb) @sock_cache.delete(js) next end begin err = "" res = Gearman::Util::read_res_packet(jss, err) # p res unless res warn "read_error #{err}" @sock_cache.delete(js) next end retry if(res[:type] == "noop") end js_socks.push([js, jss]) # p res if res[:type] == "no_job" next end if res[:type] != "job_assign" msg = "Uh, wasn't expecting a #{res[:type]} packet." if res[:type] == "error" msg += " [#{ res[:blobref] }]\n" msg.gsub!("\0", " -- ") end abort msg end need_sleep = false res[:blobref].sub!(/^(.+?)\0(.+?)\0/, "") or abort("Uh, regexp on job_assign failed") (handle, func) = [$1, $2] job = Gearman::Job.new(func, res[:blobref], handle, jss) jobhandle = "#{js}//" + job.handle start_cb.call(jobhandle) if start_cb handler = @can[func] begin ret = handler.call(job) rescue => err warn "Job '#{func}' died: #{err}" if err end if !ret.nil? rv = ret work_req = Gearman::Util::pack_req_command("work_complete", "#{handle}\0#{rv}") complete_cb.call(jobhandle, ret) if complete_cb else work_req = German::Util::pack_req_command("work_fail", handle) fail_cb.call(jobhandle, err) if fail_cb end unless Gearman::Util::send_req(jss, work_req) @sock_cache.delete(js) end end is_idle = false if need_sleep is_idle = true wake_vec = [] js_socks.each do |j| (i_js, i_jss) = j # p [i_js, i_jss, presleep_req] unless Gearman::Util::send_req(i_jss, presleep_req) @sock_cache.delete(i_js) next end wake_vec << i_jss end # chill for some arbitrary time until we're woken up again # p wake_vec nready = select(wake_vec, nil, nil, 10) # p nready is_idle = false if nready end return if stop_if.call(is_idle) end end def register_function(*args, &proc) func = args[0] timeout = args[1] unless(timeout.nil?) req = Gearman::Util::pack_req_command("can_do_timeout", "#{func}\0#{timeout}") @timeouts[func] = timeout else req = Gearman::Util::pack_req_command("can_do", func) end _register_all(req) @can[func] = proc end def _register_all(req) @job_servers.each {|js| jss = _get_js_sock(js) or next unless(Gearman::Util::send_req(jss, req)) @sock_cache.delete(js) end } end def job_servers(*args) # return if ($ENV{GEARMAN_WORKER_USE_STDIO}); # return $self->{job_servers} unless @_; return @job_servers if args.empty? list = args @js_count = list.size list = list.map{|js| js.include?(":") ? js : js + ":7003" } @job_servers = list end end end