library now uses asyncio instead of twisted

This commit is contained in:
Brian Muller 2016-07-12 08:32:02 -04:00
parent 6bbd1bfa42
commit 464ae5bd1f
16 changed files with 206 additions and 363 deletions

View File

@ -1,13 +1,18 @@
from twisted.internet import reactor import logging
from twisted.python import log import asyncio
from kademlia.network import Server
import sys
log.startLogging(sys.stdout) from kademlia.network import Server
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
server = Server()
server.listen(8468)
def done(result): def done(result):
print "Key result:", result print("Key result:", result)
reactor.stop()
def setDone(result, server): def setDone(result, server):
server.get("a key").addCallback(done) server.get("a key").addCallback(done)
@ -15,8 +20,12 @@ def setDone(result, server):
def bootstrapDone(found, server): def bootstrapDone(found, server):
server.set("a key", "a value").addCallback(setDone, server) server.set("a key", "a value").addCallback(setDone, server)
server = Server() #server.bootstrap([("1.2.3.4", 8468)]).addCallback(bootstrapDone, server)
server.listen(8468)
server.bootstrap([("1.2.3.4", 8468)]).addCallback(bootstrapDone, server)
reactor.run() try:
loop.run_forever()
except KeyboardInterrupt:
pass
server.close()
loop.close()

22
examples/get.py Normal file
View File

@ -0,0 +1,22 @@
import logging
import asyncio
import sys
from kademlia.network import Server
if len(sys.argv) != 2:
print("Usage: python get.py <key>")
sys.exit(1)
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
server = Server()
server.listen(8469)
loop.run_until_complete(server.bootstrap([("127.0.0.1", 8468)]))
result = loop.run_until_complete(server.get(sys.argv[1]))
server.stop()
loop.close()
print("Get result:", result)

View File

@ -1,33 +0,0 @@
from twisted.internet import reactor
from twisted.python import log
from kademlia.network import Server
import sys
log.startLogging(sys.stdout)
if len(sys.argv) != 4:
print "Usage: python query.py <bootstrap ip> <bootstrap port> <key>"
sys.exit(1)
ip = sys.argv[1]
port = int(sys.argv[2])
key = sys.argv[3]
print "Getting %s (with bootstrap %s:%i)" % (key, ip, port)
def done(result):
print "Key result:"
print result
reactor.stop()
def bootstrapDone(found, server, key):
if len(found) == 0:
print "Could not connect to the bootstrap server."
reactor.stop()
server.get(key).addCallback(done)
server = Server()
server.listen(port)
server.bootstrap([(ip, port)]).addCallback(bootstrapDone, server, key)
reactor.run()

View File

@ -1,21 +0,0 @@
from twisted.application import service, internet
from twisted.python.log import ILogObserver
from twisted.internet import reactor, task
import sys, os
sys.path.append(os.path.dirname(__file__))
from kademlia.network import Server
from kademlia import log
application = service.Application("kademlia")
application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit)
if os.path.isfile('cache.pickle'):
kserver = Server.loadState('cache.pickle')
else:
kserver = Server()
kserver.bootstrap([("1.2.3.4", 8468)])
kserver.saveStateRegularly('cache.pickle', 10)
server = internet.UDPServer(8468, kserver.protocol)
server.setServiceParent(application)

20
examples/set.py Normal file
View File

@ -0,0 +1,20 @@
import logging
import asyncio
import sys
from kademlia.network import Server
if len(sys.argv) != 3:
print("Usage: python set.py <key> <value>")
sys.exit(1)
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
server = Server()
server.listen(8469)
loop.run_until_complete(server.bootstrap([("127.0.0.1", 8468)]))
loop.run_until_complete(server.set(sys.argv[1], sys.argv[2]))
server.stop()
loop.close()

View File

