diff --git a/examples/example.py b/examples/example.py index 54b6beb..f8f509c 100644 --- a/examples/example.py +++ b/examples/example.py @@ -1,13 +1,18 @@ -from twisted.internet import reactor -from twisted.python import log -from kademlia.network import Server -import sys +import logging +import asyncio -log.startLogging(sys.stdout) +from kademlia.network import Server + + +logging.basicConfig(level=logging.DEBUG) +loop = asyncio.get_event_loop() +loop.set_debug(True) + +server = Server() +server.listen(8468) def done(result): - print "Key result:", result - reactor.stop() + print("Key result:", result) def setDone(result, server): server.get("a key").addCallback(done) @@ -15,8 +20,12 @@ def setDone(result, server): def bootstrapDone(found, server): server.set("a key", "a value").addCallback(setDone, server) -server = Server() -server.listen(8468) -server.bootstrap([("1.2.3.4", 8468)]).addCallback(bootstrapDone, server) +#server.bootstrap([("1.2.3.4", 8468)]).addCallback(bootstrapDone, server) -reactor.run() +try: + loop.run_forever() +except KeyboardInterrupt: + pass + +server.close() +loop.close() diff --git a/examples/get.py b/examples/get.py new file mode 100644 index 0000000..7e62590 --- /dev/null +++ b/examples/get.py @@ -0,0 +1,22 @@ +import logging +import asyncio +import sys + +from kademlia.network import Server + +if len(sys.argv) != 2: + print("Usage: python get.py ") + sys.exit(1) + +logging.basicConfig(level=logging.DEBUG) +loop = asyncio.get_event_loop() +loop.set_debug(True) + +server = Server() +server.listen(8469) +loop.run_until_complete(server.bootstrap([("127.0.0.1", 8468)])) +result = loop.run_until_complete(server.get(sys.argv[1])) +server.stop() +loop.close() + +print("Get result:", result) diff --git a/examples/query.py b/examples/query.py deleted file mode 100644 index 18e94e5..0000000 --- a/examples/query.py +++ /dev/null @@ -1,33 +0,0 @@ -from twisted.internet import reactor -from twisted.python import log -from kademlia.network import Server -import sys - -log.startLogging(sys.stdout) - -if len(sys.argv) != 4: - print "Usage: python query.py " - sys.exit(1) - -ip = sys.argv[1] -port = int(sys.argv[2]) -key = sys.argv[3] - -print "Getting %s (with bootstrap %s:%i)" % (key, ip, port) - -def done(result): - print "Key result:" - print result - reactor.stop() - -def bootstrapDone(found, server, key): - if len(found) == 0: - print "Could not connect to the bootstrap server." - reactor.stop() - server.get(key).addCallback(done) - -server = Server() -server.listen(port) -server.bootstrap([(ip, port)]).addCallback(bootstrapDone, server, key) - -reactor.run() diff --git a/examples/server.tac b/examples/server.tac deleted file mode 100644 index 8973fbc..0000000 --- a/examples/server.tac +++ /dev/null @@ -1,21 +0,0 @@ -from twisted.application import service, internet -from twisted.python.log import ILogObserver -from twisted.internet import reactor, task - -import sys, os -sys.path.append(os.path.dirname(__file__)) -from kademlia.network import Server -from kademlia import log - -application = service.Application("kademlia") -application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit) - -if os.path.isfile('cache.pickle'): - kserver = Server.loadState('cache.pickle') -else: - kserver = Server() - kserver.bootstrap([("1.2.3.4", 8468)]) -kserver.saveStateRegularly('cache.pickle', 10) - -server = internet.UDPServer(8468, kserver.protocol) -server.setServiceParent(application) diff --git a/examples/set.py b/examples/set.py new file mode 100644 index 0000000..f561281 --- /dev/null +++ b/examples/set.py @@ -0,0 +1,20 @@ +import logging +import asyncio +import sys + +from kademlia.network import Server + +if len(sys.argv) != 3: + print("Usage: python set.py ") + sys.exit(1) + +logging.basicConfig(level=logging.DEBUG) +loop = asyncio.get_event_loop() +loop.set_debug(True) + +server = Server() +server.listen(8469) +loop.run_until_complete(server.bootstrap([("127.0.0.1", 8468)])) +loop.run_until_complete(server.set(sys.argv[1], sys.argv[2])) +server.stop() +loop.close() diff --git a/examples/webserver.tac b/examples/webserver.tac deleted file mode 100644 index 016edc7..0000000 --- a/examples/webserver.tac +++ /dev/null @@ -1,59 +0,0 @@ -from twisted.application import service, internet -from twisted.python.log import ILogObserver -from twisted.python import log -from twisted.internet import reactor, task -from twisted.web import resource, server -from twisted.web.resource import NoResource - -import sys, os -sys.path.append(os.path.dirname(__file__)) -from kademlia.network import Server -from kademlia import log - -application = service.Application("kademlia") -application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit) - -if os.path.isfile('cache.pickle'): - kserver = Server.loadState('cache.pickle') -else: - kserver = Server() - kserver.bootstrap([("1.2.3.4", 8468)]) -kserver.saveStateRegularly('cache.pickle', 10) - -udpserver = internet.UDPServer(8468, kserver.protocol) -udpserver.setServiceParent(application) - -class WebResource(resource.Resource): - def __init__(self, kserver): - resource.Resource.__init__(self) - self.kserver = kserver - - def getChild(self, child, request): - return self - - def render_GET(self, request): - def respond(value): - value = value or NoResource().render(request) - request.write(value) - request.finish() - log.msg("Getting key: %s" % request.path.split('/')[-1]) - d = self.kserver.get(request.path.split('/')[-1]) - d.addCallback(respond) - return server.NOT_DONE_YET - - def render_POST(self, request): - key = request.path.split('/')[-1] - value = request.content.getvalue() - log.msg("Setting %s = %s" % (key, value)) - self.kserver.set(key, value) - return value - -website = server.Site(WebResource(kserver)) -webserver = internet.TCPServer(8080, website) -webserver.setServiceParent(application) - - -# To test, you can set with: -# $> curl --data "hi there" http://localhost:8080/one -# and get with: -# $> curl http://localhost:8080/one diff --git a/kademlia/__init__.py b/kademlia/__init__.py index 00c3bce..01c4a98 100644 --- a/kademlia/__init__.py +++ b/kademlia/__init__.py @@ -1,5 +1,4 @@ """ -Kademlia is a Python implementation of the Kademlia protocol for `Twisted `_. +Kademlia is a Python implementation of the Kademlia protocol which utilizes the asyncio library. """ -version_info = (0, 6) -version = '.'.join(map(str, version_info)) +__version__ = "1.0" diff --git a/kademlia/crawling.py b/kademlia/crawling.py index 0206e89..ff867ab 100644 --- a/kademlia/crawling.py +++ b/kademlia/crawling.py @@ -1,8 +1,8 @@ from collections import Counter +from logging import getLogger -from kademlia.log import Logger -from kademlia.utils import deferredDict from kademlia.node import Node, NodeHeap +from kademlia.utils import gather_dict class SpiderCrawl(object): @@ -26,12 +26,12 @@ class SpiderCrawl(object): self.node = node self.nearest = NodeHeap(self.node, self.ksize) self.lastIDsCrawled = [] - self.log = Logger(system=self) + self.log = getLogger("kademlia-spider") self.log.info("creating spider with peers: %s" % peers) self.nearest.push(peers) - def _find(self, rpcmethod): + async def _find(self, rpcmethod): """ Get either a value or list of nodes. @@ -58,7 +58,8 @@ class SpiderCrawl(object): for peer in self.nearest.getUncontacted()[:count]: ds[peer.id] = rpcmethod(peer, self.node) self.nearest.markContacted(peer) - return deferredDict(ds).addCallback(self._nodesFound) + found = await gather_dict(ds) + return await self._nodesFound(found) class ValueSpiderCrawl(SpiderCrawl): @@ -68,13 +69,13 @@ class ValueSpiderCrawl(SpiderCrawl): # section 2.3 so we can set the key there if found self.nearestWithoutValue = NodeHeap(self.node, 1) - def find(self): + async def find(self): """ Find either the closest nodes or the value requested. """ - return self._find(self.protocol.callFindValue) + return await self._find(self.protocol.callFindValue) - def _nodesFound(self, responses): + async def _nodesFound(self, responses): """ Handle the result of an iteration in _find. """ @@ -93,13 +94,13 @@ class ValueSpiderCrawl(SpiderCrawl): self.nearest.remove(toremove) if len(foundValues) > 0: - return self._handleFoundValues(foundValues) + return await self._handleFoundValues(foundValues) if self.nearest.allBeenContacted(): # not found! return None - return self.find() + return await self.find() - def _handleFoundValues(self, values): + async def _handleFoundValues(self, values): """ We got some values! Exciting. But let's make sure they're all the same or freak out a little bit. Also, @@ -114,19 +115,18 @@ class ValueSpiderCrawl(SpiderCrawl): peerToSaveTo = self.nearestWithoutValue.popleft() if peerToSaveTo is not None: - d = self.protocol.callStore(peerToSaveTo, self.node.id, value) - return d.addCallback(lambda _: value) + await self.protocol.callStore(peerToSaveTo, self.node.id, value) return value class NodeSpiderCrawl(SpiderCrawl): - def find(self): + async def find(self): """ Find the closest nodes. """ - return self._find(self.protocol.callFindNode) + return await self._find(self.protocol.callFindNode) - def _nodesFound(self, responses): + async def _nodesFound(self, responses): """ Handle the result of an iteration in _find. """ diff --git a/kademlia/log.py b/kademlia/log.py deleted file mode 100644 index e7573d2..0000000 --- a/kademlia/log.py +++ /dev/null @@ -1,63 +0,0 @@ -import sys -from twisted.python import log - -INFO = 5 -DEBUG = 4 -WARNING = 3 -ERROR = 2 -CRITICAL = 1 - - -class FileLogObserver(log.FileLogObserver): - def __init__(self, f=None, level=WARNING, default=DEBUG): - log.FileLogObserver.__init__(self, f or sys.stdout) - self.level = level - self.default = default - - - def emit(self, eventDict): - ll = eventDict.get('loglevel', self.default) - if eventDict['isError'] or 'failure' in eventDict or self.level >= ll: - log.FileLogObserver.emit(self, eventDict) - - -class Logger: - def __init__(self, **kwargs): - self.kwargs = kwargs - - def msg(self, message, **kw): - kw.update(self.kwargs) - if 'system' in kw and not isinstance(kw['system'], str): - kw['system'] = kw['system'].__class__.__name__ - log.msg(message, **kw) - - def info(self, message, **kw): - kw['loglevel'] = INFO - self.msg("[INFO] %s" % message, **kw) - - def debug(self, message, **kw): - kw['loglevel'] = DEBUG - self.msg("[DEBUG] %s" % message, **kw) - - def warning(self, message, **kw): - kw['loglevel'] = WARNING - self.msg("[WARNING] %s" % message, **kw) - - def error(self, message, **kw): - kw['loglevel'] = ERROR - self.msg("[ERROR] %s" % message, **kw) - - def critical(self, message, **kw): - kw['loglevel'] = CRITICAL - self.msg("[CRITICAL] %s" % message, **kw) - -try: - theLogger -except NameError: - theLogger = Logger() - msg = theLogger.msg - info = theLogger.info - debug = theLogger.debug - warning = theLogger.warning - error = theLogger.error - critical = theLogger.critical diff --git a/kademlia/network.py b/kademlia/network.py index 044df4c..7859f96 100644 --- a/kademlia/network.py +++ b/kademlia/network.py @@ -3,13 +3,11 @@ Package for interacting on the network at a high level. """ import random import pickle +import asyncio +from logging import getLogger -from twisted.internet.task import LoopingCall -from twisted.internet import defer, reactor, task - -from kademlia.log import Logger from kademlia.protocol import KademliaProtocol -from kademlia.utils import deferredDict, digest +from kademlia.utils import digest from kademlia.storage import ForgetfulStorage from kademlia.node import Node from kademlia.crawling import ValueSpiderCrawl @@ -34,25 +32,39 @@ class Server(object): """ self.ksize = ksize self.alpha = alpha - self.log = Logger(system=self) + self.log = getLogger("kademlia-server") self.storage = storage or ForgetfulStorage() self.node = Node(id or digest(random.getrandbits(255))) - self.protocol = KademliaProtocol(self.node, self.storage, ksize) - self.refreshLoop = LoopingCall(self.refreshTable).start(3600) + self.transport = None + self.protocol = None + self.refresh_loop = None - def listen(self, port, interface=""): + def stop(self): + if self.refresh_loop is not None: + self.refresh_loop.cancel() + + if self.transport is not None: + self.transport.close() + + def listen(self, port, interface='0.0.0.0'): """ Start listening on the given port. - This is the same as calling:: - - reactor.listenUDP(port, server.protocol) - Provide interface="::" to accept ipv6 address """ - return reactor.listenUDP(port, self.protocol, interface) + proto_factory = lambda: KademliaProtocol(self.node, self.storage, self.ksize) + loop = asyncio.get_event_loop() + listen = loop.create_datagram_endpoint(proto_factory, local_addr=(interface, port)) + self.transport, self.protocol = loop.run_until_complete(listen) + # finally, schedule refreshing table + self.refresh_table() - def refreshTable(self): + def refresh_table(self): + asyncio.ensure_future(self._refresh_table()) + loop = asyncio.get_event_loop() + self.refresh_loop = loop.call_later(3600, self.refresh_table) + + async def _refresh_table(self): """ Refresh buckets that haven't had any lookups in the last hour (per section 2.3 of the paper). @@ -64,14 +76,12 @@ class Server(object): spider = NodeSpiderCrawl(self.protocol, node, nearest) ds.append(spider.find()) - def republishKeys(_): - ds = [] - # Republish keys older than one hour - for key, value in self.storage.iteritemsOlderThan(3600): - ds.append(self.set(key, value)) - return defer.gatherResults(ds) + # do our crawling + await asyncio.gather(*ds) - return defer.gatherResults(ds).addCallback(republishKeys) + # now republish keys older than one hour + for key, value in self.storage.iteritemsOlderThan(3600): + await self.set(key, value) def bootstrappableNeighbors(self): """ @@ -86,7 +96,7 @@ class Server(object): neighbors = self.protocol.router.findNeighbors(self.node) return [ tuple(n)[-2:] for n in neighbors ] - def bootstrap(self, addrs): + async def bootstrap(self, addrs): """ Bootstrap the server by connecting to other known nodes in the network. @@ -94,22 +104,14 @@ class Server(object): addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP addresses are acceptable - hostnames will cause an error. """ - # if the transport hasn't been initialized yet, wait a second - if self.protocol.transport is None: - return task.deferLater(reactor, 1, self.bootstrap, addrs) - - def initTable(results): - nodes = [] - for addr, result in results.items(): - if result[0]: - nodes.append(Node(result[1], addr[0], addr[1])) - spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha) - return spider.find() - - ds = {} - for addr in addrs: - ds[addr] = self.protocol.ping(addr, self.node.id) - return deferredDict(ds).addCallback(initTable) + cos = list(map(self.bootstrap_node, addrs)) + nodes = [node for node in await asyncio.gather(*cos) if not node is None] + spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha) + return await spider.find() + + async def bootstrap_node(self, addr): + result = await self.protocol.ping(addr, self.node.id) + return Node(result[1], addr[0], addr[1]) if result[0] else None def inetVisibleIP(self): """ @@ -128,7 +130,7 @@ class Server(object): ds.append(self.protocol.stun(neighbor)) return defer.gatherResults(ds).addCallback(handle) - def get(self, key): + async def get(self, key): """ Get a key if the network has it. @@ -138,16 +140,16 @@ class Server(object): dkey = digest(key) # if this node has it, return it if self.storage.get(dkey) is not None: - return defer.succeed(self.storage.get(dkey)) + return self.storage.get(dkey) node = Node(dkey) nearest = self.protocol.router.findNeighbors(node) if len(nearest) == 0: self.log.warning("There are no known neighbors to get key %s" % key) - return defer.succeed(None) + return None spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - return spider.find() + return await spider.find() - def set(self, key, value): + async def set(self, key, value): """ Set the given key to the given value in the network. """ @@ -155,31 +157,21 @@ class Server(object): dkey = digest(key) node = Node(dkey) - def store(nodes): - self.log.info("setting '%s' on %s" % (key, map(str, nodes))) - # if this node is close too, then store here as well - if self.node.distanceTo(node) < max([n.distanceTo(node) for n in nodes]): - self.storage[dkey] = value - ds = [self.protocol.callStore(n, dkey, value) for n in nodes] - return defer.DeferredList(ds).addCallback(self._anyRespondSuccess) - nearest = self.protocol.router.findNeighbors(node) if len(nearest) == 0: self.log.warning("There are no known neighbors to set key %s" % key) - return defer.succeed(False) - spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - return spider.find().addCallback(store) + return False - def _anyRespondSuccess(self, responses): - """ - Given the result of a DeferredList of calls to peers, ensure that at least - one of them was contacted and responded with a Truthy result. - """ - for deferSuccess, result in responses: - peerReached, peerResponse = result - if deferSuccess and peerReached and peerResponse: - return True - return False + spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) + nodes = await spider.find() + self.log.info("setting '%s' on %s" % (key, list(map(str, nodes)))) + + # if this node is close too, then store here as well + if self.node.distanceTo(node) < max([n.distanceTo(node) for n in nodes]): + self.storage[dkey] = value + ds = [self.protocol.callStore(n, dkey, value) for n in nodes] + # return true only if at least one store call succeeded + return any(await asyncio.gather(*ds)) def saveState(self, fname): """ diff --git a/kademlia/node.py b/kademlia/node.py index d1b25f7..ad54bdc 100644 --- a/kademlia/node.py +++ b/kademlia/node.py @@ -7,7 +7,7 @@ class Node: self.id = id self.ip = ip self.port = port - self.long_id = long(id.encode('hex'), 16) + self.long_id = int(id.hex(), 16) def sameHomeAs(self, node): return self.ip == node.ip and self.port == node.port diff --git a/kademlia/protocol.py b/kademlia/protocol.py index 884aa85..504d3a5 100644 --- a/kademlia/protocol.py +++ b/kademlia/protocol.py @@ -1,12 +1,10 @@ import random - -from twisted.internet import defer +from logging import getLogger from rpcudp.protocol import RPCProtocol from kademlia.node import Node from kademlia.routing import RoutingTable -from kademlia.log import Logger from kademlia.utils import digest @@ -16,7 +14,7 @@ class KademliaProtocol(RPCProtocol): self.router = RoutingTable(self, ksize, sourceNode) self.storage = storage self.sourceNode = sourceNode - self.log = Logger(system=self) + self.log = getLogger("kademlia-protocol") def getRefreshIDs(self): """ @@ -43,11 +41,11 @@ class KademliaProtocol(RPCProtocol): return True def rpc_find_node(self, sender, nodeid, key): - self.log.info("finding neighbors of %i in local table" % long(nodeid.encode('hex'), 16)) + self.log.info("finding neighbors of %i in local table" % int(nodeid.hex(), 16)) source = Node(nodeid, sender[0], sender[1]) self.welcomeIfNewNode(source) node = Node(key) - return map(tuple, self.router.findNeighbors(node, exclude=source)) + return list(map(tuple, self.router.findNeighbors(node, exclude=source))) def rpc_find_value(self, sender, nodeid, key): source = Node(nodeid, sender[0], sender[1]) @@ -57,25 +55,25 @@ class KademliaProtocol(RPCProtocol): return self.rpc_find_node(sender, nodeid, key) return { 'value': value } - def callFindNode(self, nodeToAsk, nodeToFind): + async def callFindNode(self, nodeToAsk, nodeToFind): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.find_node(address, self.sourceNode.id, nodeToFind.id) - return d.addCallback(self.handleCallResponse, nodeToAsk) + result = await self.find_node(address, self.sourceNode.id, nodeToFind.id) + return self.handleCallResponse(result, nodeToAsk) - def callFindValue(self, nodeToAsk, nodeToFind): + async def callFindValue(self, nodeToAsk, nodeToFind): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.find_value(address, self.sourceNode.id, nodeToFind.id) - return d.addCallback(self.handleCallResponse, nodeToAsk) + result = await self.find_value(address, self.sourceNode.id, nodeToFind.id) + return self.handleCallResponse(result, nodeToAsk) - def callPing(self, nodeToAsk): + async def callPing(self, nodeToAsk): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.ping(address, self.sourceNode.id) - return d.addCallback(self.handleCallResponse, nodeToAsk) + result = await self.ping(address, self.sourceNode.id) + return self.handleCallResponse(result, nodeToAsk) - def callStore(self, nodeToAsk, key, value): + async def callStore(self, nodeToAsk, key, value): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.store(address, self.sourceNode.id, key, value) - return d.addCallback(self.handleCallResponse, nodeToAsk) + result = await self.store(address, self.sourceNode.id, key, value) + return self.handleCallResponse(result, nodeToAsk) def welcomeIfNewNode(self, node): """ @@ -91,28 +89,30 @@ class KademliaProtocol(RPCProtocol): is closer than the closest in that list, then store the key/value on the new node (per section 2.5 of the paper) """ - if self.router.isNewNode(node): - ds = [] - for key, value in self.storage.iteritems(): - keynode = Node(digest(key)) - neighbors = self.router.findNeighbors(keynode) - if len(neighbors) > 0: - newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode) - thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode) - if len(neighbors) == 0 or (newNodeClose and thisNodeClosest): - ds.append(self.callStore(node, key, value)) - self.router.addContact(node) - return defer.gatherResults(ds) + if not self.router.isNewNode(node): + return + + self.log.info("never seen %s before, adding to router and setting nearby " % node) + for key, value in self.storage.items(): + keynode = Node(digest(key)) + neighbors = self.router.findNeighbors(keynode) + if len(neighbors) > 0: + newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode) + thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode) + if len(neighbors) == 0 or (newNodeClose and thisNodeClosest): + asyncio.ensure_future(self.callStore(node, key, value)) + self.router.addContact(node) def handleCallResponse(self, result, node): """ If we get a response, add the node to the routing table. If we get no response, make sure it's removed from the routing table. """ - if result[0]: - self.log.info("got response from %s, adding to router" % node) - self.welcomeIfNewNode(node) - else: - self.log.debug("no response from %s, removing from router" % node) + if not result[0]: + self.log.warning("no response from %s, removing from router" % node) self.router.removeContact(node) + return result + + self.log.info("got successful response from %s") + self.welcomeIfNewNode(node) return result diff --git a/kademlia/routing.py b/kademlia/routing.py index 5bde1bf..3f00e92 100644 --- a/kademlia/routing.py +++ b/kademlia/routing.py @@ -18,7 +18,7 @@ class KBucket(object): self.lastUpdated = time.time() def getNodes(self): - return self.nodes.values() + return list(self.nodes.values()) def split(self): midpoint = (self.range[0] + self.range[1]) / 2 @@ -68,7 +68,7 @@ class KBucket(object): return len(sp) def head(self): - return self.nodes.values()[0] + return list(self.nodes.values())[0] def __getitem__(self, id): return self.nodes.get(id, None) @@ -89,7 +89,7 @@ class TableTraverser(object): def __iter__(self): return self - def next(self): + def __next__(self): """ Pop an item from the left subtree, then right, then left, etc. """ @@ -99,12 +99,12 @@ class TableTraverser(object): if self.left and len(self.leftBuckets) > 0: self.currentNodes = self.leftBuckets.pop().getNodes() self.left = False - return self.next() + return next(self) if len(self.rightBuckets) > 0: self.currentNodes = self.rightBuckets.pop().getNodes() self.left = True - return self.next() + return next(self) raise StopIteration @@ -177,4 +177,4 @@ class RoutingTable(object): if len(nodes) == k: break - return map(operator.itemgetter(1), heapq.nsmallest(k, nodes)) + return list(map(operator.itemgetter(1), heapq.nsmallest(k, nodes))) diff --git a/kademlia/storage.py b/kademlia/storage.py index 3cc5e5e..ba2339f 100644 --- a/kademlia/storage.py +++ b/kademlia/storage.py @@ -1,15 +1,10 @@ import time -from itertools import izip -from itertools import imap from itertools import takewhile import operator from collections import OrderedDict -from zope.interface import implements -from zope.interface import Interface - -class IStorage(Interface): +class IStorage: """ Local storage for this node. """ @@ -18,31 +13,34 @@ class IStorage(Interface): """ Set a key to the given value. """ + raise NotImplementedError def __getitem__(key): """ Get the given key. If item doesn't exist, raises C{KeyError} """ + raise NotImplementedError def get(key, default=None): """ Get given key. If not found, return default. """ + raise NotImplementedError def iteritemsOlderThan(secondsOld): """ Return the an iterator over (key, value) tuples for items older than the given secondsOld. """ + raise NotImplementedError def iteritems(): """ Get the iterator for this storage, should yield tuple of (key, value) """ + raise NotImplementedError -class ForgetfulStorage(object): - implements(IStorage) - +class ForgetfulStorage(IStorage): def __init__(self, ttl=604800): """ By default, max age is a week. @@ -82,16 +80,16 @@ class ForgetfulStorage(object): minBirthday = time.time() - secondsOld zipped = self._tripleIterable() matches = takewhile(lambda r: minBirthday >= r[1], zipped) - return imap(operator.itemgetter(0, 2), matches) + return list(map(operator.itemgetter(0, 2), matches)) def _tripleIterable(self): - ikeys = self.data.iterkeys() - ibirthday = imap(operator.itemgetter(0), self.data.itervalues()) - ivalues = imap(operator.itemgetter(1), self.data.itervalues()) - return izip(ikeys, ibirthday, ivalues) + ikeys = self.data.keys() + ibirthday = map(operator.itemgetter(0), self.data.values()) + ivalues = map(operator.itemgetter(1), self.data.values()) + return zip(ikeys, ibirthday, ivalues) - def iteritems(self): + def items(self): self.cull() - ikeys = self.data.iterkeys() - ivalues = imap(operator.itemgetter(1), self.data.itervalues()) - return izip(ikeys, ivalues) + ikeys = self.data.keys() + ivalues = map(operator.itemgetter(1), self.data.values()) + return zip(ikeys, ivalues) diff --git a/kademlia/utils.py b/kademlia/utils.py index 63209e8..7528703 100644 --- a/kademlia/utils.py +++ b/kademlia/utils.py @@ -3,41 +3,21 @@ General catchall for functions that don't make sense as methods. """ import hashlib import operator +import asyncio -from twisted.internet import defer + +async def gather_dict(d): + cors = list(d.values()) + results = await asyncio.gather(*cors) + return dict(zip(d.keys(), results)) def digest(s): - if not isinstance(s, str): - s = str(s) + if not isinstance(s, bytes): + s = str(s).encode('utf8') return hashlib.sha1(s).digest() -def deferredDict(d): - """ - Just like a :class:`defer.DeferredList` but instead accepts and returns a :class:`dict`. - - Args: - d: A :class:`dict` whose values are all :class:`defer.Deferred` objects. - - Returns: - :class:`defer.DeferredList` whose callback will be given a dictionary whose - keys are the same as the parameter :obj:`d` and whose values are the results - of each individual deferred call. - """ - if len(d) == 0: - return defer.succeed({}) - - def handle(results, names): - rvalue = {} - for index in range(len(results)): - rvalue[names[index]] = results[index][1] - return rvalue - - dl = defer.DeferredList(d.values()) - return dl.addCallback(handle, d.keys()) - - class OrderedSet(list): """ Acts like a list in all ways, except in the behavior of the :meth:`push` method. diff --git a/setup.py b/setup.py index b7453c3..f62a11c 100755 --- a/setup.py +++ b/setup.py @@ -1,16 +1,15 @@ #!/usr/bin/env python from setuptools import setup, find_packages -from kademlia import version +import kademlia setup( name="kademlia", - version=version, + version=kademlia.__version__, description="Kademlia is a distributed hash table for decentralized peer-to-peer computer networks.", author="Brian Muller", author_email="bamuller@gmail.com", license="MIT", url="http://github.com/bmuller/kademlia", packages=find_packages(), - requires=["twisted", "rpcudp"], - install_requires=['twisted>=14.0', "rpcudp>=1.0"] + install_requires=["rpcudp>=3.0.0"] )