commit d29a532abd4fd9811ef32f2047755f732ceb648e Author: Brian Muller Date: Thu Jan 2 21:39:06 2014 -0500 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..989e284 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +apidoc +*.pyc +build +dist +rpcudp.egg-info \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b82fd50 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2014 Brian Muller + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..fcd211b --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +PYDOCTOR=pydoctor + +docs: + $(PYDOCTOR) --make-html --html-output apidoc --add-package rpcudp --project-name=rpcudp --project-url=http://github.com/bmuller/rpcudp --html-use-sorttable --html-use-splitlinks --html-shorten-lists + +lint: + pep8 --ignore=E303,E251,E201,E202 ./rpcudp --max-line-length=140 + find ./rpcudp -name '*.py' | xargs pyflakes + +install: + python setup.py install diff --git a/README.markdown b/README.markdown new file mode 100644 index 0000000..515d054 --- /dev/null +++ b/README.markdown @@ -0,0 +1,18 @@ +# [Kademlia](http://en.wikipedia.org/wiki/Kademlia) in Python + +## Installation + +``` +easy_install kademlia +``` + +## Usage +*This assumes you have a working familiarity with Twisted.* + + + +```python +from twisted.internet import reactor + +... +``` diff --git a/kademlia/__init__.py b/kademlia/__init__.py new file mode 100644 index 0000000..fdd6168 --- /dev/null +++ b/kademlia/__init__.py @@ -0,0 +1,2 @@ +version_info = (0, 0) +version = '.'.join(map(str, version_info)) diff --git a/kademlia/network.py b/kademlia/network.py new file mode 100644 index 0000000..1b360c3 --- /dev/null +++ b/kademlia/network.py @@ -0,0 +1,146 @@ +import hashlib +import random +import heapq + +from twisted.internet import log, defer + +from kademlia.protocol import KademliaProtocol +from kademlia.utils import deferredDict +from kademlia.storage import ForgetfulStorage + +ALPHA = 3 + +class NodeHeap(object): + def __init__(self, maxsize): + self.heap = [] + self.contacted = set() + self.maxsize = maxsize + + def remove(self, peerIDs): + """ + Remove a list of peer ids from this heap. Note that while this + heap retains a constant visible size (based on the iterator), it's + actual size may be quite a bit larger than what's exposed. Therefore, + removal of nodes may not change the visible size as previously added + nodes suddenly become visible. + """ + peerIDs = set(peerIDs) + if len(peerIDs) == 0: + return + nheap = [] + for distance, node in self.heap: + if not node.id in peerIDs: + heapq.heappush(nheap, (distance, node)) + self.heap = nheap + + def allBeenContacted(self): + return len(self.getUncontacted()) == 0 + + def getIDs(self): + return [n.id for n in self] + + def markContacted(self, node): + self.contacted.add(node.id) + + def push(self, distance, node): + heapq.heappush(self.heap, (distance, node)) + + def __len__(self): + return min(len(self.heap), self.maxsize) + + def __iter__(self): + return iter(heapq.nsmallest(self.maxsize, self.heap)) + + def getUncontacted(self): + return [n for n in self if not n.id in self.contacted] + + +class SpiderCrawl(object): + # call find_node to current ALPHA nearest not already queried, + # ...adding results to current nearest + # current nearest list needs to keep track of who has been queried already + # sort by nearest, keep KSIZE + # if list is same as last time, next call should be to everyone not + # yet queried + # repeat, unless nearest list has all been queried, then ur done + + def __init__(self, protocol, node, peers): + self.protocol = protocol + self.nearest = NodeHeap(KSIZE) + self.node = node + self.lastIDsCrawled = [] + for peer in peers: + self.nearest.push(self.node.distanceTo(peer), peer) + + + def findNodes(self): + return self.find(self.protocol.callFindNode) + + def findValue(self): + def handle(result): + if isinstance(result, dict): + return result['value'] + return None + d = self.find(self.protocol.callFindValue) + return d.addCallback(handle) + + def find(self, rpcmethod): + count = ALPHA + if self.nearest.getIDs() == self.lastIDsCrawled: + count = len(self.nearest) + self.lastIDsCrawled = self.nearest.getIDs() + + ds = {} + for peer in self.nearest.getUncontacted()[:count]: + ds[peer.id] = rpcmethod(peer, self.node) + self.nearest.markContacted(peer) + return deferredDict(ds).addCallback(self.nodesFound) + + def nodesFound(self, responses): + toremove = [] + for peerid, response in responses.items(): + # response will be a tuple of (, ) + # where will be a list of tuples if not found or + # a dictionary of {'value': v} where v is the value desired + if not response[0]: + toremove.push(peerid) + elif isinstance(response[1], dict): + return response[1] + for nodeple in (response[1] or []): + peer = Node(*nodeple) + self.nearest.push(self.node.distanceTo(peer), peer) + self.nearest.remove(toremove) + + if self.nearest.allBeenContacted(): + return list(self.nearest) + return self.findNodes() + + +class Server: + def __init__(self, port, ksize=20, alpha=3): + # 160 bit random id + rid = hashlib.sha1(str(random.getrandbits(255))).digest() + storage = ForgetfulStorage() + self.node = Node('127.0.0.1', port, rid) + self.prototcol = KademliaProtocol(self.node, storage, ksize, alpha) + + def bootstrap(self, nodes): + nodes = [ Node(*n) for n in nodes ] + spider = NetworkSpider(self.protocol, self.node, nodes) + return spider.findNodes() + + def get(self, key): + node = Node(None, None, key) + nearest = self.router.findNeighbors(node, ALPHA) + spider = NetworkSpider(self.protocol, node, nearest) + return spider.findValue() + + def set(self, key, value): + # TODO - if no one responds, freak out + def store(nodes): + ds = [self.protocol.callStore(node) for node in nodes] + return defer.gatherResults(ds) + node = Node(None, None, key) + nearest = self.router.findNeighbors(node, ALPHA) + spider = NetworkSpider(self.protocol, nearest) + return spider.findNodes(node).addCallback(store) diff --git a/kademlia/node.py b/kademlia/node.py new file mode 100644 index 0000000..3db1f25 --- /dev/null +++ b/kademlia/node.py @@ -0,0 +1,16 @@ +class Node: + def __init__(self, ip, port, id=None): + self.ip = ip + self.port = port + self.id = id + if id is not None: + self.long_id = long(id.encode('hex'), 16) + + def distnaceTo(self, node): + return self.long_id ^ node.long_id + + def __iter__(self): + """ + Enables use of Node as a tuple - i.e., tuple(node) works. + """ + return iter([self.ip, self.port, self.id]) diff --git a/kademlia/protocol.py b/kademlia/protocol.py new file mode 100644 index 0000000..8c8ccf0 --- /dev/null +++ b/kademlia/protocol.py @@ -0,0 +1,67 @@ +from twisted.internet import log + +from rpcudp.protocol import RPCProtocol +from kademlia.node import Node +from kademlia.routing import RoutingTable + + +class KademliaProtocol(RPCProtocol): + def __init__(self, node, storage, ksize, alpha): + RPCProtocol.__init__(self, node.port) + self.router = RoutingTable(self) + self.storage = storage + self.sourceID = node.id + + def rpc_ping(self, sender, nodeid): + source = Node(sender[0], sender[1], nodeid) + self.router.addContact(source) + return "pong" + + def rpc_store(self, sender, nodeid, key, value): + source = Node(sender[0], sender[1], nodeid) + self.router.addContact(source) + self.storage[key] = value + + def rpc_find_node(self, sender, nodeid, key): + source = Node(sender[0], sender[1], nodeid) + self.router.addContact(source) + return map(tuple, self.table.findNeighbors(Node(None, None, key)) + + def rpc_find_value(self, sender, nodeid, key): + source = Node(sender[0], sender[1], nodeid) + self.router.addContact(source) + value = self.storage.get(key, None) + if value is None: + return self.rpc_find_node(sender, nodeid, key) + return { 'value': value } + + def callFindNode(self, nodeToAsk, nodeToFind): + address = (nodeToAsk.ip, nodeToAsk.port) + d = self.find_node(address, self.sourceID, nodeToFind.id) + return d.addCallback(handleCallResponse, nodetoAsk) + + def callFindValue(self, nodeToAsk, nodeToFind): + address = (nodeToAsk.ip, nodeToAsk.port) + d = self.find_value(address, self.sourceID, nodeToFind.id) + return d.addCallback(handleCallResponse, nodetoAsk) + + def callPing(self, nodeToAsk): + address = (nodeToAsk.ip, nodeToAsk.port) + d = self.ping(address, self.sourceID) + return d.addCallback(handleCallResponse, nodetoAsk) + + def callStore(self, nodeToAsk, key, value): + address = (nodeToAsk.ip, nodeToAsk.port) + d = self.store(address, self.sourceID, key, value) + return d.addCallback(handleCallResponse, nodetoAsk) + + def handleCallResponse(self, result): + """ + If we get a response, add the node to the routing table. If + we get no response, make sure it's removed from the routing table. + """ + if result[0]: + self.router.addContact(node) + else: + self.router.removeContact(node) + return result diff --git a/kademlia/routing.py b/kademlia/routing.py new file mode 100644 index 0000000..4cf1667 --- /dev/null +++ b/kademlia/routing.py @@ -0,0 +1,146 @@ +import heapq +from collections import OrderedDict + +from twisted.internet.task import LoopingCall + +class KBucket(object): + def __init__(self, rangeLower, rangeUpper): + self.range = (rangeLower, rangeUpper) + self.nodes = OrderedDict() + self.touchLastUpdated() + + def touchLastUpdated(self): + self.lastUpdated = time.time() + + def getNodes(self): + return self.nodes.values() + + def split(self): + midpoint = self.range[1] - ((self.range[1] - self.range[0]) / 2) + one = KBucket(self.range[0], midpoint) + two = KBucket(midpoint+1, self.range[1]) + for node in self.nodes.values(): + bucket = one if node.long_id <= midpoint else two + bucket.nodes[node.id] = node + return (one, two) + + def removeNode(self, node): + if node.id in self.nodes: + del self.nodes[node.id] + + def hasInRange(self, node): + return rangeLower <= node.long_id <= rangeUpper + + def addNode(self, node): + """ + Add a C{Node} to the C{KBucket}. Return True if successful, + False if the bucket is full. + """ + if node.id in self.nodes: + del self.nodes[node.id] + self.nodes[node.id] = node + elif len(self) < KSIZE: + self.nodes[node.id] = node + else: + return False + return True + + def head(self): + return self.nodes.values()[0] + + def __getitem__(self, id): + return self.nodes.get(id, None) + + def __len__(self): + return len(self.nodes) + + +class TableTraverser(object): + def __init__(self, table, startNode): + index = table.getBucketFor(startNode) + bucket[index].touchLastUpdated() + self.currentNodes = table.buckets[index].getNodes() + self.leftBuckets = table.buckets[:index] + self.rightBuckets = table.buckets[(index+1):] + self.left = True + + def __iter__(self): + return self + + def next(self): + """ + Pop an item from the left subtree, then right, then left, etc. + """ + if len(self.currentNodes) > 0: + return self.currentNodes.pop() + + if self.left and len(self.leftBuckets) > 0: + self.currentNodes = self.leftBuckets.pop().getNodes() + self.left = False + return self.next() + + if len(self.rightBuckets) > 0: + self.currentNodes = self.rightBuckets.pop().getNodes() + self.left = True + return self.next() + + raise StopIteration + + +class RoutingTable(object): + def __init__(self, protocol): + self.protocol = protocol + self.buckets = [KBucket(0, 2**160)] + LoopingCall(self.refresh).start(3600) + + def splitBucket(self, index): + one, two = self.buckets[index].split() + self.buckets[index] = one + self.buckets.insert(index+1, two) + + # todo split one/two if needed based on section 4.2 + + def refresh(self): + ds = [] + for bucket in self.buckets: + if bucket.lastUpdated < (time.time() - 3600): + node = Node(None, None, random.randint(*bucket.range)) + nearest = self.findNeighbors(node, ALPHA) + spider = NetworkSpider(self.protocol, node, nearest) + ds.append(spider.findNodes()) + return defer.gatherResults(ds) + + def removeContact(self, node): + index = self.getBucketFor(self, node) + self.buckets[index].removeNode(node) + + def addContact(self, node): + index = self.getBucketFor(self, node) + bucket = self.buckets[index] + + # this will succeed unless the bucket is full + if bucket.addNode(node): + return + + if bucket.hasInRange(node): + self.splitBucket(index) + self.addContact(node) + else: + self.protocol.callPing(bucket.head()) + + def getBucketFor(self, node): + """ + Get the index of the bucket that the given node would fall into. + """ + for index, bucket in enumerate(self.buckets): + if node.long_id < bucket.range[1]: + return index + + def findNeighbors(self, node, k=KSIZE): + nodes = [] + for neighbor in TableTraverser(self, node): + if neighbor.id != node.id: + heapq.heappush(nodes, (node.distanceFrom(neighbor), neighbor)) + if len(nodes) == k: + break + return heapq.nsmallest(k, nodes) diff --git a/kademlia/storage.py b/kademlia/storage.py new file mode 100644 index 0000000..f155afe --- /dev/null +++ b/kademlia/storage.py @@ -0,0 +1,37 @@ +import time +from collections import OrderedDict + +class ForgetfulStorage(object): + def __init__(self, ttl=7200): + """ + By default, max age is 2 hours + """ + self.data = OrderedDict() + self.ttl = ttl + + def __setitem__(self, key, value): + if key in self.data: + del self.data[key] + self.data[key] = (time.time() + self.ttl, value) + self.cull() + + def cull(self): + pop = 0 + for value in self.data.itervalues(): + if value[0] >= time.time(): + break + pop += 1 + for _ in xrange(pop): + self.data.popitem(first=True) + + def __getitem__(self, key): + self.cull() + return self.data[key][1] + + def __iter__(self): + self.cull() + return iter(self.data) + + def __repr__(self): + self.cull() + return repr(self.data) diff --git a/kademlia/utils.py b/kademlia/utils.py new file mode 100644 index 0000000..5b315b6 --- /dev/null +++ b/kademlia/utils.py @@ -0,0 +1,27 @@ +""" +General catchall for functions that don't make sense as methods. +""" + +from twisted.internet import defer + +def deferredDict(d): + """ + Just like a C{defer.DeferredList} but instead accepts and returns a C{dict}. + + @param d: A C{dict} whose values are all C{Deferred} objects. + + @return: A C{DeferredList} whose callback will be given a dictionary whose + keys are the same as the parameter C{d}'s and whose values are the results + of each individual deferred call. + """ + if len(d) == 0: + return defer.succeed({}) + + def handle(results, names): + rvalue = {} + for index in range(len(results)): + rvalue[names[index]] = results[index][1] + return rvalue + + dl = defer.DeferredList(d.values()) + return dl.addCallback(handle, d.keys()) diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..b780d14 --- /dev/null +++ b/setup.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +from setuptools import setup, find_packages +from kademlia import version + +setup( + name="kademlia", + version=version, + description="Kademlia is a distributed hash table for decentralized peer-to-peer computer networks.", + author="Brian Muller", + author_email="bamuller@gmail.com", + license="MIT", + url="http://github.com/bmuller/kademlia", + packages=find_packages(), + requires=["twisted", "rpcudp"], + install_requires=['twisted>=12.0', "rpcudp>=0.3"] +)