reworked spiders, network now storing closest node w/o value if found

Brian Muller 2014-01-19 12:21:16 -05:00
parent ca49cde8f2
commit 7962e43a6c
5 changed files with 231 additions and 118 deletions

README.md

@@ -1,8 +1,10 @@
-# Kademlia Distributed Hash Table in Python
+# Python Distributed Hash Table
[![Build Status](https://secure.travis-ci.org/bmuller/kademlia.png?branch=master)](https://travis-ci.org/bmuller/kademlia)
This library is an asynchronous Python implementation of the [Kademlia distributed hash table](http://en.wikipedia.org/wiki/Kademlia). It uses [Twisted](https://twistedmatrix.com) to provide asynchronous communication. The nodes communicate using [RPC over UDP](https://github.com/bmuller/rpcudp), meaning that the library is capable of working behind a [NAT](http://en.wikipedia.org/wiki/NAT).
+This library aims to be as close to a reference implementation of the [Kademlia paper](http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf) as possible.
## Installation
```
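
For orientation, here is a minimal usage sketch of the API this commit exercises: `Server.bootstrap`, `Server.set`, and `Server.get` appear in the `kademlia/network.py` hunks below, while `Server.listen` is not shown in this diff and is an assumption; the port and bootstrap address are placeholders.

```python
from twisted.internet import reactor
from kademlia.network import Server

def printResult(result):
    # result is the stored value, or None if the key was not found
    print "Key result: %s" % result
    reactor.stop()

def bootstrapDone(found, server):
    # set a key/value pair, then read it back
    server.set("a key", "a value").addCallback(
        lambda _: server.get("a key").addCallback(printResult))

server = Server()
server.listen(8468)  # placeholder port
server.bootstrap([("1.2.3.4", 8468)]).addCallback(bootstrapDone, server)
reactor.run()
```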

kademlia/crawling.py (new file, +172 lines)

@@ -0,0 +1,172 @@
from collections import Counter
from kademlia.log import Logger
from kademlia.utils import deferredDict
from kademlia.node import Node, NodeHeap

class SpiderCrawl(object):
    """
    Crawl the network and look for given 160-bit keys.
    """
    def __init__(self, protocol, node, peers, ksize, alpha):
        """
        Create a new C{SpiderCrawl}.

        @param protocol: A C{KademliaProtocol} instance.
        @param node: A C{Node} representing the key we're looking for
        @param peers: A list of C{Node}s that provide the entry point for the network
        @param ksize: The value for k based on the paper
        @param alpha: The value for alpha based on the paper
        """
        self.protocol = protocol
        self.ksize = ksize
        self.alpha = alpha
        self.node = node
        self.nearest = NodeHeap(self.node, self.ksize)
        self.lastIDsCrawled = []
        self.log = Logger(system=self)
        self.log.info("creating spider with peers: %s" % peers)
        self.nearest.push(peers)

    def _find(self, rpcmethod):
        """
        Get either a value or a list of nodes.

        @param rpcmethod: The protocol's C{callFindValue} or C{callFindNode}.

        The process:
          1. Call find_* on the ALPHA nearest nodes that have not yet been
             queried, adding the results to the current list of the k
             nearest nodes.
          2. The nearest list keeps track of which nodes have already been
             queried; it stays sorted by distance and capped at KSIZE.
          3. If the list is unchanged from the last iteration, the next
             round queries every node not yet contacted.
          4. Repeat until every node in the nearest list has been queried.
        """
self.log.info("crawling with nearest: %s" % str(tuple(self.nearest)))
count = self.alpha
if self.nearest.getIDs() == self.lastIDsCrawled:
self.log.info("last iteration same as current - checking all in list now")
count = len(self.nearest)
self.lastIDsCrawled = self.nearest.getIDs()
ds = {}
for peer in self.nearest.getUncontacted()[:count]:
ds[peer.id] = rpcmethod(peer, self.node)
self.nearest.markContacted(peer)
return deferredDict(ds).addCallback(self._nodesFound)

class ValueSpiderCrawl(SpiderCrawl):
    def __init__(self, protocol, node, peers, ksize, alpha):
        SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
        # keep track of the single nearest node without the value - per
        # section 2.3 of the paper, so we can store the key/value pair
        # there once it is found
        self.nearestWithoutValue = NodeHeap(self.node, 1)

    def find(self):
        """
        Find either the closest nodes or the value requested.
        """
        return self._find(self.protocol.callFindValue)

    def _nodesFound(self, responses):
        """
        Handle the result of an iteration in C{_find}.
        """
        toremove = []
        foundValues = []
        for peerid, response in responses.items():
            response = RPCFindResponse(response)
            if not response.happened():
                toremove.append(peerid)
            elif response.hasValue():
                foundValues.append(response.getValue())
            else:
                peer = self.nearest.getNodeById(peerid)
                self.nearestWithoutValue.push(peer)
                self.nearest.push(response.getNodeList())
        self.nearest.remove(toremove)

        if len(foundValues) > 0:
            return self._handleFoundValues(foundValues)
        if self.nearest.allBeenContacted():
            # not found!
            return None
        return self.find()

    def _handleFoundValues(self, values):
        """
        We got some values!  Exciting.  But let's make sure
        they're all the same or freak out a little bit.  Also,
        make sure we tell the nearest node that *didn't* have
        the value to store it.
        """
        valueCounts = Counter(values)
        if len(valueCounts) != 1:
            args = (self.node.long_id, str(values))
            self.log.warning("Got multiple values for key %i: %s" % args)
        value = valueCounts.most_common(1)[0][0]

        peerToSaveTo = self.nearestWithoutValue.leftpop()
        if peerToSaveTo is not None:
            d = self.protocol.callStore(peerToSaveTo, self.node.id, value)
            return d.addCallback(lambda _: value)
        return value

class NodeSpiderCrawl(SpiderCrawl):
    def find(self):
        """
        Find the closest nodes.
        """
        return self._find(self.protocol.callFindNode)

    def _nodesFound(self, responses):
        """
        Handle the result of an iteration in C{_find}.
        """
        toremove = []
        for peerid, response in responses.items():
            response = RPCFindResponse(response)
            if not response.happened():
                toremove.append(peerid)
            else:
                self.nearest.push(response.getNodeList())
        self.nearest.remove(toremove)

        if self.nearest.allBeenContacted():
            return list(self.nearest)
        return self.find()

class RPCFindResponse(object):
    def __init__(self, response):
        """
        @param response: This will be a tuple of (<response received>, <value>)
            where <value> will be a list of tuples if not found or
            a dictionary of {'value': v} where v is the value desired
        """
        self.response = response

    def happened(self):
        """
        Did the other host actually respond?
        """
        return self.response[0]

    def hasValue(self):
        return isinstance(self.response[1], dict)

    def getValue(self):
        return self.response[1]['value']

    def getNodeList(self):
        """
        Get the node list in the response.  If there's no value, this
        should be set.
        """
        nodelist = self.response[1] or []
        return [Node(*nodeple) for nodeple in nodelist]
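
The crawl loop in `_find` is compact but dense. As a reading aid, here is a toy, synchronous sketch of the same iterative lookup — not code from this commit; integer IDs, XOR distance, and the invented `query_node` callable stand in for real nodes and the RPC layer:

```python
def iterative_lookup(target, bootstrap_peers, query_node, ksize=20, alpha=3):
    """Toy version of SpiderCrawl._find. query_node(peer, target) is a
    stand-in for the find-node RPC and returns a list of closer peers."""
    nearest = sorted(bootstrap_peers, key=lambda p: p ^ target)[:ksize]
    contacted = set()
    last_crawled = None
    while True:
        uncontacted = [p for p in nearest if p not in contacted]
        if not uncontacted:
            # everyone in the k-nearest list has been queried: done
            return nearest
        # query alpha peers per round; if the nearest list stopped
        # changing, query everyone not yet contacted instead
        count = len(uncontacted) if nearest == last_crawled else alpha
        last_crawled = list(nearest)
        for peer in uncontacted[:count]:
            contacted.add(peer)
            nearest.extend(p for p in query_node(peer, target)
                           if p not in nearest)
        nearest = sorted(nearest, key=lambda p: p ^ target)[:ksize]
```

The real implementation differs mainly in being asynchronous: each round is a `deferredDict` of RPCs whose callback (`_nodesFound`) re-enters the loop.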

kademlia/network.py

@@ -11,101 +11,9 @@ from kademlia.log import Logger
from kademlia.protocol import KademliaProtocol
from kademlia.utils import deferredDict, digest
from kademlia.storage import ForgetfulStorage
-from kademlia.node import Node, NodeHeap
+from kademlia.node import Node
+from kademlia.crawling import ValueSpiderCrawl
+from kademlia.crawling import NodeSpiderCrawl
-class SpiderCrawl(object):
-    """
-    Crawl the network and look for given 160-bit keys.
-    """
-    def __init__(self, protocol, node, peers, ksize, alpha):
-        """
-        Create a new C{SpiderCrawl}er.
-        @param protocol: a C{KademliaProtocol} instance.
-        @param node: A C{Node} representing the key we're looking for
-        @param peers: A list of C{Node}s that provide the entry point for the network
-        @param ksize: The value for k based on the paper
-        @param alpha: The value for alpha based on the paper
-        """
-        self.protocol = protocol
-        self.ksize = ksize
-        self.alpha = alpha
-        self.nearest = NodeHeap(self.ksize)
-        self.node = node
-        self.lastIDsCrawled = []
-        self.log = Logger(system=self)
-        self.log.info("creating spider with peers: %s" % peers)
-        for peer in peers:
-            self.nearest.push(self.node.distanceTo(peer), peer)
-
-    def findNodes(self):
-        """
-        Find the closest nodes.
-        """
-        return self._find(self.protocol.callFindNode)
-
-    def findValue(self):
-        """
-        Find either the closest nodes or the value requested.
-        """
-        def handle(result):
-            if isinstance(result, dict):
-                return result['value']
-            return None
-        d = self._find(self.protocol.callFindValue)
-        return d.addCallback(handle)
-
-    def _find(self, rpcmethod):
-        """
-        Get either a value or list of nodes.
-        @param rpcmethod: The protocol's C{callfindValue} or C{callFindNode}.
-        The process:
-          1. calls find_* to current ALPHA nearest not already queried nodes,
-             adding results to current nearest list of k nodes.
-          2. current nearest list needs to keep track of who has been queried already
-             sort by nearest, keep KSIZE
-          3. if list is same as last time, next call should be to everyone not
-             yet queried
-          4. repeat, unless nearest list has all been queried, then ur done
-        """
-        self.log.info("crawling with nearest: %s" % str(tuple(self.nearest)))
-        count = self.alpha
-        if self.nearest.getIDs() == self.lastIDsCrawled:
-            self.log.info("last iteration same as current - checking all in list now")
-            count = len(self.nearest)
-        self.lastIDsCrawled = self.nearest.getIDs()
-        ds = {}
-        for peer in self.nearest.getUncontacted()[:count]:
-            ds[peer.id] = rpcmethod(peer, self.node)
-            self.nearest.markContacted(peer)
-        return deferredDict(ds).addCallback(self._nodesFound)
-
-    def _nodesFound(self, responses):
-        """
-        Handle the result of an iteration in C{_find}.
-        """
-        toremove = []
-        for peerid, response in responses.items():
-            # response will be a tuple of (<response received>, <value>)
-            # where <value> will be a list of tuples if not found or
-            # a dictionary of {'value': v} where v is the value desired
-            if not response[0]:
-                toremove.append(peerid)
-            elif isinstance(response[1], dict):
-                self.log.debug("found value for %i" % self.node.long_id)
-                return response[1]
-            for nodeple in (response[1] or []):
-                peer = Node(*nodeple)
-                self.nearest.push(self.node.distanceTo(peer), peer)
-        self.nearest.remove(toremove)
-        if self.nearest.allBeenContacted():
-            return list(self.nearest)
-        return self.findNodes()

class Server(object):
@@ -148,8 +56,8 @@ class Server(object):
        for id in self.protocol.getRefreshIDs():
            node = Node(id)
            nearest = self.protocol.router.findNeighbors(node, self.alpha)
-            spider = SpiderCrawl(self.protocol, node, nearest)
-            ds.append(spider.findNodes())
+            spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
+            ds.append(spider.find())
        return defer.gatherResults(ds)

    def bootstrappableNeighbors(self):
@@ -182,8 +90,8 @@ class Server(object):
            for addr, result in results.items():
                if result[0]:
                    nodes.append(Node(result[1], addr[0], addr[1]))
-            spider = SpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha)
-            return spider.findNodes()
+            spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha)
+            return spider.find()

        ds = {}
        for addr in addrs:
@@ -215,8 +123,8 @@ class Server(object):
        """
        node = Node(digest(key))
        nearest = self.protocol.router.findNeighbors(node)
-        spider = SpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
-        return spider.findValue()
+        spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
+        return spider.find()

    def set(self, key, value):
        """
@@ -232,8 +140,8 @@ class Server(object):
        node = Node(dkey)
        nearest = self.protocol.router.findNeighbors(node)
-        spider = SpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
-        return spider.findNodes().addCallback(store)
+        spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
+        return spider.find().addCallback(store)

    def _anyRespondSuccess(self, responses):
        """

kademlia/node.py

@@ -29,7 +29,17 @@ class Node:
class NodeHeap(object):
-    def __init__(self, maxsize):
+    """
+    A heap of nodes ordered by distance to a given node.
+    """
+    def __init__(self, node, maxsize):
+        """
+        Constructor.
+
+        @param node: The node to measure all distances from.
+        @param maxsize: The maximum size that this heap can grow to.
+        """
+        self.node = node
        self.heap = []
        self.contacted = set()
        self.maxsize = maxsize
@@ -51,6 +61,12 @@ class NodeHeap(object):
                heapq.heappush(nheap, (distance, node))
        self.heap = nheap

+    def getNodeById(self, id):
+        for _, node in self.heap:
+            if node.id == id:
+                return node
+        return None
+
    def allBeenContacted(self):
        return len(self.getUncontacted()) == 0
@@ -60,8 +76,23 @@ class NodeHeap(object):
    def markContacted(self, node):
        self.contacted.add(node.id)

-    def push(self, distance, node):
-        heapq.heappush(self.heap, (distance, node))
+    def leftpop(self):
+        if len(self) > 0:
+            return heapq.heappop(self.heap)[1]
+        return None
+
+    def push(self, nodes):
+        """
+        Push nodes onto heap.
+
+        @param nodes: This can be a single item or a C{list}.
+        """
+        if not isinstance(nodes, list):
+            nodes = [nodes]
+        for node in nodes:
+            distance = self.node.distanceTo(node)
+            heapq.heappush(self.heap, (distance, node))

    def __len__(self):
        return min(len(self.heap), self.maxsize)
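
A quick sketch of how the new NodeHeap API reads after this change — `Node` and `NodeHeap` are the real classes from kademlia/node.py, while the `make_node` helper is invented for this illustration:

```python
import hashlib
from kademlia.node import Node, NodeHeap

def make_node(n):
    # illustration-only helper: a Node whose id is the SHA1 of str(n)
    return Node(hashlib.sha1(str(n).encode('utf8')).digest())

target = make_node("some key")     # distances are measured from this node
heap = NodeHeap(target, 3)         # len() and iteration are capped at 3
heap.push([make_node(i) for i in range(10)])  # push accepts a list...
heap.push(make_node(42))                      # ...or a single node
assert len(heap) == 3              # only the 3 nearest are exposed
nearest = heap.leftpop()           # pop the single nearest node, or None
```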

kademlia/tests/test_node.py

@@ -26,32 +26,32 @@ class NodeTest(unittest.TestCase):
class NodeHeapTest(unittest.TestCase):
    def test_maxSize(self):
-        n = NodeHeap(3)
+        n = NodeHeap(mknode(intid=0), 3)
        self.assertEqual(0, len(n))

        for d in range(10):
-            n.push(d, mknode())
+            n.push(mknode(intid=d))
        self.assertEqual(3, len(n))
        self.assertEqual(3, len(list(n)))

    def test_iteration(self):
-        heap = NodeHeap(5)
-        nodes = [mknode(digest(x)) for x in range(10)]
+        heap = NodeHeap(mknode(intid=0), 5)
+        nodes = [mknode(intid=x) for x in range(10)]
        for index, node in enumerate(nodes):
-            heap.push(index, node)
+            heap.push(node)
        for index, node in enumerate(heap):
-            self.assertEqual(digest(index), node.id)
+            self.assertEqual(index, node.long_id)
            self.assertTrue(index < 5)

    def test_remove(self):
-        heap = NodeHeap(5)
-        nodes = [mknode(digest(x)) for x in range(10)]
-        for index, node in enumerate(nodes):
-            heap.push(index, node)
+        heap = NodeHeap(mknode(intid=0), 5)
+        nodes = [mknode(intid=x) for x in range(10)]
+        for node in nodes:
+            heap.push(node)
        heap.remove([nodes[0].id, nodes[1].id])

        self.assertEqual(len(list(heap)), 5)
        for index, node in enumerate(heap):
-            self.assertEqual(digest(index + 2), node.id)
+            self.assertEqual(index + 2, node.long_id)
            self.assertTrue(index < 5)