Current code

import itertools
import collections
from contextlib import nested
import threading
from threading import Lock
import datetime
import uuid
from multiprocessing.managers import BaseManager
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler

from mako.template import Template

from veetwo.models import spider
from veetwo.models.spider_tables import url_queue_table

from veetwo.lib.utils.command import log_exceptions

from veetwo.lib.aggregation.spider import get_spider_config_value as c
from veetwo.lib.aggregation.spider.url_queue import (URLQueue,
                                                     URLQueueDuplicateEntryError,
                                                     URLQueueInvalidMetadataError)
from veetwo.lib.aggregation.spider.docprocessor_helpers import SourcesDatastore
from veetwo.lib.aggregation.spider.docprocessor_queue import DocProcessorQueue
from veetwo.lib.aggregation.spider.processors.dynurl import canonicalize_url

#from veetwo.tools.jobs.command import inject_logging
import logging
L = logging
D = logging.debug
I = logging.info
W = logging.warning
E = logging.error

MAX_SECONDS_BETWEEN_RDOMAIN_UPDATE = datetime.timedelta(seconds=1000)
MAX_SECONDS_BETWEEN_CONTACT = datetime.timedelta(seconds=300)

NUM_RECENTLY_USED_URLS = 200
BUFFER_MULTIPLIER_PER_DOMAIN = 1000

SKIP_REQUESTS_FOR_EMPTY_RDOMAIN = c('urlbroker.skip_requests_for_empty_rdomain', default=100)

URL_BROKER_PORT = 5000
URL_BROKER_AUTHKEY = 'doeswhateveraspidercan'

DOCUMENT_STORE_BUCKET_NAME = 'com.wize.spider.docstore.production_spider'
PROCESSOR_QUEUE_NAME = 'docprocessor_queue_v1_production_spider'

GLOBAL_URL_BROKER = None
# Transient server errors; a URL returning one of these is retried rather
# than disabled.
WHITELISTED_RESPONSES = [500, 503, 504]

LOG = logging.getLogger('aggregation.spider.urlbroker')

def url_should_be_disabled(url_retriever_status):
    # The comparison here was eaten by the blog's HTML filter; given
    # WHITELISTED_RESPONSES above, the evident intent is: disable on a
    # non-integer status, or on any error status that is not a whitelisted
    # transient one.
    return not isinstance(url_retriever_status, int) or \
        (url_retriever_status >= 400 and
         url_retriever_status not in WHITELISTED_RESPONSES)


class UrlBroker(object):

    # The class header, __init__, and the start of this method were lost when
    # the post was published; the method name and signature below are a best
    # guess reconstructed from the body that survived.
    def url_fetched(self, url, final_url, docid, item, meta, now):
        #
        # Handle redirects, e.g. redirects to www1.xxy.com (for a/b tests, etc.)
        #
        if final_url == url or not final_url:
            self.queue.update(url,
                              next_fetch=datetime.datetime.utcnow() + datetime.timedelta(days=7),
                              last_fetch=now,
                              metadata=meta)
            if docid:
                self.processor_queue.put(url=url,
                                         docid=docid)

            return

        final_url, new_metadata = canonicalize_url(self.sources, final_url)
        if new_metadata is not None:
            meta.update(new_metadata)

        # Disable the old row, recording where it redirected to, then enqueue
        # the target URL carrying original_url so the chain can be followed
        # in either direction.
        meta['redirect_to'] = final_url
        self.queue.update(url,
                          next_fetch=datetime.datetime.utcnow() + datetime.timedelta(days=7),
                          last_fetch=now,
                          is_disabled=True,
                          metadata=meta)

        del meta['redirect_to']
        meta['original_url'] = url
        try:
            self.queue.put(final_url, product_id=item.product_id, metadata=meta)
        except URLQueueDuplicateEntryError:
            I('url-redirected-to-existing-url: Redirected from %s to already enqueued url %s', url, final_url)
        except URLQueueInvalidMetadataError:
            E('error-processing-metadata: url=%s, metadata=%r', final_url, meta)

    def get_domains(self, fetcher_id):
        with broker_lock:
            return self.rdomains.get(fetcher_id)

    def get_fetchers(self, update=False):
        with broker_lock:
            return self._get_active_fetchers(update=update)

    # Backoff note: once a queue query for an rdomain comes back empty, the
    # next SKIP_REQUESTS_FOR_EMPTY_RDOMAIN requests for that rdomain are
    # answered with an empty list from memory instead of hitting the queue.
    def get_next_urls_for_source(self, rdomain):
        if self._rdomain_to_skip_requests[rdomain] == 0:
            max_num_fetchers = self.get_num_fetchers(rdomain)
            num_urls = max_num_fetchers * BUFFER_MULTIPLIER_PER_DOMAIN
            urls = self.queue.next(rdomain, limit=num_urls)
            D('got-urls-from-queue: rdomain=%s, num_requested=%s, num_received=%s',
              rdomain, num_urls, len(urls))
            if not urls:
                self._rdomain_to_skip_requests[rdomain] = SKIP_REQUESTS_FOR_EMPTY_RDOMAIN
        else:
            D('skipping-rdomain-url-request: rdomain=%s, number of skips remaining=%s',
              rdomain, self._rdomain_to_skip_requests[rdomain])
            urls = []
            self._rdomain_to_skip_requests[rdomain] -= 1
        return urls

    def get_num_fetchers(self, rdomain):
        m = self.sources.get_rdomain_metadata(rdomain)
        # @todo: add logic to remove disabled sources from consideration
        max_num_fetchers = max(1, m.max_num_fetchers if m else 1)
        return max_num_fetchers

    def get_outstanding_urls(self):
        with broker_lock:
            return self.url_to_metadata.keys()

    def get_sources(self):
        with broker_lock:
            return self.rdomains

    def run(self):
        pass

    def _fill_url_buffer(self, rdomains):

        with spider:
            for rdomain in rdomains:
                qurls = self.url_buffer[rdomain]
                if not qurls:
                    qurls.extend(self.get_next_urls_for_source(rdomain))

    def _get_active_fetchers(self, update=False):
        now = datetime.datetime.utcnow()

        # Fetchers that have not checked in recently are considered dead.
        zombies = [fetcher for fetcher in self.fetchers.values()
                   if now - fetcher.last_contact > MAX_SECONDS_BETWEEN_CONTACT]

        for z in zombies:
            W('zombie-fetcher-found: fetcher: %s', self.fetchers[z.guid])
            del self.fetchers[z.guid]

        if (update or zombies) and self.fetchers:
            self._reassign_sources(self.fetchers.values())

        return self.fetchers.values()

    def _reassign_sources(self, fetchers):

        self._update_rdomains_in_queue()

        for f in fetchers:
            f.domains = []

        # Deal each rdomain out to up to max_num_fetchers fetchers,
        # round-robin, so load per domain is capped and spread evenly.
        round_robin = itertools.cycle(fetchers)

        for d in self.rdomains:
            max_num_fetchers = self.get_num_fetchers(d)

            D('assign-domain %s: num_fetchers:%d', d, max_num_fetchers)

            for count in xrange(len(fetchers)):
                f = round_robin.next()
                if d not in f.domains:
                    f.domains.append(d)
                    max_num_fetchers -= 1

                if max_num_fetchers <= 0:
                    break

    def _update_rdomains_in_queue(self):
        # The condition below was partly eaten by the blog's HTML filter;
        # refreshing once the last update is older than the threshold is the
        # evident intent.
        if datetime.datetime.utcnow() - self.last_rdomain_update > MAX_SECONDS_BETWEEN_RDOMAIN_UPDATE:
            with spider:
                I('getting-all-rdomains-from-queue')
                self.rdomains = self.queue.all_rdomains()
                D('rdomains-received: rdomains=%r', self.rdomains)

            self.last_rdomain_update = datetime.datetime.utcnow()
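
    # The instance state used above (and the module-level broker_lock) is
    # initialized in a part of the post that did not survive publishing. A
    # minimal sketch of what that setup presumably looks like -- attribute
    # names are taken from the methods above, the concrete values are
    # assumptions:
    #
    #     broker_lock = Lock()
    #
    #     def __init__(self):
    #         self.queue = URLQueue(url_queue_table)
    #         self.sources = SourcesDatastore(DOCUMENT_STORE_BUCKET_NAME)
    #         self.processor_queue = DocProcessorQueue(PROCESSOR_QUEUE_NAME)
    #         self.fetchers = {}                 # guid -> fetcher record
    #         self.rdomains = {}                 # refreshed from the queue
    #         self.url_buffer = collections.defaultdict(list)
    #         self.url_to_metadata = {}          # handed out, not yet done
    #         self._rdomain_to_skip_requests = collections.defaultdict(int)
    #         self.recently_fetched_urls = collections.deque(
    #             maxlen=NUM_RECENTLY_USED_URLS)
    #         self.last_rdomain_update = datetime.datetime.min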

# The HTML markup inside this Mako template was stripped when the post was
# published; the tags below are reconstructed from the visible structure
# (a table of fetchers, bulleted open URLs, numbered recent URLs).
TEMPLATE_STRING = '''
<html>
<head>
<style type="text/css">
td{border-right:1px solid #505050;}
</style>
</head>
<body>
<p>${now}</p>

<h2>Fetchers</h2>
<table>
<tr><th>seq</th><th>guid</th><th>last-contact delta</th><th>domains</th></tr>
%for f in fetchers:
<tr><td>${f.seq}</td><td>${f.guid}</td><td>${now-f.last_contact}</td><td>${f.domains}</td></tr>
%endfor
</table>

<h2>Open URLs</h2>
<ul>
%for u in open_urls:
<li>${u}</li>
%endfor
</ul>

<h2>Recently Fetched URLs</h2>
<ol>
%for u in recent_urls:
<li>${u}</li>
%endfor
</ol>

<a href="/reload_sources">Reload Sources</a>
</body>
</html>
'''

TEMPLATE = Template(TEMPLATE_STRING)
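
# Quick smoke test of the template (hypothetical values, mirroring the render
# call in HandleRequest.do_GET below):
#
#     print TEMPLATE.render(now=datetime.datetime.utcnow(),
#                           open_urls=[], recent_urls=[], fetchers=[])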

class HandleRequest(BaseHTTPRequestHandler):

    def do_GET(self):

        if self.path == '/reload_sources':
            GLOBAL_URL_BROKER.get_fetchers(update=True)

        now = datetime.datetime.utcnow()

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(TEMPLATE.render(now=now,
                                         open_urls=GLOBAL_URL_BROKER.get_outstanding_urls(),
                                         recent_urls=GLOBAL_URL_BROKER.recently_fetched_urls,
                                         fetchers=GLOBAL_URL_BROKER.get_fetchers()))


def launch_httpd():
    I('launching-embedded-httpd')

    s = HTTPServer(('', 6666), HandleRequest)
    s.serve_forever()
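
# The listing never shows who calls launch_httpd(); presumably launch_broker
# (below) spins it up on a daemon thread before blocking in serve_forever().
# A minimal sketch of that wiring, using the threading module imported above:
#
#     httpd_thread = threading.Thread(target=launch_httpd)
#     httpd_thread.daemon = True
#     httpd_thread.start()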

def launch_broker():
    I('url-broker-launching')

    global GLOBAL_URL_BROKER

    GLOBAL_URL_BROKER = UrlBroker()

    I('url-broker-setting-up-server')

    manager = BaseManager(address=('', URL_BROKER_PORT),
                          authkey=URL_BROKER_AUTHKEY)
    manager.register('get_broker', lambda: GLOBAL_URL_BROKER)
    server = manager.get_server()

    I('url-broker-listening')
    server.serve_forever()
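
def connect_to_broker(host):
    # Hypothetical client-side helper (not in the original post): this is how
    # a fetcher process would presumably obtain the shared UrlBroker proxy
    # from the BaseManager server started in launch_broker().
    manager = BaseManager(address=(host, URL_BROKER_PORT),
                          authkey=URL_BROKER_AUTHKEY)
    manager.register('get_broker')
    manager.connect()
    return manager.get_broker()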