@ -1,59 +0,0 @@
from twisted.application import service, internet
from twisted.python.log import ILogObserver
from twisted.python import log
from twisted.internet import reactor, task
from twisted.web import resource, server
from twisted.web.resource import NoResource
import sys, os
sys.path.append(os.path.dirname(__file__))
from kademlia.network import Server
from kademlia import log
application = service.Application("kademlia")
application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit)
if os.path.isfile('cache.pickle'):
kserver = Server.loadState('cache.pickle')
else:
kserver = Server()
kserver.bootstrap([("1.2.3.4", 8468)])
kserver.saveStateRegularly('cache.pickle', 10)
udpserver = internet.UDPServer(8468, kserver.protocol)
udpserver.setServiceParent(application)
class WebResource(resource.Resource):
def __init__(self, kserver):
resource.Resource.__init__(self)
self.kserver = kserver
def getChild(self, child, request):
return self
def render_GET(self, request):
def respond(value):
value = value or NoResource().render(request)
request.write(value)
request.finish()
log.msg("Getting key: %s" % request.path.split('/')[-1])
d = self.kserver.get(request.path.split('/')[-1])
d.addCallback(respond)
return server.NOT_DONE_YET
def render_POST(self, request):
key = request.path.split('/')[-1]
value = request.content.getvalue()
log.msg("Setting %s = %s" % (key, value))
self.kserver.set(key, value)
return value
website = server.Site(WebResource(kserver))
webserver = internet.TCPServer(8080, website)
webserver.setServiceParent(application)
# To test, you can set with:
# $> curl --data "hi there" http://localhost:8080/one
# and get with:
# $> curl http://localhost:8080/one

View File

@ -1,5 +1,4 @@
""" """
Kademlia is a Python implementation of the Kademlia protocol for `Twisted <http://twistedmatrix.com>`_. Kademlia is a Python implementation of the Kademlia protocol which utilizes the asyncio library.
""" """
version_info = (0, 6) __version__ = "1.0"
version = '.'.join(map(str, version_info))

View File

@ -1,8 +1,8 @@
from collections import Counter from collections import Counter
from logging import getLogger
from kademlia.log import Logger
from kademlia.utils import deferredDict
from kademlia.node import Node, NodeHeap from kademlia.node import Node, NodeHeap
from kademlia.utils import gather_dict
class SpiderCrawl(object): class SpiderCrawl(object):
@ -26,12 +26,12 @@ class SpiderCrawl(object):
self.node = node self.node = node
self.nearest = NodeHeap(self.node, self.ksize) self.nearest = NodeHeap(self.node, self.ksize)
self.lastIDsCrawled = [] self.lastIDsCrawled = []
self.log = Logger(system=self) self.log = getLogger("kademlia-spider")
self.log.info("creating spider with peers: %s" % peers) self.log.info("creating spider with peers: %s" % peers)
self.nearest.push(peers) self.nearest.push(peers)
def _find(self, rpcmethod): async def _find(self, rpcmethod):
""" """
Get either a value or list of nodes. Get either a value or list of nodes.
@ -58,7 +58,8 @@ class SpiderCrawl(object):
for peer in self.nearest.getUncontacted()[:count]: for peer in self.nearest.getUncontacted()[:count]:
ds[peer.id] = rpcmethod(peer, self.node) ds[peer.id] = rpcmethod(peer, self.node)
self.nearest.markContacted(peer) self.nearest.markContacted(peer)
return deferredDict(ds).addCallback(self._nodesFound) found = await gather_dict(ds)
return await self._nodesFound(found)
class ValueSpiderCrawl(SpiderCrawl): class ValueSpiderCrawl(SpiderCrawl):
@ -68,13 +69,13 @@ class ValueSpiderCrawl(SpiderCrawl):
# section 2.3 so we can set the key there if found # section 2.3 so we can set the key there if found
self.nearestWithoutValue = NodeHeap(self.node, 1) self.nearestWithoutValue = NodeHeap(self.node, 1)
def find(self): async def find(self):
""" """
Find either the closest nodes or the value requested. Find either the closest nodes or the value requested.
""" """
return self._find(self.protocol.callFindValue) return await self._find(self.protocol.callFindValue)
def _nodesFound(self, responses): async def _nodesFound(self, responses):
""" """
Handle the result of an iteration in _find. Handle the result of an iteration in _find.
""" """
@ -93,13 +94,13 @@ class ValueSpiderCrawl(SpiderCrawl):
self.nearest.remove(toremove) self.nearest.remove(toremove)
if len(foundValues) > 0: if len(foundValues) > 0:
return self._handleFoundValues(foundValues) return await self._handleFoundValues(foundValues)
if self.nearest.allBeenContacted(): if self.nearest.allBeenContacted():
# not found! # not found!
return None return None
return self.find() return await self.find()
def _handleFoundValues(self, values): async def _handleFoundValues(self, values):
""" """
We got some values! Exciting. But let's make sure We got some values! Exciting. But let's make sure
they're all the same or freak out a little bit. Also, they're all the same or freak out a little bit. Also,
@ -114,19 +115,18 @@ class ValueSpiderCrawl(SpiderCrawl):
peerToSaveTo = self.nearestWithoutValue.popleft() peerToSaveTo = self.nearestWithoutValue.popleft()
if peerToSaveTo is not None: if peerToSaveTo is not None:
d = self.protocol.callStore(peerToSaveTo, self.node.id, value) await self.protocol.callStore(peerToSaveTo, self.node.id, value)
return d.addCallback(lambda _: value)
return value return value
class NodeSpiderCrawl(SpiderCrawl): class NodeSpiderCrawl(SpiderCrawl):
def find(self): async def find(self):
""" """
Find the closest nodes. Find the closest nodes.
""" """
return self._find(self.protocol.callFindNode) return await self._find(self.protocol.callFindNode)
def _nodesFound(self, responses): async def _nodesFound(self, responses):
""" """
Handle the result of an iteration in _find. Handle the result of an iteration in _find.
""" """

