first commit

This commit is contained in:
Brian Muller 2014-01-02 21:39:06 -05:00
commit d29a532abd
12 changed files with 511 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
apidoc
*.pyc
build
dist
rpcudp.egg-info

20
LICENSE Normal file
View File

@ -0,0 +1,20 @@
Copyright (c) 2014 Brian Muller
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

11
Makefile Normal file
View File

@ -0,0 +1,11 @@
PYDOCTOR=pydoctor
docs:
$(PYDOCTOR) --make-html --html-output apidoc --add-package rpcudp --project-name=rpcudp --project-url=http://github.com/bmuller/rpcudp --html-use-sorttable --html-use-splitlinks --html-shorten-lists
lint:
pep8 --ignore=E303,E251,E201,E202 ./rpcudp --max-line-length=140
find ./rpcudp -name '*.py' | xargs pyflakes
install:
python setup.py install

18
README.markdown Normal file
View File

@ -0,0 +1,18 @@
# [Kademlia](http://en.wikipedia.org/wiki/Kademlia) in Python
## Installation
```
easy_install kademlia
```
## Usage
*This assumes you have a working familiarity with Twisted.*
```python
from twisted.internet import reactor
...
```

2
kademlia/__init__.py Normal file
View File

@ -0,0 +1,2 @@
version_info = (0, 0)
version = '.'.join(map(str, version_info))

146
kademlia/network.py Normal file
View File

@ -0,0 +1,146 @@
import hashlib
import random
import heapq
from twisted.internet import log, defer
from kademlia.protocol import KademliaProtocol
from kademlia.utils import deferredDict
from kademlia.storage import ForgetfulStorage
ALPHA = 3
class NodeHeap(object):
def __init__(self, maxsize):
self.heap = []
self.contacted = set()
self.maxsize = maxsize
def remove(self, peerIDs):
"""
Remove a list of peer ids from this heap. Note that while this
heap retains a constant visible size (based on the iterator), it's
actual size may be quite a bit larger than what's exposed. Therefore,
removal of nodes may not change the visible size as previously added
nodes suddenly become visible.
"""
peerIDs = set(peerIDs)
if len(peerIDs) == 0:
return
nheap = []
for distance, node in self.heap:
if not node.id in peerIDs:
heapq.heappush(nheap, (distance, node))
self.heap = nheap
def allBeenContacted(self):
return len(self.getUncontacted()) == 0
def getIDs(self):
return [n.id for n in self]
def markContacted(self, node):
self.contacted.add(node.id)
def push(self, distance, node):
heapq.heappush(self.heap, (distance, node))
def __len__(self):
return min(len(self.heap), self.maxsize)
def __iter__(self):
return iter(heapq.nsmallest(self.maxsize, self.heap))
def getUncontacted(self):
return [n for n in self if not n.id in self.contacted]
class SpiderCrawl(object):
# call find_node to current ALPHA nearest not already queried,
# ...adding results to current nearest
# current nearest list needs to keep track of who has been queried already
# sort by nearest, keep KSIZE
# if list is same as last time, next call should be to everyone not
# yet queried
# repeat, unless nearest list has all been queried, then ur done
def __init__(self, protocol, node, peers):
self.protocol = protocol
self.nearest = NodeHeap(KSIZE)
self.node = node
self.lastIDsCrawled = []
for peer in peers:
self.nearest.push(self.node.distanceTo(peer), peer)
def findNodes(self):
return self.find(self.protocol.callFindNode)
def findValue(self):
def handle(result):
if isinstance(result, dict):
return result['value']
return None
d = self.find(self.protocol.callFindValue)
return d.addCallback(handle)
def find(self, rpcmethod):
count = ALPHA
if self.nearest.getIDs() == self.lastIDsCrawled:
count = len(self.nearest)
self.lastIDsCrawled = self.nearest.getIDs()
ds = {}
for peer in self.nearest.getUncontacted()[:count]:
ds[peer.id] = rpcmethod(peer, self.node)
self.nearest.markContacted(peer)
return deferredDict(ds).addCallback(self.nodesFound)
def nodesFound(self, responses):
toremove = []
for peerid, response in responses.items():
# response will be a tuple of (<response received>, <value>)
# where <value> will be a list of tuples if not found or
# a dictionary of {'value': v} where v is the value desired
if not response[0]:
toremove.push(peerid)
elif isinstance(response[1], dict):
return response[1]
for nodeple in (response[1] or []):
peer = Node(*nodeple)
self.nearest.push(self.node.distanceTo(peer), peer)
self.nearest.remove(toremove)
if self.nearest.allBeenContacted():
return list(self.nearest)
return self.findNodes()
class Server:
def __init__(self, port, ksize=20, alpha=3):
# 160 bit random id
rid = hashlib.sha1(str(random.getrandbits(255))).digest()
storage = ForgetfulStorage()
self.node = Node('127.0.0.1', port, rid)
self.prototcol = KademliaProtocol(self.node, storage, ksize, alpha)
def bootstrap(self, nodes):
nodes = [ Node(*n) for n in nodes ]
spider = NetworkSpider(self.protocol, self.node, nodes)
return spider.findNodes()
def get(self, key):
node = Node(None, None, key)
nearest = self.router.findNeighbors(node, ALPHA)
spider = NetworkSpider(self.protocol, node, nearest)
return spider.findValue()
def set(self, key, value):
# TODO - if no one responds, freak out
def store(nodes):
ds = [self.protocol.callStore(node) for node in nodes]
return defer.gatherResults(ds)
node = Node(None, None, key)
nearest = self.router.findNeighbors(node, ALPHA)
spider = NetworkSpider(self.protocol, nearest)
return spider.findNodes(node).addCallback(store)

