From 7962e43a6c848e5abf770fc33187b3cdf2d0e4d5 Mon Sep 17 00:00:00 2001 From: Brian Muller Date: Sun, 19 Jan 2014 12:21:16 -0500 Subject: [PATCH] reworked spiders, network now storing closest node w/o value if found --- README.markdown | 4 +- kademlia/crawling.py | 172 ++++++++++++++++++++++++++++++++++++ kademlia/network.py | 114 +++--------------------- kademlia/node.py | 37 +++++++- kademlia/tests/test_node.py | 22 ++--- 5 files changed, 231 insertions(+), 118 deletions(-) create mode 100644 kademlia/crawling.py diff --git a/README.markdown b/README.markdown index ece7d8a..fc8a3d4 100644 --- a/README.markdown +++ b/README.markdown @@ -1,8 +1,10 @@ -# Kademlia Distributed Hash Table in Python +# Python Distributed Hash Table [![Build Status](https://secure.travis-ci.org/bmuller/kademlia.png?branch=master)](https://travis-ci.org/bmuller/kademlia) This library is an asynchronous Python implementation of the [Kademlia distributed hash table](http://en.wikipedia.org/wiki/Kademlia). It uses [Twisted]() to provide asynchronous communication. The nodes communicate using [RPC over UDP](https://github.com/bmuller/rpcudp) to communiate, meaning that it is capable of working behind a [NAT](http://en.wikipedia.org/wiki/NAT). +This library aims to be as close to a reference implementation of the [Kademlia paper](http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf) as possible. + ## Installation ``` diff --git a/kademlia/crawling.py b/kademlia/crawling.py new file mode 100644 index 0000000..ade427d --- /dev/null +++ b/kademlia/crawling.py @@ -0,0 +1,172 @@ +from collections import Counter + +from kademlia.log import Logger +from kademlia.utils import deferredDict +from kademlia.node import Node, NodeHeap + + +class SpiderCrawl(object): + """ + Crawl the network and look for given 160-bit keys. + """ + def __init__(self, protocol, node, peers, ksize, alpha): + """ + Create a new C{SpiderCrawl}er. + + @param protocol: a C{KademliaProtocol} instance. 
+ @param node: A C{Node} representing the key we're looking for + @param peers: A list of C{Node}s that provide the entry point for the network + @param ksize: The value for k based on the paper + @param alpha: The value for alpha based on the paper + """ + self.protocol = protocol + self.ksize = ksize + self.alpha = alpha + self.node = node + self.nearest = NodeHeap(self.node, self.ksize) + self.lastIDsCrawled = [] + self.log = Logger(system=self) + self.log.info("creating spider with peers: %s" % peers) + self.nearest.push(peers) + + + def _find(self, rpcmethod): + """ + Get either a value or list of nodes. + + @param rpcmethod: The protocol's C{callfindValue} or C{callFindNode}. + + The process: + 1. calls find_* to current ALPHA nearest not already queried nodes, + adding results to current nearest list of k nodes. + 2. current nearest list needs to keep track of who has been queried already + sort by nearest, keep KSIZE + 3. if list is same as last time, next call should be to everyone not + yet queried + 4. 
repeat, unless nearest list has all been queried, then ur done + """ + self.log.info("crawling with nearest: %s" % str(tuple(self.nearest))) + count = self.alpha + if self.nearest.getIDs() == self.lastIDsCrawled: + self.log.info("last iteration same as current - checking all in list now") + count = len(self.nearest) + self.lastIDsCrawled = self.nearest.getIDs() + + ds = {} + for peer in self.nearest.getUncontacted()[:count]: + ds[peer.id] = rpcmethod(peer, self.node) + self.nearest.markContacted(peer) + return deferredDict(ds).addCallback(self._nodesFound) + + +class ValueSpiderCrawl(SpiderCrawl): + def __init__(self, protocol, node, peers, ksize, alpha): + SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha) + # keep track of the single nearest node without value - per + # section 2.3 so we can set the key there if found + self.nearestWithoutValue = NodeHeap(self.node, 1) + + def find(self): + """ + Find either the closest nodes or the value requested. + """ + return self._find(self.protocol.callFindValue) + + def _nodesFound(self, responses): + """ + Handle the result of an iteration in C{_find}. + """ + toremove = [] + foundValues = [] + for peerid, response in responses.items(): + response = RPCFindResponse(response) + if not response.happened(): + toremove.append(peerid) + elif response.hasValue(): + foundValues.append(response.getValue()) + else: + peer = self.nearest.getNodeById(peerid) + self.nearestWithoutValue.push(peer) + self.nearest.push(response.getNodeList()) + self.nearest.remove(toremove) + + if len(foundValues) > 0: + return self._handleFoundValues(foundValues) + if self.nearest.allBeenContacted(): + # not found! + return None + return self.find() + + def _handleFoundValues(self, values): + """ + We got some values! Exciting. But let's make sure + they're all the same or freak out a little bit. Also, + make sure we tell the nearest node that *didn't* have + the value to store it. 
+ """ + valueCounts = Counter(values) + if len(valueCounts) != 1: + args = (self.node.long_id, str(values)) + self.log.warning("Got multiple values for key %i: %s" % args) + value = valueCounts.most_common(1)[0][0] + + peerToSaveTo = self.nearestWithoutValue.leftpop() + if peerToSaveTo is not None: + d = self.protocol.callStore(peerToSaveTo, self.node.id, value) + return d.addCallback(lambda _: value) + return value + + +class NodeSpiderCrawl(SpiderCrawl): + def find(self): + """ + Find the closest nodes. + """ + return self._find(self.protocol.callFindNode) + + def _nodesFound(self, responses): + """ + Handle the result of an iteration in C{_find}. + """ + toremove = [] + for peerid, response in responses.items(): + response = RPCFindResponse(response) + if not response.happened(): + toremove.append(peerid) + else: + self.nearest.push(response.getNodeList()) + self.nearest.remove(toremove) + + if self.nearest.allBeenContacted(): + return list(self.nearest) + return self.find() + + +class RPCFindResponse(object): + def __init__(self, response): + """ + @param response: This will be a tuple of (, ) + where will be a list of tuples if not found or + a dictionary of {'value': v} where v is the value desired + """ + self.response = response + + def happened(self): + """ + Did the other host actually respond? + """ + return self.response[0] + + def hasValue(self): + return isinstance(self.response[1], dict) + + def getValue(self): + return self.response[1]['value'] + + def getNodeList(self): + """ + Get the node list in the response. If there's no value, this should + be set. 
+ """ + nodelist = self.response[1] or [] + return [Node(*nodeple) for nodeple in nodelist] diff --git a/kademlia/network.py b/kademlia/network.py index 51e1484..fd69c32 100644 --- a/kademlia/network.py +++ b/kademlia/network.py @@ -11,101 +11,9 @@ from kademlia.log import Logger from kademlia.protocol import KademliaProtocol from kademlia.utils import deferredDict, digest from kademlia.storage import ForgetfulStorage -from kademlia.node import Node, NodeHeap - - -class SpiderCrawl(object): - """ - Crawl the network and look for given 160-bit keys. - """ - def __init__(self, protocol, node, peers, ksize, alpha): - """ - Create a new C{SpiderCrawl}er. - - @param protocol: a C{KademliaProtocol} instance. - @param node: A C{Node} representing the key we're looking for - @param peers: A list of C{Node}s that provide the entry point for the network - @param ksize: The value for k based on the paper - @param alpha: The value for alpha based on the paper - """ - self.protocol = protocol - self.ksize = ksize - self.alpha = alpha - self.nearest = NodeHeap(self.ksize) - self.node = node - self.lastIDsCrawled = [] - self.log = Logger(system=self) - self.log.info("creating spider with peers: %s" % peers) - for peer in peers: - self.nearest.push(self.node.distanceTo(peer), peer) - - def findNodes(self): - """ - Find the closest nodes. - """ - return self._find(self.protocol.callFindNode) - - def findValue(self): - """ - Find either the closest nodes or the value requested. - """ - def handle(result): - if isinstance(result, dict): - return result['value'] - return None - d = self._find(self.protocol.callFindValue) - return d.addCallback(handle) - - def _find(self, rpcmethod): - """ - Get either a value or list of nodes. - - @param rpcmethod: The protocol's C{callfindValue} or C{callFindNode}. - - The process: - 1. calls find_* to current ALPHA nearest not already queried nodes, - adding results to current nearest list of k nodes. - 2. 
current nearest list needs to keep track of who has been queried already - sort by nearest, keep KSIZE - 3. if list is same as last time, next call should be to everyone not - yet queried - 4. repeat, unless nearest list has all been queried, then ur done - """ - self.log.info("crawling with nearest: %s" % str(tuple(self.nearest))) - count = self.alpha - if self.nearest.getIDs() == self.lastIDsCrawled: - self.log.info("last iteration same as current - checking all in list now") - count = len(self.nearest) - self.lastIDsCrawled = self.nearest.getIDs() - - ds = {} - for peer in self.nearest.getUncontacted()[:count]: - ds[peer.id] = rpcmethod(peer, self.node) - self.nearest.markContacted(peer) - return deferredDict(ds).addCallback(self._nodesFound) - - def _nodesFound(self, responses): - """ - Handle the result of an iteration in C{_find}. - """ - toremove = [] - for peerid, response in responses.items(): - # response will be a tuple of (, ) - # where will be a list of tuples if not found or - # a dictionary of {'value': v} where v is the value desired - if not response[0]: - toremove.append(peerid) - elif isinstance(response[1], dict): - self.log.debug("found value for %i" % self.node.long_id) - return response[1] - for nodeple in (response[1] or []): - peer = Node(*nodeple) - self.nearest.push(self.node.distanceTo(peer), peer) - self.nearest.remove(toremove) - - if self.nearest.allBeenContacted(): - return list(self.nearest) - return self.findNodes() +from kademlia.node import Node +from kademlia.crawling import ValueSpiderCrawl +from kademlia.crawling import NodeSpiderCrawl class Server(object): @@ -148,8 +56,8 @@ class Server(object): for id in self.protocol.getRefreshIDs(): node = Node(id) nearest = self.protocol.router.findNeighbors(node, self.alpha) - spider = SpiderCrawl(self.protocol, node, nearest) - ds.append(spider.findNodes()) + spider = NodeSpiderCrawl(self.protocol, node, nearest) + ds.append(spider.find()) return defer.gatherResults(ds) def 
bootstrappableNeighbors(self): @@ -182,8 +90,8 @@ class Server(object): for addr, result in results.items(): if result[0]: nodes.append(Node(result[1], addr[0], addr[1])) - spider = SpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha) - return spider.findNodes() + spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha) + return spider.find() ds = {} for addr in addrs: @@ -215,8 +123,8 @@ class Server(object): """ node = Node(digest(key)) nearest = self.protocol.router.findNeighbors(node) - spider = SpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - return spider.findValue() + spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) + return spider.find() def set(self, key, value): """ @@ -232,8 +140,8 @@ class Server(object): node = Node(dkey) nearest = self.protocol.router.findNeighbors(node) - spider = SpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - return spider.findNodes().addCallback(store) + spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) + return spider.find().addCallback(store) def _anyRespondSuccess(self, responses): """ diff --git a/kademlia/node.py b/kademlia/node.py index 83c7e16..236a840 100644 --- a/kademlia/node.py +++ b/kademlia/node.py @@ -29,7 +29,17 @@ class Node: class NodeHeap(object): - def __init__(self, maxsize): + """ + A heap of nodes ordered by distance to a given node. + """ + def __init__(self, node, maxsize): + """ + Constructor. + + @param node: The node to measure all distnaces from. + @param maxsize: The maximum size that this heap can grow to. 
+ """ + self.node = node self.heap = [] self.contacted = set() self.maxsize = maxsize @@ -51,6 +61,12 @@ class NodeHeap(object): heapq.heappush(nheap, (distance, node)) self.heap = nheap + def getNodeById(self, id): + for _, node in self.heap: + if node.id == id: + return node + return None + def allBeenContacted(self): return len(self.getUncontacted()) == 0 @@ -60,8 +76,23 @@ class NodeHeap(object): def markContacted(self, node): self.contacted.add(node.id) - def push(self, distance, node): - heapq.heappush(self.heap, (distance, node)) + def leftpop(self): + if len(self) > 0: + return heapq.heappop(self.heap)[1] + return None + + def push(self, nodes): + """ + Push nodes onto heap. + + @param nodes: This can be a single item or a C{list}. + """ + if not isinstance(nodes, list): + nodes = [nodes] + + for node in nodes: + distance = self.node.distanceTo(node) + heapq.heappush(self.heap, (distance, node)) def __len__(self): return min(len(self.heap), self.maxsize) diff --git a/kademlia/tests/test_node.py b/kademlia/tests/test_node.py index 58d71b2..b42115a 100644 --- a/kademlia/tests/test_node.py +++ b/kademlia/tests/test_node.py @@ -26,32 +26,32 @@ class NodeTest(unittest.TestCase): class NodeHeapTest(unittest.TestCase): def test_maxSize(self): - n = NodeHeap(3) + n = NodeHeap(mknode(intid=0), 3) self.assertEqual(0, len(n)) for d in range(10): - n.push(d, mknode()) + n.push(mknode(intid=d)) self.assertEqual(3, len(n)) self.assertEqual(3, len(list(n))) def test_iteration(self): - heap = NodeHeap(5) - nodes = [mknode(digest(x)) for x in range(10)] + heap = NodeHeap(mknode(intid=0), 5) + nodes = [mknode(intid=x) for x in range(10)] for index, node in enumerate(nodes): - heap.push(index, node) + heap.push(node) for index, node in enumerate(heap): - self.assertEqual(digest(index), node.id) + self.assertEqual(index, node.long_id) self.assertTrue(index < 5) def test_remove(self): - heap = NodeHeap(5) - nodes = [mknode(digest(x)) for x in range(10)] - for index, node in 
enumerate(nodes): - heap.push(index, node) + heap = NodeHeap(mknode(intid=0), 5) + nodes = [mknode(intid=x) for x in range(10)] + for node in nodes: + heap.push(node) heap.remove([nodes[0].id, nodes[1].id]) self.assertEqual(len(list(heap)), 5) for index, node in enumerate(heap): - self.assertEqual(digest(index + 2), node.id) + self.assertEqual(index + 2, node.long_id) self.assertTrue(index < 5)