View File

@ -1,63 +0,0 @@
import sys
from twisted.python import log
INFO = 5
DEBUG = 4
WARNING = 3
ERROR = 2
CRITICAL = 1
class FileLogObserver(log.FileLogObserver):
def __init__(self, f=None, level=WARNING, default=DEBUG):
log.FileLogObserver.__init__(self, f or sys.stdout)
self.level = level
self.default = default
def emit(self, eventDict):
ll = eventDict.get('loglevel', self.default)
if eventDict['isError'] or 'failure' in eventDict or self.level >= ll:
log.FileLogObserver.emit(self, eventDict)
class Logger:
def __init__(self, **kwargs):
self.kwargs = kwargs
def msg(self, message, **kw):
kw.update(self.kwargs)
if 'system' in kw and not isinstance(kw['system'], str):
kw['system'] = kw['system'].__class__.__name__
log.msg(message, **kw)
def info(self, message, **kw):
kw['loglevel'] = INFO
self.msg("[INFO] %s" % message, **kw)
def debug(self, message, **kw):
kw['loglevel'] = DEBUG
self.msg("[DEBUG] %s" % message, **kw)
def warning(self, message, **kw):
kw['loglevel'] = WARNING
self.msg("[WARNING] %s" % message, **kw)
def error(self, message, **kw):
kw['loglevel'] = ERROR
self.msg("[ERROR] %s" % message, **kw)
def critical(self, message, **kw):
kw['loglevel'] = CRITICAL
self.msg("[CRITICAL] %s" % message, **kw)
try:
theLogger
except NameError:
theLogger = Logger()
msg = theLogger.msg
info = theLogger.info
debug = theLogger.debug
warning = theLogger.warning
error = theLogger.error
critical = theLogger.critical

View File