16
kademlia/node.py Normal file
View File

@ -0,0 +1,16 @@
class Node:
def __init__(self, ip, port, id=None):
self.ip = ip
self.port = port
self.id = id
if id is not None:
self.long_id = long(id.encode('hex'), 16)
def distnaceTo(self, node):
return self.long_id ^ node.long_id
def __iter__(self):
"""
Enables use of Node as a tuple - i.e., tuple(node) works.
"""
return iter([self.ip, self.port, self.id])

67
kademlia/protocol.py Normal file
View File

@ -0,0 +1,67 @@
from twisted.internet import log
from rpcudp.protocol import RPCProtocol
from kademlia.node import Node
from kademlia.routing import RoutingTable
class KademliaProtocol(RPCProtocol):
def __init__(self, node, storage, ksize, alpha):
RPCProtocol.__init__(self, node.port)
self.router = RoutingTable(self)
self.storage = storage
self.sourceID = node.id
def rpc_ping(self, sender, nodeid):
source = Node(sender[0], sender[1], nodeid)
self.router.addContact(source)
return "pong"
def rpc_store(self, sender, nodeid, key, value):
source = Node(sender[0], sender[1], nodeid)
self.router.addContact(source)
self.storage[key] = value
def rpc_find_node(self, sender, nodeid, key):
source = Node(sender[0], sender[1], nodeid)
self.router.addContact(source)
return map(tuple, self.table.findNeighbors(Node(None, None, key))
def rpc_find_value(self, sender, nodeid, key):
source = Node(sender[0], sender[1], nodeid)
self.router.addContact(source)
value = self.storage.get(key, None)
if value is None:
return self.rpc_find_node(sender, nodeid, key)
return { 'value': value }
def callFindNode(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port)
d = self.find_node(address, self.sourceID, nodeToFind.id)
return d.addCallback(handleCallResponse, nodetoAsk)
def callFindValue(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port)
d = self.find_value(address, self.sourceID, nodeToFind.id)
return d.addCallback(handleCallResponse, nodetoAsk)
def callPing(self, nodeToAsk):
address = (nodeToAsk.ip, nodeToAsk.port)
d = self.ping(address, self.sourceID)
return d.addCallback(handleCallResponse, nodetoAsk)
def callStore(self, nodeToAsk, key, value):
address = (nodeToAsk.ip, nodeToAsk.port)
d = self.store(address, self.sourceID, key, value)
return d.addCallback(handleCallResponse, nodetoAsk)
def handleCallResponse(self, result):
"""
If we get a response, add the node to the routing table. If
we get no response, make sure it's removed from the routing table.
"""
if result[0]:
self.router.addContact(node)
else:
self.router.removeContact(node)
return result

146
kademlia/routing.py Normal file
View File

@ -0,0 +1,146 @@
import heapq
from collections import OrderedDict
from twisted.internet.task import LoopingCall
class KBucket(object):
def __init__(self, rangeLower, rangeUpper):
self.range = (rangeLower, rangeUpper)
self.nodes = OrderedDict()
self.touchLastUpdated()
def touchLastUpdated(self):
self.lastUpdated = time.time()
def getNodes(self):
return self.nodes.values()
def split(self):
midpoint = self.range[1] - ((self.range[1] - self.range[0]) / 2)
one = KBucket(self.range[0], midpoint)
two = KBucket(midpoint+1, self.range[1])
for node in self.nodes.values():
bucket = one if node.long_id <= midpoint else two
bucket.nodes[node.id] = node
return (one, two)
def removeNode(self, node):
if node.id in self.nodes:
del self.nodes[node.id]
def hasInRange(self, node):
return rangeLower <= node.long_id <= rangeUpper
def addNode(self, node):
"""
Add a C{Node} to the C{KBucket}. Return True if successful,
False if the bucket is full.
"""
if node.id in self.nodes:
del self.nodes[node.id]
self.nodes[node.id] = node
elif len(self) < KSIZE:
self.nodes[node.id] = node
else:
return False
return True
def head(self):
return self.nodes.values()[0]
def __getitem__(self, id):
return self.nodes.get(id, None)
def __len__(self):
return len(self.nodes)
class TableTraverser(object):
def __init__(self, table, startNode):
index = table.getBucketFor(startNode)
bucket[index].touchLastUpdated()
self.currentNodes = table.buckets[index].getNodes()
self.leftBuckets = table.buckets[:index]
self.rightBuckets = table.buckets[(index+1):]
self.left = True
def __iter__(self):
return self
def next(self):
"""
Pop an item from the left subtree, then right, then left, etc.
"""
if len(self.currentNodes) > 0:
return self.currentNodes.pop()
if self.left and len(self.leftBuckets) > 0:
self.currentNodes = self.leftBuckets.pop().getNodes()
self.left = False
return self.next()
if len(self.rightBuckets) > 0:
self.currentNodes = self.rightBuckets.pop().getNodes()
self.left = True
return self.next()
raise StopIteration
class RoutingTable(object):
def __init__(self, protocol):
self.protocol = protocol
self.buckets = [KBucket(0, 2**160)]
LoopingCall(self.refresh).start(3600)
def splitBucket(self, index):
one, two = self.buckets[index].split()
self.buckets[index] = one
self.buckets.insert(index+1, two)
# todo split one/two if needed based on section 4.2
def refresh(self):
ds = []
for bucket in self.buckets:
if bucket.lastUpdated < (time.time() - 3600):
node = Node(None, None, random.randint(*bucket.range))
nearest = self.findNeighbors(node, ALPHA)
spider = NetworkSpider(self.protocol, node, nearest)
ds.append(spider.findNodes())
return defer.gatherResults(ds)
def removeContact(self, node):
index = self.getBucketFor(self, node)
self.buckets[index].removeNode(node)
def addContact(self, node):
index = self.getBucketFor(self, node)
bucket = self.buckets[index]
# this will succeed unless the bucket is full
if bucket.addNode(node):
return
if bucket.hasInRange(node):
self.splitBucket(index)
self.addContact(node)
else:
self.protocol.callPing(bucket.head())
def getBucketFor(self, node):
"""
Get the index of the bucket that the given node would fall into.
"""
for index, bucket in enumerate(self.buckets):
if node.long_id < bucket.range[1]:
return index
def findNeighbors(self, node, k=KSIZE):
nodes = []
for neighbor in TableTraverser(self, node):
if neighbor.id != node.id:
heapq.heappush(nodes, (node.distanceFrom(neighbor), neighbor))
if len(nodes) == k:
break
return heapq.nsmallest(k, nodes)

37
kademlia/storage.py Normal file
View File

@ -0,0 +1,37 @@
import time
from collections import OrderedDict
class ForgetfulStorage(object):
def __init__(self, ttl=7200):
"""
By default, max age is 2 hours
"""
self.data = OrderedDict()
self.ttl = ttl
def __setitem__(self, key, value):
if key in self.data:
del self.data[key]
self.data[key] = (time.time() + self.ttl, value)
self.cull()
def cull(self):
pop = 0
for value in self.data.itervalues():
if value[0] >= time.time():
break
pop += 1
for _ in xrange(pop):
self.data.popitem(first=True)
def __getitem__(self, key):
self.cull()
return self.data[key][1]
def __iter__(self):
self.cull()
return iter(self.data)
def __repr__(self):
self.cull()
return repr(self.data)

27
kademlia/utils.py Normal file
View File

@ -0,0 +1,27 @@
"""
General catchall for functions that don't make sense as methods.
"""
from twisted.internet import defer
def deferredDict(d):
"""
Just like a C{defer.DeferredList} but instead accepts and returns a C{dict}.
@param d: A C{dict} whose values are all C{Deferred} objects.
@return: A C{DeferredList} whose callback will be given a dictionary whose
keys are the same as the parameter C{d}'s and whose values are the results
of each individual deferred call.
"""
if len(d) == 0:
return defer.succeed({})
def handle(results, names):
rvalue = {}
for index in range(len(results)):
rvalue[names[index]] = results[index][1]
return rvalue
dl = defer.DeferredList(d.values())
return dl.addCallback(handle, d.keys())

16
setup.py Executable file
View File

@ -0,0 +1,16 @@
#!/usr/bin/env python
from setuptools import setup, find_packages
from kademlia import version
setup(
name="kademlia",
version=version,
description="Kademlia is a distributed hash table for decentralized peer-to-peer computer networks.",
author="Brian Muller",
author_email="bamuller@gmail.com",
license="MIT",
url="http://github.com/bmuller/kademlia",
packages=find_packages(),
requires=["twisted", "rpcudp"],
install_requires=['twisted>=12.0', "rpcudp>=0.3"]
)