@ -3,13 +3,11 @@ Package for interacting on the network at a high level.
""" """
import random import random
import pickle import pickle
import asyncio
from logging import getLogger
from twisted.internet.task import LoopingCall
from twisted.internet import defer, reactor, task
from kademlia.log import Logger
from kademlia.protocol import KademliaProtocol from kademlia.protocol import KademliaProtocol
from kademlia.utils import deferredDict, digest from kademlia.utils import digest
from kademlia.storage import ForgetfulStorage from kademlia.storage import ForgetfulStorage
from kademlia.node import Node from kademlia.node import Node
from kademlia.crawling import ValueSpiderCrawl from kademlia.crawling import ValueSpiderCrawl
@ -34,25 +32,39 @@ class Server(object):
""" """
self.ksize = ksize self.ksize = ksize
self.alpha = alpha self.alpha = alpha
self.log = Logger(system=self) self.log = getLogger("kademlia-server")
self.storage = storage or ForgetfulStorage() self.storage = storage or ForgetfulStorage()
self.node = Node(id or digest(random.getrandbits(255))) self.node = Node(id or digest(random.getrandbits(255)))
self.protocol = KademliaProtocol(self.node, self.storage, ksize) self.transport = None
self.refreshLoop = LoopingCall(self.refreshTable).start(3600) self.protocol = None
self.refresh_loop = None
def listen(self, port, interface=""): def stop(self):
if self.refresh_loop is not None:
self.refresh_loop.cancel()
if self.transport is not None:
self.transport.close()
def listen(self, port, interface='0.0.0.0'):
""" """
Start listening on the given port. Start listening on the given port.
This is the same as calling::
reactor.listenUDP(port, server.protocol)
Provide interface="::" to accept ipv6 address Provide interface="::" to accept ipv6 address
""" """
return reactor.listenUDP(port, self.protocol, interface) proto_factory = lambda: KademliaProtocol(self.node, self.storage, self.ksize)
loop = asyncio.get_event_loop()
listen = loop.create_datagram_endpoint(proto_factory, local_addr=(interface, port))
self.transport, self.protocol = loop.run_until_complete(listen)
# finally, schedule refreshing table
self.refresh_table()
def refreshTable(self): def refresh_table(self):
asyncio.ensure_future(self._refresh_table())
loop = asyncio.get_event_loop()
self.refresh_loop = loop.call_later(3600, self.refresh_table)
async def _refresh_table(self):
""" """
Refresh buckets that haven't had any lookups in the last hour Refresh buckets that haven't had any lookups in the last hour
(per section 2.3 of the paper). (per section 2.3 of the paper).
@ -64,14 +76,12 @@ class Server(object):
spider = NodeSpiderCrawl(self.protocol, node, nearest) spider = NodeSpiderCrawl(self.protocol, node, nearest)
ds.append(spider.find()) ds.append(spider.find())
def republishKeys(_): # do our crawling
ds = [] await asyncio.gather(*ds)
# Republish keys older than one hour
for key, value in self.storage.iteritemsOlderThan(3600):
ds.append(self.set(key, value))
return defer.gatherResults(ds)
return defer.gatherResults(ds).addCallback(republishKeys) # now republish keys older than one hour
for key, value in self.storage.iteritemsOlderThan(3600):
await self.set(key, value)
def bootstrappableNeighbors(self): def bootstrappableNeighbors(self):
""" """
@ -86,7 +96,7 @@ class Server(object):
neighbors = self.protocol.router.findNeighbors(self.node) neighbors = self.protocol.router.findNeighbors(self.node)
return [ tuple(n)[-2:] for n in neighbors ] return [ tuple(n)[-2:] for n in neighbors ]
def bootstrap(self, addrs): async def bootstrap(self, addrs):
""" """
Bootstrap the server by connecting to other known nodes in the network. Bootstrap the server by connecting to other known nodes in the network.
@ -94,22 +104,14 @@ class Server(object):
addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP addresses addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP addresses
are acceptable - hostnames will cause an error. are acceptable - hostnames will cause an error.
""" """
# if the transport hasn't been initialized yet, wait a second cos = list(map(self.bootstrap_node, addrs))
if self.protocol.transport is None: nodes = [node for node in await asyncio.gather(*cos) if not node is None]
return task.deferLater(reactor, 1, self.bootstrap, addrs) spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha)
return await spider.find()
def initTable(results):
nodes = [] async def bootstrap_node(self, addr):
for addr, result in results.items(): result = await self.protocol.ping(addr, self.node.id)
if result[0]: return Node(result[1], addr[0], addr[1]) if result[0] else None
nodes.append(Node(result[1], addr[0], addr[1]))
spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha)
return spider.find()
ds = {}
for addr in addrs:
ds[addr] = self.protocol.ping(addr, self.node.id)
return deferredDict(ds).addCallback(initTable)
def inetVisibleIP(self): def inetVisibleIP(self):
""" """
@ -128,7 +130,7 @@ class Server(object):
ds.append(self.protocol.stun(neighbor)) ds.append(self.protocol.stun(neighbor))
return defer.gatherResults(ds).addCallback(handle) return defer.gatherResults(ds).addCallback(handle)
def get(self, key): async def get(self, key):
""" """
Get a key if the network has it. Get a key if the network has it.
@ -138,16 +140,16 @@ class Server(object):
dkey = digest(key) dkey = digest(key)
# if this node has it, return it # if this node has it, return it
if self.storage.get(dkey) is not None: if self.storage.get(dkey) is not None:
return defer.succeed(self.storage.get(dkey)) return self.storage.get(dkey)
node = Node(dkey) node = Node(dkey)
nearest = self.protocol.router.findNeighbors(node) nearest = self.protocol.router.findNeighbors(node)
if len(nearest) == 0: if len(nearest) == 0:
self.log.warning("There are no known neighbors to get key %s" % key) self.log.warning("There are no known neighbors to get key %s" % key)
return defer.succeed(None) return None
spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
return spider.find() return await spider.find()
def set(self, key, value): async def set(self, key, value):
""" """
Set the given key to the given value in the network. Set the given key to the given value in the network.
""" """
@ -155,31 +157,21 @@ class Server(object):
dkey = digest(key) dkey = digest(key)
node = Node(dkey) node = Node(dkey)
def store(nodes):
self.log.info("setting '%s' on %s" % (key, map(str, nodes)))
# if this node is close too, then store here as well
if self.node.distanceTo(node) < max([n.distanceTo(node) for n in nodes]):
self.storage[dkey] = value
ds = [self.protocol.callStore(n, dkey, value) for n in nodes]
return defer.DeferredList(ds).addCallback(self._anyRespondSuccess)
nearest = self.protocol.router.findNeighbors(node) nearest = self.protocol.router.findNeighbors(node)
if len(nearest) == 0: if len(nearest) == 0:
self.log.warning("There are no known neighbors to set key %s" % key) self.log.warning("There are no known neighbors to set key %s" % key)
return defer.succeed(False) return False
spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
return spider.find().addCallback(store)
def _anyRespondSuccess(self, responses): spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha)
""" nodes = await spider.find()
Given the result of a DeferredList of calls to peers, ensure that at least self.log.info("setting '%s' on %s" % (key, list(map(str, nodes))))
one of them was contacted and responded with a Truthy result.
""" # if this node is close too, then store here as well
for deferSuccess, result in responses: if self.node.distanceTo(node) < max([n.distanceTo(node) for n in nodes]):
peerReached, peerResponse = result self.storage[dkey] = value
if deferSuccess and peerReached and peerResponse: ds = [self.protocol.callStore(n, dkey, value) for n in nodes]
return True # return true only if at least one store call succeeded
return False return any(await asyncio.gather(*ds))
def saveState(self, fname): def saveState(self, fname):
""" """

View File

@ -7,7 +7,7 @@ class Node:
self.id = id self.id = id
self.ip = ip self.ip = ip
self.port = port self.port = port
self.long_id = long(id.encode('hex'), 16) self.long_id = int(id.hex(), 16)
def sameHomeAs(self, node): def sameHomeAs(self, node):
return self.ip == node.ip and self.port == node.port return self.ip == node.ip and self.port == node.port

View File

@ -1,12 +1,10 @@
import random import random
from logging import getLogger
from twisted.internet import defer
from rpcudp.protocol import RPCProtocol from rpcudp.protocol import RPCProtocol
from kademlia.node import Node from kademlia.node import Node
from kademlia.routing import RoutingTable from kademlia.routing import RoutingTable
from kademlia.log import Logger
from kademlia.utils import digest from kademlia.utils import digest
@ -16,7 +14,7 @@ class KademliaProtocol(RPCProtocol):
self.router = RoutingTable(self, ksize, sourceNode) self.router = RoutingTable(self, ksize, sourceNode)
self.storage = storage self.storage = storage
self.sourceNode = sourceNode self.sourceNode = sourceNode
self.log = Logger(system=self) self.log = getLogger("kademlia-protocol")
def getRefreshIDs(self): def getRefreshIDs(self):
""" """
@ -43,11 +41,11 @@ class KademliaProtocol(RPCProtocol):
return True return True
def rpc_find_node(self, sender, nodeid, key): def rpc_find_node(self, sender, nodeid, key):
self.log.info("finding neighbors of %i in local table" % long(nodeid.encode('hex'), 16)) self.log.info("finding neighbors of %i in local table" % int(nodeid.hex(), 16))
source = Node(nodeid, sender[0], sender[1]) source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source) self.welcomeIfNewNode(source)
node = Node(key) node = Node(key)
return map(tuple, self.router.findNeighbors(node, exclude=source)) return list(map(tuple, self.router.findNeighbors(node, exclude=source)))
def rpc_find_value(self, sender, nodeid, key): def rpc_find_value(self, sender, nodeid, key):
source = Node(nodeid, sender[0], sender[1]) source = Node(nodeid, sender[0], sender[1])
@ -57,25 +55,25 @@ class KademliaProtocol(RPCProtocol):
return self.rpc_find_node(sender, nodeid, key) return self.rpc_find_node(sender, nodeid, key)
return { 'value': value } return { 'value': value }
def callFindNode(self, nodeToAsk, nodeToFind): async def callFindNode(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port) address = (nodeToAsk.ip, nodeToAsk.port)
d = self.find_node(address, self.sourceNode.id, nodeToFind.id) result = await self.find_node(address, self.sourceNode.id, nodeToFind.id)
return d.addCallback(self.handleCallResponse, nodeToAsk) return self.handleCallResponse(result, nodeToAsk)
def callFindValue(self, nodeToAsk, nodeToFind): async def callFindValue(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port) address = (nodeToAsk.ip, nodeToAsk.port)
d = self.find_value(address, self.sourceNode.id, nodeToFind.id) result = await self.find_value(address, self.sourceNode.id, nodeToFind.id)
return d.addCallback(self.handleCallResponse, nodeToAsk) return self.handleCallResponse(result, nodeToAsk)
def callPing(self, nodeToAsk): async def callPing(self, nodeToAsk):
address = (nodeToAsk.ip, nodeToAsk.port) address = (nodeToAsk.ip, nodeToAsk.port)
d = self.ping(address, self.sourceNode.id) result = await self.ping(address, self.sourceNode.id)
return d.addCallback(self.handleCallResponse, nodeToAsk) return self.handleCallResponse(result, nodeToAsk)
def callStore(self, nodeToAsk, key, value): async def callStore(self, nodeToAsk, key, value):
address = (nodeToAsk.ip, nodeToAsk.port) address = (nodeToAsk.ip, nodeToAsk.port)
d = self.store(address, self.sourceNode.id, key, value) result = await self.store(address, self.sourceNode.id, key, value)
return d.addCallback(self.handleCallResponse, nodeToAsk) return self.handleCallResponse(result, nodeToAsk)
def welcomeIfNewNode(self, node): def welcomeIfNewNode(self, node):
""" """
@ -91,28 +89,30 @@ class KademliaProtocol(RPCProtocol):
is closer than the closest in that list, then store the key/value is closer than the closest in that list, then store the key/value
on the new node (per section 2.5 of the paper) on the new node (per section 2.5 of the paper)
""" """
if self.router.isNewNode(node): if not self.router.isNewNode(node):
ds = [] return
for key, value in self.storage.iteritems():
keynode = Node(digest(key)) self.log.info("never seen %s before, adding to router and setting nearby " % node)
neighbors = self.router.findNeighbors(keynode) for key, value in self.storage.items():
if len(neighbors) > 0: keynode = Node(digest(key))
newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode) neighbors = self.router.findNeighbors(keynode)
thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode) if len(neighbors) > 0:
if len(neighbors) == 0 or (newNodeClose and thisNodeClosest): newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode)
ds.append(self.callStore(node, key, value)) thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode)
self.router.addContact(node) if len(neighbors) == 0 or (newNodeClose and thisNodeClosest):
return defer.gatherResults(ds) asyncio.ensure_future(self.callStore(node, key, value))
self.router.addContact(node)
def handleCallResponse(self, result, node): def handleCallResponse(self, result, node):
""" """
If we get a response, add the node to the routing table. If If we get a response, add the node to the routing table. If
we get no response, make sure it's removed from the routing table. we get no response, make sure it's removed from the routing table.
""" """
if result[0]: if not result[0]:
self.log.info("got response from %s, adding to router" % node) self.log.warning("no response from %s, removing from router" % node)
self.welcomeIfNewNode(node)
else:
self.log.debug("no response from %s, removing from router" % node)
self.router.removeContact(node) self.router.removeContact(node)
return result
self.log.info("got successful response from %s")
self.welcomeIfNewNode(node)
return result return result

View File

@ -18,7 +18,7 @@ class KBucket(object):
self.lastUpdated = time.time() self.lastUpdated = time.time()
def getNodes(self): def getNodes(self):
return self.nodes.values() return list(self.nodes.values())
def split(self): def split(self):
midpoint = (self.range[0] + self.range[1]) / 2 midpoint = (self.range[0] + self.range[1]) / 2
@ -68,7 +68,7 @@ class KBucket(object):
return len(sp) return len(sp)
def head(self): def head(self):
return self.nodes.values()[0] return list(self.nodes.values())[0]
def __getitem__(self, id): def __getitem__(self, id):
return self.nodes.get(id, None) return self.nodes.get(id, None)
@ -89,7 +89,7 @@ class TableTraverser(object):
def __iter__(self): def __iter__(self):
return self return self
def next(self): def __next__(self):
""" """
Pop an item from the left subtree, then right, then left, etc. Pop an item from the left subtree, then right, then left, etc.
""" """
@ -99,12 +99,12 @@ class TableTraverser(object):
if self.left and len(self.leftBuckets) > 0: if self.left and len(self.leftBuckets) > 0:
self.currentNodes = self.leftBuckets.pop().getNodes() self.currentNodes = self.leftBuckets.pop().getNodes()
self.left = False self.left = False
return self.next() return next(self)
if len(self.rightBuckets) > 0: if len(self.rightBuckets) > 0:
self.currentNodes = self.rightBuckets.pop().getNodes() self.currentNodes = self.rightBuckets.pop().getNodes()
self.left = True self.left = True
return self.next() return next(self)
raise StopIteration raise StopIteration
@ -177,4 +177,4 @@ class RoutingTable(object):
if len(nodes) == k: if len(nodes) == k:
break break
return map(operator.itemgetter(1), heapq.nsmallest(k, nodes)) return list(map(operator.itemgetter(1), heapq.nsmallest(k, nodes)))

View File

@ -1,15 +1,10 @@
import time import time
from itertools import izip
from itertools import imap
from itertools import takewhile from itertools import takewhile
import operator import operator
from collections import OrderedDict from collections import OrderedDict
from zope.interface import implements
from zope.interface import Interface
class IStorage:
class IStorage(Interface):
""" """
Local storage for this node. Local storage for this node.
""" """
@ -18,31 +13,34 @@ class IStorage(Interface):
""" """
Set a key to the given value. Set a key to the given value.
""" """
raise NotImplementedError
def __getitem__(key): def __getitem__(key):
""" """
Get the given key. If item doesn't exist, raises C{KeyError} Get the given key. If item doesn't exist, raises C{KeyError}
""" """
raise NotImplementedError
def get(key, default=None): def get(key, default=None):
""" """
Get given key. If not found, return default. Get given key. If not found, return default.
""" """
raise NotImplementedError
def iteritemsOlderThan(secondsOld): def iteritemsOlderThan(secondsOld):
""" """
Return the an iterator over (key, value) tuples for items older than the given secondsOld. Return the an iterator over (key, value) tuples for items older than the given secondsOld.
""" """
raise NotImplementedError
def iteritems(): def iteritems():
""" """
Get the iterator for this storage, should yield tuple of (key, value) Get the iterator for this storage, should yield tuple of (key, value)
""" """
raise NotImplementedError
class ForgetfulStorage(object): class ForgetfulStorage(IStorage):
implements(IStorage)
def __init__(self, ttl=604800): def __init__(self, ttl=604800):
""" """
By default, max age is a week. By default, max age is a week.
@ -82,16 +80,16 @@ class ForgetfulStorage(object):
minBirthday = time.time() - secondsOld minBirthday = time.time() - secondsOld
zipped = self._tripleIterable() zipped = self._tripleIterable()
matches = takewhile(lambda r: minBirthday >= r[1], zipped) matches = takewhile(lambda r: minBirthday >= r[1], zipped)
return imap(operator.itemgetter(0, 2), matches) return list(map(operator.itemgetter(0, 2), matches))
def _tripleIterable(self): def _tripleIterable(self):
ikeys = self.data.iterkeys() ikeys = self.data.keys()
ibirthday = imap(operator.itemgetter(0), self.data.itervalues()) ibirthday = map(operator.itemgetter(0), self.data.values())
ivalues = imap(operator.itemgetter(1), self.data.itervalues()) ivalues = map(operator.itemgetter(1), self.data.values())
return izip(ikeys, ibirthday, ivalues) return zip(ikeys, ibirthday, ivalues)
def iteritems(self): def items(self):
self.cull() self.cull()
ikeys = self.data.iterkeys() ikeys = self.data.keys()
ivalues = imap(operator.itemgetter(1), self.data.itervalues()) ivalues = map(operator.itemgetter(1), self.data.values())
return izip(ikeys, ivalues) return zip(ikeys, ivalues)

View File

@ -3,41 +3,21 @@ General catchall for functions that don't make sense as methods.
""" """
import hashlib import hashlib
import operator import operator
import asyncio
from twisted.internet import defer
async def gather_dict(d):
cors = list(d.values())
results = await asyncio.gather(*cors)
return dict(zip(d.keys(), results))
def digest(s): def digest(s):
if not isinstance(s, str): if not isinstance(s, bytes):
s = str(s) s = str(s).encode('utf8')
return hashlib.sha1(s).digest() return hashlib.sha1(s).digest()
def deferredDict(d):
"""
Just like a :class:`defer.DeferredList` but instead accepts and returns a :class:`dict`.
Args:
d: A :class:`dict` whose values are all :class:`defer.Deferred` objects.
Returns:
:class:`defer.DeferredList` whose callback will be given a dictionary whose
keys are the same as the parameter :obj:`d` and whose values are the results
of each individual deferred call.
"""
if len(d) == 0:
return defer.succeed({})
def handle(results, names):
rvalue = {}
for index in range(len(results)):
rvalue[names[index]] = results[index][1]
return rvalue
dl = defer.DeferredList(d.values())
return dl.addCallback(handle, d.keys())
class OrderedSet(list): class OrderedSet(list):
""" """
Acts like a list in all ways, except in the behavior of the :meth:`push` method. Acts like a list in all ways, except in the behavior of the :meth:`push` method.

View File

@ -1,16 +1,15 @@
#!/usr/bin/env python #!/usr/bin/env python
from setuptools import setup, find_packages from setuptools import setup, find_packages
from kademlia import version import kademlia
setup( setup(
name="kademlia", name="kademlia",
version=version, version=kademlia.__version__,
description="Kademlia is a distributed hash table for decentralized peer-to-peer computer networks.", description="Kademlia is a distributed hash table for decentralized peer-to-peer computer networks.",
author="Brian Muller", author="Brian Muller",
author_email="bamuller@gmail.com", author_email="bamuller@gmail.com",
license="MIT", license="MIT",
url="http://github.com/bmuller/kademlia", url="http://github.com/bmuller/kademlia",
packages=find_packages(), packages=find_packages(),
requires=["twisted", "rpcudp"], install_requires=["rpcudp>=3.0.0"]
install_requires=['twisted>=14.0', "rpcudp>=1.0"]
) )