diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..b169104 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,534 @@ +[MASTER] + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code +extension-pkg-whitelist= + +# Add files or directories to the blacklist. They should be base names, not +# paths. +ignore=CVS + +# Add files or directories matching the regex patterns to the blacklist. The +# regex matches against base names, not paths. +ignore-patterns= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Use multiple processes to speed up Pylint. +jobs=1 + +# List of plugins (as comma separated values of python modules names) to load, +# usually to register additional checkers. +load-plugins= + +# Pickle collected data for later comparisons. +persistent=yes + +# Specify a configuration file. +rcfile=.pylintrc + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED +confidence= + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once).You can also use "--disable=all" to +# disable everything first and then reenable specific checks. 
For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use"--disable=all --enable=classes +# --disable=W" +disable=print-statement, + parameter-unpacking, + unpacking-in-except, + old-raise-syntax, + backtick, + long-suffix, + old-ne-operator, + old-octal-literal, + import-star-module-level, + non-ascii-bytes-literal, + raw-checker-failed, + bad-inline-option, + locally-disabled, + locally-enabled, + file-ignored, + suppressed-message, + useless-suppression, + deprecated-pragma, + apply-builtin, + basestring-builtin, + buffer-builtin, + cmp-builtin, + coerce-builtin, + execfile-builtin, + file-builtin, + long-builtin, + raw_input-builtin, + reduce-builtin, + standarderror-builtin, + unicode-builtin, + xrange-builtin, + coerce-method, + delslice-method, + getslice-method, + setslice-method, + no-absolute-import, + old-division, + dict-iter-method, + dict-view-method, + next-method-called, + metaclass-assignment, + indexing-exception, + raising-string, + reload-builtin, + oct-method, + hex-method, + nonzero-method, + cmp-method, + input-builtin, + round-builtin, + intern-builtin, + unichr-builtin, + map-builtin-not-iterating, + zip-builtin-not-iterating, + range-builtin-not-iterating, + filter-builtin-not-iterating, + using-cmp-argument, + eq-without-hash, + div-method, + idiv-method, + rdiv-method, + exception-message-attribute, + invalid-str-codec, + sys-max-int, + bad-python3-import, + deprecated-string-function, + deprecated-str-translate-call, + deprecated-itertools-function, + deprecated-types-field, + next-method-defined, + dict-items-not-iterating, + dict-keys-not-iterating, + dict-values-not-iterating, + missing-docstring + +# Enable the message, report, category or checker with the given id(s). 
You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable=c-extension-no-member + + +[REPORTS] + +# Python expression which should return a note less than 10 (10 is the highest +# note). You have access to the variables errors warning, statement which +# respectively contain the number of errors / warnings messages and the total +# number of statements analyzed. This is used by the global evaluation report +# (RP0004). +evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details +#msg-template= + +# Set the output format. Available formats are text, parseable, colorized, json +# and msvs (visual studio).You can also give a reporter class, eg +# mypackage.mymodule.MyReporterClass. +output-format=text + +# Tells whether to display a full report or only the messages +reports=no + +# Activate the evaluation score. +score=no + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + + +[LOGGING] + +# Logging modules to check that the string format arguments are in logging +# function parameter format +logging-modules=logging + + +[SPELLING] + +# Limits count of emitted suggestions for spelling mistakes +max-spelling-suggestions=4 + +# Spelling dictionary name. Available dictionaries: none. To make it working +# install python-enchant package. +spelling-dict= + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# A path to a file that contains private dictionary; one word per line. 
+spelling-private-dict-file= + +# Tells whether to store unknown words to indicated private dictionary in +# --spelling-private-dict-file option instead of raising a message. +spelling-store-unknown-words=no + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME, + XXX, + TODO + + +[TYPECHECK] + +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators=contextlib.contextmanager + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members= + +# Tells whether missing members accessed in mixin class should be ignored. A +# mixin class is detected if its name ends with "mixin" (case insensitive). +ignore-mixin-members=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=yes + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=optparse.Values,thread._local,_thread._local + +# List of module names for which member attributes should not be checked +# (useful for modules/projects where namespaces are manipulated during runtime +# and thus existing member attributes cannot be deduced by static analysis. It +# supports qualified module names, as well as Unix pattern matching. 
+ignored-modules= + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + + +[VARIABLES] + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid to define new builtins when possible. +additional-builtins= + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=yes + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_, + _cb + +# A regular expression matching the name of dummy variables (i.e. expectedly +# not used). +dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ + +# Argument names that match this expression will be ignored. Default to name +# with leading underscore +ignored-argument-names=_.*|^ignored_|^unused_ + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins + + +[FORMAT] + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )??$ + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Maximum number of characters on a single line. 
+max-line-length=100 + +# Maximum number of lines in a module +max-module-lines=1000 + +# List of optional constructs for which whitespace checking is disabled. `dict- +# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}. +# `trailing-comma` allows a space between comma and closing bracket: (a, ). +# `empty-line` allows space-only lines. +no-space-check=trailing-comma, + dict-separator + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + + +[SIMILARITIES] + +# Ignore comments when computing similarities. +ignore-comments=yes + +# Ignore docstrings when computing similarities. +ignore-docstrings=yes + +# Ignore imports when computing similarities. +ignore-imports=no + +# Minimum lines number of a similarity. +min-similarity-lines=4 + + +[BASIC] + +# Naming style matching correct argument names +argument-naming-style=snake_case + +# Regular expression matching correct argument names. Overrides argument- +# naming-style +#argument-rgx= + +# Naming style matching correct attribute names +attr-naming-style=snake_case + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style +#attr-rgx= + +# Bad variable names which should always be refused, separated by a comma +bad-names=foo, + bar, + baz, + toto, + tutu, + tata + +# Naming style matching correct class attribute names +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style +#class-attribute-rgx= + +# Naming style matching correct class names +class-naming-style=PascalCase + +# Regular expression matching correct class names. 
Overrides class-naming-style +#class-rgx= + +# Naming style matching correct constant names +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names. Overrides const-naming- +# style +#const-rgx= + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# Naming style matching correct function names +function-naming-style=snake_case + +# Regular expression matching correct function names. Overrides function- +# naming-style +#function-rgx= + +# Good variable names which should always be accepted, separated by a comma +good-names=i, + j, + k, + ex, + Run, + _ + +# Include a hint for the correct naming format with invalid-name +include-naming-hint=no + +# Naming style matching correct inline iteration names +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style +#inlinevar-rgx= + +# Naming style matching correct method names +method-naming-style=snake_case + +# Regular expression matching correct method names. Overrides method-naming- +# style +#method-rgx= + +# Naming style matching correct module names +module-naming-style=snake_case + +# Regular expression matching correct module names. Overrides module-naming- +# style +#module-rgx= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Regular expression which should only match function or class names that do +# not require a docstring. +no-docstring-rgx=^_ + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. +property-classes=abc.abstractproperty + +# Naming style matching correct variable names +variable-naming-style=snake_case + +# Regular expression matching correct variable names. 
Overrides variable- +# naming-style +#variable-rgx= + + +[IMPORTS] + +# Allow wildcard imports from modules that define __all__. +allow-wildcard-with-all=no + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + +# Deprecated modules which should not be used, separated by a comma +deprecated-modules=optparse,tkinter.tix + +# Create a graph of external dependencies in the given file (report RP0402 must +# not be disabled) +ext-import-graph= + +# Create a graph of every (i.e. internal and external) dependencies in the +# given file (report RP0402 must not be disabled) +import-graph= + +# Create a graph of internal dependencies in the given file (report RP0402 must +# not be disabled) +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + + +[CLASSES] + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__, + __new__, + setUp + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict, + _fields, + _replace, + _source, + _make + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + + +[DESIGN] + +# Maximum number of arguments for function / method +max-args=5 + +# Maximum number of attributes for a class (see R0902). 
+max-attributes=7 + +# Maximum number of boolean expressions in a if statement +max-bool-expr=5 + +# Maximum number of branch for function / method body +max-branches=12 + +# Maximum number of locals for function / method body +max-locals=15 + +# Maximum number of parents for a class (see R0901). +max-parents=7 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=20 + +# Maximum number of return / yield for function / method body +max-returns=6 + +# Maximum number of statements in function / method body +max-statements=50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=2 + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when being caught. Defaults to +# "Exception" +overgeneral-exceptions=Exception diff --git a/.travis.yml b/.travis.yml index 0d15646..f9b00f8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: python python: - - "2.7" - - "pypy" -install: pip install . pep8 pyflakes -script: make test + - "3.5" + - "3.6" +install: pip install . 
&& pip install -r dev-requirements.txt +script: python -m unittest diff --git a/LICENSE b/LICENSE index b82fd50..f5e33d9 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2014 Brian Muller +Copyright (c) 2018 Brian Muller Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/Makefile b/Makefile deleted file mode 100644 index df9280c..0000000 --- a/Makefile +++ /dev/null @@ -1,11 +0,0 @@ -PYDOCTOR=pydoctor - -test: lint - trial kademlia - -lint: - pep8 --ignore=E303,E251,E201,E202 ./kademlia --max-line-length=140 - find ./kademlia -name '*.py' | xargs pyflakes - -install: - python setup.py install diff --git a/README.markdown b/README.markdown deleted file mode 100644 index 6225cfe..0000000 --- a/README.markdown +++ /dev/null @@ -1,67 +0,0 @@ -# Python Distributed Hash Table -[![Build Status](https://secure.travis-ci.org/bmuller/kademlia.png?branch=master)](https://travis-ci.org/bmuller/kademlia) -[![Docs Status](https://readthedocs.org/projects/kademlia/badge/?version=latest)](http://kademlia.readthedocs.org) - -**Documentation can be found at [kademlia.readthedocs.org](http://kademlia.readthedocs.org/).** - -This library is an asynchronous Python implementation of the [Kademlia distributed hash table](http://en.wikipedia.org/wiki/Kademlia). It uses [Twisted](https://twistedmatrix.com) to provide asynchronous communication. The nodes communicate using [RPC over UDP](https://github.com/bmuller/rpcudp) to communiate, meaning that it is capable of working behind a [NAT](http://en.wikipedia.org/wiki/NAT). - -This library aims to be as close to a reference implementation of the [Kademlia paper](http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf) as possible. 
- -## Installation - -``` -pip install kademlia -``` - -## Usage -*This assumes you have a working familiarity with [Twisted](https://twistedmatrix.com).* - -Assuming you want to connect to an existing network (run the standalone server example below if you don't have a network): - -```python -from twisted.internet import reactor -from twisted.python import log -from kademlia.network import Server -import sys - -# log to std out -log.startLogging(sys.stdout) - -def quit(result): - print "Key result:", result - reactor.stop() - -def get(result, server): - return server.get("a key").addCallback(quit) - -def done(found, server): - log.msg("Found nodes: %s" % found) - return server.set("a key", "a value").addCallback(get, server) - -server = Server() -# next line, or use reactor.listenUDP(5678, server.protocol) -server.listen(5678) -server.bootstrap([('127.0.0.1', 1234)]).addCallback(done, server) - -reactor.run() -``` - -Check out the examples folder for other examples. - -## Stand-alone Server -If all you want to do is run a local server, just start the example server: - -``` -twistd -noy examples/server.tac -``` - -## Running Tests -To run tests: - -``` -trial kademlia -``` - -## Fidelity to Original Paper -The current implementation should be an accurate implementation of all aspects of the paper save one - in Section 2.3 there is the requirement that the original publisher of a key/value republish it every 24 hours. This library does not do this (though you can easily do this manually). 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..05ae813 --- /dev/null +++ b/README.md @@ -0,0 +1,69 @@ +# Python Distributed Hash Table +[![Build Status](https://secure.travis-ci.org/bmuller/kademlia.png?branch=master)](https://travis-ci.org/bmuller/kademlia) +[![Docs Status](https://readthedocs.org/projects/kademlia/badge/?version=latest)](http://kademlia.readthedocs.org) + +**Documentation can be found at [kademlia.readthedocs.org](http://kademlia.readthedocs.org/).** + +This library is an asynchronous Python implementation of the [Kademlia distributed hash table](http://en.wikipedia.org/wiki/Kademlia). It uses the [asyncio library](https://docs.python.org/3/library/asyncio.html) in Python 3 to provide asynchronous communication. The nodes communicate using [RPC over UDP](https://github.com/bmuller/rpcudp), meaning that it is capable of working behind a [NAT](http://en.wikipedia.org/wiki/NAT). + +This library aims to be as close to a reference implementation of the [Kademlia paper](http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf) as possible. + +## Installation + +``` +pip install kademlia +``` + +## Usage +*This assumes you have a working familiarity with [asyncio](https://docs.python.org/3/library/asyncio.html).* + +Assuming you want to connect to an existing network: + +```python +import asyncio +from kademlia.network import Server + +# Create a node and start listening on port 5678 +node = Server() +node.listen(5678) + +# Bootstrap the node by connecting to other known nodes, in this case +# replace 123.123.123.123 with the IP of another node and optionally +# give as many ip/port combos as you can for other nodes. 
+loop = asyncio.get_event_loop() +loop.run_until_complete(node.bootstrap([("123.123.123.123", 5678)])) + +# set a value for the key "my-key" on the network +loop.run_until_complete(node.set("my-key", "my awesome value")) + +# get the value associated with "my-key" from the network +result = loop.run_until_complete(node.get("my-key")) +print(result) +``` + +## Initializing a Network +If you're starting a new network from scratch, just omit the `node.bootstrap` call in the example above. Then, bootstrap other nodes by connecting to the first node you started. + +See the examples folder for a first node example that other nodes can use to bootstrap, and some code that gets and sets a key/value. + +## Logging +This library uses the standard [Python logging library](https://docs.python.org/3/library/logging.html). To see debug output printed to STDOUT, for instance, use: + +```python +import logging + +log = logging.getLogger('kademlia') +log.setLevel(logging.DEBUG) +log.addHandler(logging.StreamHandler()) +``` + +## Running Tests +To run tests: + +``` +pip install -r dev-requirements.txt +python -m unittest +``` + +## Fidelity to Original Paper +The current implementation should be an accurate implementation of all aspects of the paper save one - in Section 2.3 there is the requirement that the original publisher of a key/value republish it every 24 hours. This library does not do this (though you can easily do this manually). 
diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 0000000..9f1aa60 --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1,5 @@ +pycodestyle==2.3.1 +pylint==1.8.1 +sphinx>=1.6.5 +sphinxcontrib-napoleon>=0.6.1 +sphinxcontrib-zopeext>=0.2.1 diff --git a/docs/_static/.gitkeep b/docs/_static/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/docs/conf.py b/docs/conf.py index 12a05bd..6092ae7 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -50,7 +50,7 @@ master_doc = 'index' # General information about the project. project = u'Kademlia' -copyright = u'2015, Brian Muller' +copyright = u'2018, Brian Muller' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -59,7 +59,7 @@ copyright = u'2015, Brian Muller' # The short X.Y version. sys.path.insert(0, os.path.abspath('..')) import kademlia -version = kademlia.version +version = kademlia.__version__ # The full version, including alpha/beta/rc tags. release = version diff --git a/docs/index.rst b/docs/index.rst index 6148bba..f8f4758 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -7,13 +7,13 @@ Kademlia Documentation ====================== .. note :: - This library assumes you have a working familiarity with Twisted_. + This library assumes you have a working familiarity with asyncio_. -This library is an asynchronous Python implementation of the `Kademlia distributed hash table `_. It uses Twisted_ to provide asynchronous communication. The nodes communicate using `RPC over UDP `_ to communiate, meaning that it is capable of working behind a `NAT `_. +This library is an asynchronous Python implementation of the `Kademlia distributed hash table `_. It uses asyncio_ to provide asynchronous communication. The nodes communicate using `RPC over UDP `_ to communiate, meaning that it is capable of working behind a `NAT `_. 
This library aims to be as close to a reference implementation of the `Kademlia paper `_ as possible. -.. _Twisted: https://twistedmatrix.com +.. _asyncio: https://docs.python.org/3/library/asyncio.html .. toctree:: :maxdepth: 3 diff --git a/docs/intro.rst b/docs/intro.rst index 9929d37..07110a4 100644 --- a/docs/intro.rst +++ b/docs/intro.rst @@ -8,29 +8,25 @@ The easiest (and best) way to install kademlia is through `pip =14.0 -rpcudp>=1.0 -sphinxcontrib-napoleon>=0.2.8 -sphinx==1.2.3 -sphinxcontrib-zopeext>=0.2.1 diff --git a/docs/source/kademlia.rst b/docs/source/kademlia.rst index 8934d0c..be6275d 100644 --- a/docs/source/kademlia.rst +++ b/docs/source/kademlia.rst @@ -11,14 +11,6 @@ kademlia.crawling module :undoc-members: :show-inheritance: -kademlia.log module -------------------- - -.. automodule:: kademlia.log - :members: - :undoc-members: - :show-inheritance: - kademlia.network module ----------------------- diff --git a/examples/example.py b/examples/example.py deleted file mode 100644 index 54b6beb..0000000 --- a/examples/example.py +++ /dev/null @@ -1,22 +0,0 @@ -from twisted.internet import reactor -from twisted.python import log -from kademlia.network import Server -import sys - -log.startLogging(sys.stdout) - -def done(result): - print "Key result:", result - reactor.stop() - -def setDone(result, server): - server.get("a key").addCallback(done) - -def bootstrapDone(found, server): - server.set("a key", "a value").addCallback(setDone, server) - -server = Server() -server.listen(8468) -server.bootstrap([("1.2.3.4", 8468)]).addCallback(bootstrapDone, server) - -reactor.run() diff --git a/examples/first_node.py b/examples/first_node.py new file mode 100644 index 0000000..2800f0d --- /dev/null +++ b/examples/first_node.py @@ -0,0 +1,25 @@ +import logging +import asyncio + +from kademlia.network import Server + +handler = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') 
+handler.setFormatter(formatter) +log = logging.getLogger('kademlia') +log.addHandler(handler) +log.setLevel(logging.DEBUG) + +server = Server() +server.listen(8468) + +loop = asyncio.get_event_loop() +loop.set_debug(True) + +try: + loop.run_forever() +except KeyboardInterrupt: + pass +finally: + server.stop() + loop.close() diff --git a/examples/get.py b/examples/get.py new file mode 100644 index 0000000..b98c5b4 --- /dev/null +++ b/examples/get.py @@ -0,0 +1,29 @@ +import logging +import asyncio +import sys + +from kademlia.network import Server + +if len(sys.argv) != 4: + print("Usage: python get.py ") + sys.exit(1) + +handler = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +log = logging.getLogger('kademlia') +log.addHandler(handler) +log.setLevel(logging.DEBUG) + +loop = asyncio.get_event_loop() +loop.set_debug(True) + +server = Server() +server.listen(8469) +bootstrap_node = (sys.argv[1], int(sys.argv[2])) +loop.run_until_complete(server.bootstrap([bootstrap_node])) +result = loop.run_until_complete(server.get(sys.argv[3])) +server.stop() +loop.close() + +print("Get result:", result) diff --git a/examples/query.py b/examples/query.py deleted file mode 100644 index 18e94e5..0000000 --- a/examples/query.py +++ /dev/null @@ -1,33 +0,0 @@ -from twisted.internet import reactor -from twisted.python import log -from kademlia.network import Server -import sys - -log.startLogging(sys.stdout) - -if len(sys.argv) != 4: - print "Usage: python query.py " - sys.exit(1) - -ip = sys.argv[1] -port = int(sys.argv[2]) -key = sys.argv[3] - -print "Getting %s (with bootstrap %s:%i)" % (key, ip, port) - -def done(result): - print "Key result:" - print result - reactor.stop() - -def bootstrapDone(found, server, key): - if len(found) == 0: - print "Could not connect to the bootstrap server." 
- reactor.stop() - server.get(key).addCallback(done) - -server = Server() -server.listen(port) -server.bootstrap([(ip, port)]).addCallback(bootstrapDone, server, key) - -reactor.run() diff --git a/examples/server.tac b/examples/server.tac deleted file mode 100644 index 8973fbc..0000000 --- a/examples/server.tac +++ /dev/null @@ -1,21 +0,0 @@ -from twisted.application import service, internet -from twisted.python.log import ILogObserver -from twisted.internet import reactor, task - -import sys, os -sys.path.append(os.path.dirname(__file__)) -from kademlia.network import Server -from kademlia import log - -application = service.Application("kademlia") -application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit) - -if os.path.isfile('cache.pickle'): - kserver = Server.loadState('cache.pickle') -else: - kserver = Server() - kserver.bootstrap([("1.2.3.4", 8468)]) -kserver.saveStateRegularly('cache.pickle', 10) - -server = internet.UDPServer(8468, kserver.protocol) -server.setServiceParent(application) diff --git a/examples/set.py b/examples/set.py new file mode 100644 index 0000000..1b2aae2 --- /dev/null +++ b/examples/set.py @@ -0,0 +1,27 @@ +import logging +import asyncio +import sys + +from kademlia.network import Server + +if len(sys.argv) != 5: + print("Usage: python set.py ") + sys.exit(1) + +handler = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +log = logging.getLogger('kademlia') +log.addHandler(handler) +log.setLevel(logging.DEBUG) + +loop = asyncio.get_event_loop() +loop.set_debug(True) + +server = Server() +server.listen(8469) +bootstrap_node = (sys.argv[1], int(sys.argv[2])) +loop.run_until_complete(server.bootstrap([bootstrap_node])) +loop.run_until_complete(server.set(sys.argv[3], sys.argv[4])) +server.stop() +loop.close() diff --git a/examples/webserver.tac b/examples/webserver.tac deleted file mode 100644 index 016edc7..0000000 
--- a/examples/webserver.tac +++ /dev/null @@ -1,59 +0,0 @@ -from twisted.application import service, internet -from twisted.python.log import ILogObserver -from twisted.python import log -from twisted.internet import reactor, task -from twisted.web import resource, server -from twisted.web.resource import NoResource - -import sys, os -sys.path.append(os.path.dirname(__file__)) -from kademlia.network import Server -from kademlia import log - -application = service.Application("kademlia") -application.setComponent(ILogObserver, log.FileLogObserver(sys.stdout, log.INFO).emit) - -if os.path.isfile('cache.pickle'): - kserver = Server.loadState('cache.pickle') -else: - kserver = Server() - kserver.bootstrap([("1.2.3.4", 8468)]) -kserver.saveStateRegularly('cache.pickle', 10) - -udpserver = internet.UDPServer(8468, kserver.protocol) -udpserver.setServiceParent(application) - -class WebResource(resource.Resource): - def __init__(self, kserver): - resource.Resource.__init__(self) - self.kserver = kserver - - def getChild(self, child, request): - return self - - def render_GET(self, request): - def respond(value): - value = value or NoResource().render(request) - request.write(value) - request.finish() - log.msg("Getting key: %s" % request.path.split('/')[-1]) - d = self.kserver.get(request.path.split('/')[-1]) - d.addCallback(respond) - return server.NOT_DONE_YET - - def render_POST(self, request): - key = request.path.split('/')[-1] - value = request.content.getvalue() - log.msg("Setting %s = %s" % (key, value)) - self.kserver.set(key, value) - return value - -website = server.Site(WebResource(kserver)) -webserver = internet.TCPServer(8080, website) -webserver.setServiceParent(application) - - -# To test, you can set with: -# $> curl --data "hi there" http://localhost:8080/one -# and get with: -# $> curl http://localhost:8080/one diff --git a/kademlia/__init__.py b/kademlia/__init__.py index 00c3bce..9b33e22 100644 --- a/kademlia/__init__.py +++ b/kademlia/__init__.py @@ 
-1,5 +1,5 @@ """ -Kademlia is a Python implementation of the Kademlia protocol for `Twisted `_. +Kademlia is a Python implementation of the Kademlia protocol which +utilizes the asyncio library. """ -version_info = (0, 6) -version = '.'.join(map(str, version_info)) +__version__ = "1.0" diff --git a/kademlia/crawling.py b/kademlia/crawling.py index 0206e89..b1460c2 100644 --- a/kademlia/crawling.py +++ b/kademlia/crawling.py @@ -1,8 +1,10 @@ from collections import Counter +import logging -from kademlia.log import Logger -from kademlia.utils import deferredDict from kademlia.node import Node, NodeHeap +from kademlia.utils import gather_dict + +log = logging.getLogger(__name__) class SpiderCrawl(object): @@ -15,8 +17,10 @@ class SpiderCrawl(object): Args: protocol: A :class:`~kademlia.protocol.KademliaProtocol` instance. - node: A :class:`~kademlia.node.Node` representing the key we're looking for - peers: A list of :class:`~kademlia.node.Node` instances that provide the entry point for the network + node: A :class:`~kademlia.node.Node` representing the key we're + looking for + peers: A list of :class:`~kademlia.node.Node` instances that + provide the entry point for the network ksize: The value for k based on the paper alpha: The value for alpha based on the paper """ @@ -26,12 +30,10 @@ class SpiderCrawl(object): self.node = node self.nearest = NodeHeap(self.node, self.ksize) self.lastIDsCrawled = [] - self.log = Logger(system=self) - self.log.info("creating spider with peers: %s" % peers) + log.info("creating spider with peers: %s", peers) self.nearest.push(peers) - - def _find(self, rpcmethod): + async def _find(self, rpcmethod): """ Get either a value or list of nodes. @@ -41,16 +43,15 @@ class SpiderCrawl(object): The process: 1. calls find_* to current ALPHA nearest not already queried nodes, adding results to current nearest list of k nodes. - 2. current nearest list needs to keep track of who has been queried already - sort by nearest, keep KSIZE + 2. 
current nearest list needs to keep track of who has been queried + already sort by nearest, keep KSIZE 3. if list is same as last time, next call should be to everyone not yet queried 4. repeat, unless nearest list has all been queried, then ur done """ - self.log.info("crawling with nearest: %s" % str(tuple(self.nearest))) + log.info("crawling network with nearest: %s", str(tuple(self.nearest))) count = self.alpha if self.nearest.getIDs() == self.lastIDsCrawled: - self.log.info("last iteration same as current - checking all in list now") count = len(self.nearest) self.lastIDsCrawled = self.nearest.getIDs() @@ -58,7 +59,11 @@ class SpiderCrawl(object): for peer in self.nearest.getUncontacted()[:count]: ds[peer.id] = rpcmethod(peer, self.node) self.nearest.markContacted(peer) - return deferredDict(ds).addCallback(self._nodesFound) + found = await gather_dict(ds) + return await self._nodesFound(found) + + async def _nodesFound(self, responses): + raise NotImplementedError class ValueSpiderCrawl(SpiderCrawl): @@ -68,13 +73,13 @@ class ValueSpiderCrawl(SpiderCrawl): # section 2.3 so we can set the key there if found self.nearestWithoutValue = NodeHeap(self.node, 1) - def find(self): + async def find(self): """ Find either the closest nodes or the value requested. """ - return self._find(self.protocol.callFindValue) + return await self._find(self.protocol.callFindValue) - def _nodesFound(self, responses): + async def _nodesFound(self, responses): """ Handle the result of an iteration in _find. """ @@ -93,13 +98,13 @@ class ValueSpiderCrawl(SpiderCrawl): self.nearest.remove(toremove) if len(foundValues) > 0: - return self._handleFoundValues(foundValues) + return await self._handleFoundValues(foundValues) if self.nearest.allBeenContacted(): # not found! return None - return self.find() + return await self.find() - def _handleFoundValues(self, values): + async def _handleFoundValues(self, values): """ We got some values! Exciting. 
But let's make sure they're all the same or freak out a little bit. Also, @@ -108,25 +113,24 @@ class ValueSpiderCrawl(SpiderCrawl): """ valueCounts = Counter(values) if len(valueCounts) != 1: - args = (self.node.long_id, str(values)) - self.log.warning("Got multiple values for key %i: %s" % args) + log.warning("Got multiple values for key %i: %s", + self.node.long_id, str(values)) value = valueCounts.most_common(1)[0][0] peerToSaveTo = self.nearestWithoutValue.popleft() if peerToSaveTo is not None: - d = self.protocol.callStore(peerToSaveTo, self.node.id, value) - return d.addCallback(lambda _: value) + await self.protocol.callStore(peerToSaveTo, self.node.id, value) return value class NodeSpiderCrawl(SpiderCrawl): - def find(self): + async def find(self): """ Find the closest nodes. """ - return self._find(self.protocol.callFindNode) + return await self._find(self.protocol.callFindNode) - def _nodesFound(self, responses): + async def _nodesFound(self, responses): """ Handle the result of an iteration in _find. 
""" @@ -141,7 +145,7 @@ class NodeSpiderCrawl(SpiderCrawl): if self.nearest.allBeenContacted(): return list(self.nearest) - return self.find() + return await self.find() class RPCFindResponse(object): diff --git a/kademlia/log.py b/kademlia/log.py deleted file mode 100644 index e7573d2..0000000 --- a/kademlia/log.py +++ /dev/null @@ -1,63 +0,0 @@ -import sys -from twisted.python import log - -INFO = 5 -DEBUG = 4 -WARNING = 3 -ERROR = 2 -CRITICAL = 1 - - -class FileLogObserver(log.FileLogObserver): - def __init__(self, f=None, level=WARNING, default=DEBUG): - log.FileLogObserver.__init__(self, f or sys.stdout) - self.level = level - self.default = default - - - def emit(self, eventDict): - ll = eventDict.get('loglevel', self.default) - if eventDict['isError'] or 'failure' in eventDict or self.level >= ll: - log.FileLogObserver.emit(self, eventDict) - - -class Logger: - def __init__(self, **kwargs): - self.kwargs = kwargs - - def msg(self, message, **kw): - kw.update(self.kwargs) - if 'system' in kw and not isinstance(kw['system'], str): - kw['system'] = kw['system'].__class__.__name__ - log.msg(message, **kw) - - def info(self, message, **kw): - kw['loglevel'] = INFO - self.msg("[INFO] %s" % message, **kw) - - def debug(self, message, **kw): - kw['loglevel'] = DEBUG - self.msg("[DEBUG] %s" % message, **kw) - - def warning(self, message, **kw): - kw['loglevel'] = WARNING - self.msg("[WARNING] %s" % message, **kw) - - def error(self, message, **kw): - kw['loglevel'] = ERROR - self.msg("[ERROR] %s" % message, **kw) - - def critical(self, message, **kw): - kw['loglevel'] = CRITICAL - self.msg("[CRITICAL] %s" % message, **kw) - -try: - theLogger -except NameError: - theLogger = Logger() - msg = theLogger.msg - info = theLogger.info - debug = theLogger.debug - warning = theLogger.warning - error = theLogger.error - critical = theLogger.critical diff --git a/kademlia/network.py b/kademlia/network.py index 2dbcbb6..b198f7a 100644 --- a/kademlia/network.py +++ 
b/kademlia/network.py @@ -4,80 +4,105 @@ Package for interacting on the network at a high level. import random import binascii import pickle +import asyncio +import logging -from twisted.internet.task import LoopingCall -from twisted.internet import defer, reactor, task - -from kademlia.log import Logger from kademlia.protocol import KademliaProtocol -from kademlia.utils import deferredDict, digest +from kademlia.utils import digest from kademlia.storage import ForgetfulStorage from kademlia.node import Node from kademlia.crawling import ValueSpiderCrawl from kademlia.crawling import NodeSpiderCrawl +log = logging.getLogger(__name__) + class Server(object): """ - High level view of a node instance. This is the object that should be created - to start listening as an active node on the network. + High level view of a node instance. This is the object that should be + created to start listening as an active node on the network. """ - def __init__(self, ksize=20, alpha=3, id=None, storage=None): + protocol_class = KademliaProtocol + + def __init__(self, ksize=20, alpha=3, node_id=None, storage=None): """ Create a server instance. This will start listening on the given port. Args: ksize (int): The k parameter from the paper alpha (int): The alpha parameter from the paper - id: The id for this node on the network. - storage: An instance that implements :interface:`~kademlia.storage.IStorage` + node_id: The id for this node on the network. 
+ storage: An instance that implements + :interface:`~kademlia.storage.IStorage` """ self.ksize = ksize self.alpha = alpha - self.log = Logger(system=self) self.storage = storage or ForgetfulStorage() - self.node = Node(id or digest(random.getrandbits(255))) - self.protocol = KademliaProtocol(self.node, self.storage, ksize) - self.refreshLoop = LoopingCall(self.refreshTable).start(3600) + self.node = Node(node_id or digest(random.getrandbits(255))) + self.transport = None + self.protocol = None + self.refresh_loop = None + self.save_state_loop = None - def listen(self, port, interface=""): + def stop(self): + if self.transport is not None: + self.transport.close() + + if self.refresh_loop: + self.refresh_loop.cancel() + + if self.save_state_loop: + self.save_state_loop.cancel() + + def _create_protocol(self): + return self.protocol_class(self.node, self.storage, self.ksize) + + def listen(self, port, interface='0.0.0.0'): """ Start listening on the given port. - This is the same as calling:: - - reactor.listenUDP(port, server.protocol) - Provide interface="::" to accept ipv6 address """ - return reactor.listenUDP(port, self.protocol, interface) + loop = asyncio.get_event_loop() + listen = loop.create_datagram_endpoint(self._create_protocol, + local_addr=(interface, port)) + log.info("Node %i listening on %s:%i", + self.node.long_id, interface, port) + self.transport, self.protocol = loop.run_until_complete(listen) + # finally, schedule refreshing table + self.refresh_table() - def refreshTable(self): + def refresh_table(self): + log.debug("Refreshing routing table") + asyncio.ensure_future(self._refresh_table()) + loop = asyncio.get_event_loop() + self.refresh_loop = loop.call_later(3600, self.refresh_table) + + async def _refresh_table(self): """ Refresh buckets that haven't had any lookups in the last hour (per section 2.3 of the paper). 
""" ds = [] - for id in self.protocol.getRefreshIDs(): - node = Node(id) + for node_id in self.protocol.getRefreshIDs(): + node = Node(node_id) nearest = self.protocol.router.findNeighbors(node, self.alpha) - spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) + spider = NodeSpiderCrawl(self.protocol, node, nearest, + self.ksize, self.alpha) ds.append(spider.find()) - def republishKeys(_): - ds = [] - # Republish keys older than one hour - for dkey, value in self.storage.iteritemsOlderThan(3600): - ds.append(self.digest_set(dkey, value)) - return defer.gatherResults(ds) + # do our crawling + await asyncio.gather(*ds) - return defer.gatherResults(ds).addCallback(republishKeys) + # now republish keys older than one hour + for dkey, value in self.storage.iteritemsOlderThan(3600): + await self.set_digest(dkey, value) def bootstrappableNeighbors(self): """ - Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for use as an argument - to the bootstrap method. + Get a :class:`list` of (ip, port) :class:`tuple` pairs suitable for + use as an argument to the bootstrap method. The server should have been bootstrapped already - this is just a utility for getting some neighbors and then @@ -85,124 +110,100 @@ class Server(object): back up, the list of nodes can be used to bootstrap. """ neighbors = self.protocol.router.findNeighbors(self.node) - return [ tuple(n)[-2:] for n in neighbors ] + return [tuple(n)[-2:] for n in neighbors] - def bootstrap(self, addrs): + async def bootstrap(self, addrs): """ Bootstrap the server by connecting to other known nodes in the network. Args: - addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP addresses - are acceptable - hostnames will cause an error. + addrs: A `list` of (ip, port) `tuple` pairs. Note that only IP + addresses are acceptable - hostnames will cause an error. 
""" - # if the transport hasn't been initialized yet, wait a second - if self.protocol.transport is None: - return task.deferLater(reactor, 1, self.bootstrap, addrs) + log.debug("Attempting to bootstrap node with %i initial contacts", + len(addrs)) + cos = list(map(self.bootstrap_node, addrs)) + gathered = await asyncio.gather(*cos) + nodes = [node for node in gathered if node is not None] + spider = NodeSpiderCrawl(self.protocol, self.node, nodes, + self.ksize, self.alpha) + return await spider.find() - def initTable(results): - nodes = [] - for addr, result in results.items(): - if result[0]: - nodes.append(Node(result[1], addr[0], addr[1])) - spider = NodeSpiderCrawl(self.protocol, self.node, nodes, self.ksize, self.alpha) - return spider.find() + async def bootstrap_node(self, addr): + result = await self.protocol.ping(addr, self.node.id) + return Node(result[1], addr[0], addr[1]) if result[0] else None - ds = {} - for addr in addrs: - ds[addr] = self.protocol.ping(addr, self.node.id) - return deferredDict(ds).addCallback(initTable) - - def inetVisibleIP(self): - """ - Get the internet visible IP's of this node as other nodes see it. - - Returns: - A `list` of IP's. If no one can be contacted, then the `list` will be empty. - """ - def handle(results): - ips = [ result[1][0] for result in results if result[0] ] - self.log.debug("other nodes think our ip is %s" % str(ips)) - return ips - - ds = [] - for neighbor in self.bootstrappableNeighbors(): - ds.append(self.protocol.stun(neighbor)) - return defer.gatherResults(ds).addCallback(handle) - - def get(self, key): + async def get(self, key): """ Get a key if the network has it. Returns: :class:`None` if not found, the value otherwise. 
""" + log.info("Looking up key %s", key) dkey = digest(key) # if this node has it, return it if self.storage.get(dkey) is not None: - return defer.succeed(self.storage.get(dkey)) + return self.storage.get(dkey) node = Node(dkey) nearest = self.protocol.router.findNeighbors(node) if len(nearest) == 0: - self.log.warning("There are no known neighbors to get key %s" % key) - return defer.succeed(None) - spider = ValueSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - return spider.find() + log.warning("There are no known neighbors to get key %s", key) + return None + spider = ValueSpiderCrawl(self.protocol, node, nearest, + self.ksize, self.alpha) + return await spider.find() - def set(self, key, value): + async def set(self, key, value): """ - Set the given key to the given value in the network. + Set the given string key to the given value in the network. """ - self.log.debug("setting '%s' = '%s' on network" % (key, value)) + log.info("setting '%s' = '%s' on network", key, value) dkey = digest(key) - return self.digest_set(dkey, value) + return await self.set_digest(dkey, value) - def digest_set(self, dkey, value): + async def set_digest(self, dkey, value): """ - Set the given SHA1 digest key to the given value in the network. + Set the given SHA1 digest key (bytes) to the given value in the + network. 
""" node = Node(dkey) - # this is useful for debugging messages - hkey = binascii.hexlify(dkey) - - def store(nodes): - self.log.info("setting '%s' on %s" % (hkey, map(str, nodes))) - # if this node is close too, then store here as well - if self.node.distanceTo(node) < max([n.distanceTo(node) for n in nodes]): - self.storage[dkey] = value - ds = [self.protocol.callStore(n, dkey, value) for n in nodes] - return defer.DeferredList(ds).addCallback(self._anyRespondSuccess) nearest = self.protocol.router.findNeighbors(node) if len(nearest) == 0: - self.log.warning("There are no known neighbors to set key %s" % hkey) - return defer.succeed(False) - spider = NodeSpiderCrawl(self.protocol, node, nearest, self.ksize, self.alpha) - return spider.find().addCallback(store) + log.warning("There are no known neighbors to set key %s", + dkey.hex()) + return False - def _anyRespondSuccess(self, responses): - """ - Given the result of a DeferredList of calls to peers, ensure that at least - one of them was contacted and responded with a Truthy result. - """ - for deferSuccess, result in responses: - peerReached, peerResponse = result - if deferSuccess and peerReached and peerResponse: - return True - return False + spider = NodeSpiderCrawl(self.protocol, node, nearest, + self.ksize, self.alpha) + nodes = await spider.find() + log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes))) + + # if this node is close too, then store here as well + biggest = max([n.distanceTo(node) for n in nodes]) + if self.node.distanceTo(node) < biggest: + self.storage[dkey] = value + ds = [self.protocol.callStore(n, dkey, value) for n in nodes] + # return true only if at least one store call succeeded + return any(await asyncio.gather(*ds)) def saveState(self, fname): """ Save the state of this node (the alpha/ksize/id/immediate neighbors) to a cache file with the given fname. 
""" - data = { 'ksize': self.ksize, - 'alpha': self.alpha, - 'id': self.node.id, - 'neighbors': self.bootstrappableNeighbors() } + log.info("Saving state to %s", fname) + data = { + 'ksize': self.ksize, + 'alpha': self.alpha, + 'id': self.node.id, + 'neighbors': self.bootstrappableNeighbors() + } if len(data['neighbors']) == 0: - self.log.warning("No known neighbors, so not writing to cache.") + log.warning("No known neighbors, so not writing to cache.") return - with open(fname, 'w') as f: + with open(fname, 'wb') as f: pickle.dump(data, f) @classmethod @@ -211,7 +212,8 @@ class Server(object): Load the state of this node (the alpha/ksize/id/immediate neighbors) from a cache file with the given fname. """ - with open(fname, 'r') as f: + log.info("Loading state from %s", fname) + with open(fname, 'rb') as f: data = pickle.load(f) s = Server(data['ksize'], data['alpha'], data['id']) if len(data['neighbors']) > 0: @@ -225,9 +227,12 @@ class Server(object): Args: fname: File name to save retularly to - frequencey: Frequency in seconds that the state should be saved. + frequency: Frequency in seconds that the state should be saved. By default, 10 minutes. 
""" - loop = LoopingCall(self.saveState, fname) - loop.start(frequency) - return loop + self.saveState(fname) + loop = asyncio.get_event_loop() + self.save_state_loop = loop.call_later(frequency, + self.saveStateRegularly, + fname, + frequency) diff --git a/kademlia/node.py b/kademlia/node.py index d1b25f7..b25e9f8 100644 --- a/kademlia/node.py +++ b/kademlia/node.py @@ -3,11 +3,11 @@ import heapq class Node: - def __init__(self, id, ip=None, port=None): - self.id = id + def __init__(self, node_id, ip=None, port=None): + self.id = node_id self.ip = ip self.port = port - self.long_id = long(id.encode('hex'), 16) + self.long_id = int(node_id.hex(), 16) def sameHomeAs(self, node): return self.ip == node.ip and self.port == node.port @@ -64,9 +64,9 @@ class NodeHeap(object): heapq.heappush(nheap, (distance, node)) self.heap = nheap - def getNodeById(self, id): + def getNodeById(self, node_id): for _, node in self.heap: - if node.id == id: + if node.id == node_id: return node return None @@ -106,7 +106,7 @@ class NodeHeap(object): return iter(map(itemgetter(1), nodes)) def __contains__(self, node): - for distance, n in self.heap: + for _, n in self.heap: if node.id == n.id: return True return False diff --git a/kademlia/protocol.py b/kademlia/protocol.py index 884aa85..6e22f8b 100644 --- a/kademlia/protocol.py +++ b/kademlia/protocol.py @@ -1,14 +1,15 @@ import random - -from twisted.internet import defer +import asyncio +import logging from rpcudp.protocol import RPCProtocol from kademlia.node import Node from kademlia.routing import RoutingTable -from kademlia.log import Logger from kademlia.utils import digest +log = logging.getLogger(__name__) + class KademliaProtocol(RPCProtocol): def __init__(self, sourceNode, storage, ksize): @@ -16,7 +17,6 @@ class KademliaProtocol(RPCProtocol): self.router = RoutingTable(self, ksize, sourceNode) self.storage = storage self.sourceNode = sourceNode - self.log = Logger(system=self) def getRefreshIDs(self): """ @@ -24,7 +24,8 @@ 
class KademliaProtocol(RPCProtocol): """ ids = [] for bucket in self.router.getLonelyBuckets(): - ids.append(random.randint(*bucket.range)) + rid = random.randint(*bucket.range).to_bytes(20, byteorder='big') + ids.append(rid) return ids def rpc_stun(self, sender): @@ -38,16 +39,19 @@ class KademliaProtocol(RPCProtocol): def rpc_store(self, sender, nodeid, key, value): source = Node(nodeid, sender[0], sender[1]) self.welcomeIfNewNode(source) - self.log.debug("got a store request from %s, storing value" % str(sender)) + log.debug("got a store request from %s, storing '%s'='%s'", + sender, key.hex(), value) self.storage[key] = value return True def rpc_find_node(self, sender, nodeid, key): - self.log.info("finding neighbors of %i in local table" % long(nodeid.encode('hex'), 16)) + log.info("finding neighbors of %i in local table", + int(nodeid.hex(), 16)) source = Node(nodeid, sender[0], sender[1]) self.welcomeIfNewNode(source) node = Node(key) - return map(tuple, self.router.findNeighbors(node, exclude=source)) + neighbors = self.router.findNeighbors(node, exclude=source) + return list(map(tuple, neighbors)) def rpc_find_value(self, sender, nodeid, key): source = Node(nodeid, sender[0], sender[1]) @@ -55,27 +59,29 @@ class KademliaProtocol(RPCProtocol): value = self.storage.get(key, None) if value is None: return self.rpc_find_node(sender, nodeid, key) - return { 'value': value } + return {'value': value} - def callFindNode(self, nodeToAsk, nodeToFind): + async def callFindNode(self, nodeToAsk, nodeToFind): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.find_node(address, self.sourceNode.id, nodeToFind.id) - return d.addCallback(self.handleCallResponse, nodeToAsk) + result = await self.find_node(address, self.sourceNode.id, + nodeToFind.id) + return self.handleCallResponse(result, nodeToAsk) - def callFindValue(self, nodeToAsk, nodeToFind): + async def callFindValue(self, nodeToAsk, nodeToFind): address = (nodeToAsk.ip, nodeToAsk.port) - d = 
self.find_value(address, self.sourceNode.id, nodeToFind.id) - return d.addCallback(self.handleCallResponse, nodeToAsk) + result = await self.find_value(address, self.sourceNode.id, + nodeToFind.id) + return self.handleCallResponse(result, nodeToAsk) - def callPing(self, nodeToAsk): + async def callPing(self, nodeToAsk): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.ping(address, self.sourceNode.id) - return d.addCallback(self.handleCallResponse, nodeToAsk) + result = await self.ping(address, self.sourceNode.id) + return self.handleCallResponse(result, nodeToAsk) - def callStore(self, nodeToAsk, key, value): + async def callStore(self, nodeToAsk, key, value): address = (nodeToAsk.ip, nodeToAsk.port) - d = self.store(address, self.sourceNode.id, key, value) - return d.addCallback(self.handleCallResponse, nodeToAsk) + result = await self.store(address, self.sourceNode.id, key, value) + return self.handleCallResponse(result, nodeToAsk) def welcomeIfNewNode(self, node): """ @@ -91,28 +97,32 @@ class KademliaProtocol(RPCProtocol): is closer than the closest in that list, then store the key/value on the new node (per section 2.5 of the paper) """ - if self.router.isNewNode(node): - ds = [] - for key, value in self.storage.iteritems(): - keynode = Node(digest(key)) - neighbors = self.router.findNeighbors(keynode) - if len(neighbors) > 0: - newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode) - thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode) - if len(neighbors) == 0 or (newNodeClose and thisNodeClosest): - ds.append(self.callStore(node, key, value)) - self.router.addContact(node) - return defer.gatherResults(ds) + if not self.router.isNewNode(node): + return + + log.info("never seen %s before, adding to router", node) + for key, value in self.storage.items(): + keynode = Node(digest(key)) + neighbors = self.router.findNeighbors(keynode) + if len(neighbors) > 0: + last = neighbors[-1].distanceTo(keynode) + 
newNodeClose = node.distanceTo(keynode) < last + first = neighbors[0].distanceTo(keynode) + thisNodeClosest = self.sourceNode.distanceTo(keynode) < first + if len(neighbors) == 0 or (newNodeClose and thisNodeClosest): + asyncio.ensure_future(self.callStore(node, key, value)) + self.router.addContact(node) def handleCallResponse(self, result, node): """ If we get a response, add the node to the routing table. If we get no response, make sure it's removed from the routing table. """ - if result[0]: - self.log.info("got response from %s, adding to router" % node) - self.welcomeIfNewNode(node) - else: - self.log.debug("no response from %s, removing from router" % node) + if not result[0]: + log.warning("no response from %s, removing from router", node) self.router.removeContact(node) + return result + + log.info("got successful response from %s", node) + self.welcomeIfNewNode(node) return result diff --git a/kademlia/routing.py b/kademlia/routing.py index 5bde1bf..c026312 100644 --- a/kademlia/routing.py +++ b/kademlia/routing.py @@ -1,9 +1,10 @@ import heapq import time import operator -from collections import OrderedDict +import asyncio -from kademlia.utils import OrderedSet, sharedPrefix +from collections import OrderedDict +from kademlia.utils import OrderedSet, sharedPrefix, bytesToBitString class KBucket(object): @@ -18,7 +19,7 @@ class KBucket(object): self.lastUpdated = time.time() def getNodes(self): - return self.nodes.values() + return list(self.nodes.values()) def split(self): midpoint = (self.range[0] + self.range[1]) / 2 @@ -64,14 +65,15 @@ class KBucket(object): return True def depth(self): - sp = sharedPrefix([n.id for n in self.nodes.values()]) + vals = self.nodes.values() + sp = sharedPrefix([bytesToBitString(n.id) for n in vals]) return len(sp) def head(self): - return self.nodes.values()[0] + return list(self.nodes.values())[0] - def __getitem__(self, id): - return self.nodes.get(id, None) + def __getitem__(self, node_id): + return 
self.nodes.get(node_id, None) def __len__(self): return len(self.nodes) @@ -89,7 +91,7 @@ class TableTraverser(object): def __iter__(self): return self - def next(self): + def __next__(self): """ Pop an item from the left subtree, then right, then left, etc. """ @@ -99,12 +101,12 @@ class TableTraverser(object): if self.left and len(self.leftBuckets) > 0: self.currentNodes = self.leftBuckets.pop().getNodes() self.left = False - return self.next() + return next(self) if len(self.rightBuckets) > 0: self.currentNodes = self.rightBuckets.pop().getNodes() self.left = True - return self.next() + return next(self) raise StopIteration @@ -134,7 +136,8 @@ class RoutingTable(object): Get all of the buckets that haven't been updated in over an hour. """ - return [b for b in self.buckets if b.lastUpdated < (time.time() - 3600)] + hrago = time.time() - 3600 + return [b for b in self.buckets if b.lastUpdated < hrago] def removeContact(self, node): index = self.getBucketFor(node) @@ -152,13 +155,13 @@ class RoutingTable(object): if bucket.addNode(node): return - # Per section 4.2 of paper, split if the bucket has the node in its range - # or if the depth is not congruent to 0 mod 5 + # Per section 4.2 of paper, split if the bucket has the node + # in its range or if the depth is not congruent to 0 mod 5 if bucket.hasInRange(self.node) or bucket.depth() % 5 != 0: self.splitBucket(index) self.addContact(node) else: - self.protocol.callPing(bucket.head()) + asyncio.ensure_future(self.protocol.callPing(bucket.head())) def getBucketFor(self, node): """ @@ -172,9 +175,10 @@ class RoutingTable(object): k = k or self.ksize nodes = [] for neighbor in TableTraverser(self, node): - if neighbor.id != node.id and (exclude is None or not neighbor.sameHomeAs(exclude)): + notexcluded = exclude is None or not neighbor.sameHomeAs(exclude) + if neighbor.id != node.id and notexcluded: heapq.heappush(nodes, (node.distanceTo(neighbor), neighbor)) if len(nodes) == k: break - return 
map(operator.itemgetter(1), heapq.nsmallest(k, nodes)) + return list(map(operator.itemgetter(1), heapq.nsmallest(k, nodes))) diff --git a/kademlia/storage.py b/kademlia/storage.py index 3cc5e5e..30293d8 100644 --- a/kademlia/storage.py +++ b/kademlia/storage.py @@ -1,48 +1,47 @@ import time -from itertools import izip -from itertools import imap from itertools import takewhile import operator from collections import OrderedDict -from zope.interface import implements -from zope.interface import Interface - -class IStorage(Interface): +class IStorage: """ Local storage for this node. """ - def __setitem__(key, value): + def __setitem__(self, key, value): """ Set a key to the given value. """ + raise NotImplementedError - def __getitem__(key): + def __getitem__(self, key): """ Get the given key. If item doesn't exist, raises C{KeyError} """ + raise NotImplementedError - def get(key, default=None): + def get(self, key, default=None): """ Get given key. If not found, return default. """ + raise NotImplementedError - def iteritemsOlderThan(secondsOld): + def iteritemsOlderThan(self, secondsOld): """ - Return the an iterator over (key, value) tuples for items older than the given secondsOld. + Return the an iterator over (key, value) tuples for items older + than the given secondsOld. """ + raise NotImplementedError - def iteritems(): + def __iter__(self): """ Get the iterator for this storage, should yield tuple of (key, value) """ + raise NotImplementedError -class ForgetfulStorage(object): - implements(IStorage) - +class ForgetfulStorage(IStorage): def __init__(self, ttl=604800): """ By default, max age is a week. 
@@ -57,7 +56,7 @@ class ForgetfulStorage(object): self.cull() def cull(self): - for k, v in self.iteritemsOlderThan(self.ttl): + for _, _ in self.iteritemsOlderThan(self.ttl): self.data.popitem(last=False) def get(self, key, default=None): @@ -82,16 +81,16 @@ class ForgetfulStorage(object): minBirthday = time.time() - secondsOld zipped = self._tripleIterable() matches = takewhile(lambda r: minBirthday >= r[1], zipped) - return imap(operator.itemgetter(0, 2), matches) + return list(map(operator.itemgetter(0, 2), matches)) def _tripleIterable(self): - ikeys = self.data.iterkeys() - ibirthday = imap(operator.itemgetter(0), self.data.itervalues()) - ivalues = imap(operator.itemgetter(1), self.data.itervalues()) - return izip(ikeys, ibirthday, ivalues) + ikeys = self.data.keys() + ibirthday = map(operator.itemgetter(0), self.data.values()) + ivalues = map(operator.itemgetter(1), self.data.values()) + return zip(ikeys, ibirthday, ivalues) - def iteritems(self): + def items(self): self.cull() - ikeys = self.data.iterkeys() - ivalues = imap(operator.itemgetter(1), self.data.itervalues()) - return izip(ikeys, ivalues) + ikeys = self.data.keys() + ivalues = map(operator.itemgetter(1), self.data.values()) + return zip(ikeys, ivalues) diff --git a/kademlia/tests/test_linting.py b/kademlia/tests/test_linting.py new file mode 100644 index 0000000..3998423 --- /dev/null +++ b/kademlia/tests/test_linting.py @@ -0,0 +1,27 @@ +import unittest +from glob import glob + +import pycodestyle + +from pylint import epylint as lint + + +class LintError(Exception): + pass + + +class TestCodeLinting(unittest.TestCase): + # pylint: disable=no-self-use + def test_pylint(self): + (stdout, _) = lint.py_run('kademlia', return_std=True) + errors = stdout.read() + if errors.strip(): + raise LintError(errors) + + # pylint: disable=no-self-use + def test_pep8(self): + style = pycodestyle.StyleGuide() + files = glob('kademlia/**/*.py', recursive=True) + result = style.check_files(files) + if 
result.total_errors > 0: + raise LintError("Code style errors found.") diff --git a/kademlia/tests/test_node.py b/kademlia/tests/test_node.py index 60ca14b..fb797ee 100644 --- a/kademlia/tests/test_node.py +++ b/kademlia/tests/test_node.py @@ -1,7 +1,7 @@ +import unittest import random import hashlib -from twisted.trial import unittest from kademlia.node import Node, NodeHeap from kademlia.tests.utils import mknode @@ -9,15 +9,15 @@ from kademlia.tests.utils import mknode class NodeTest(unittest.TestCase): def test_longID(self): - rid = hashlib.sha1(str(random.getrandbits(255))).digest() + rid = hashlib.sha1(str(random.getrandbits(255)).encode()).digest() n = Node(rid) - self.assertEqual(n.long_id, long(rid.encode('hex'), 16)) + self.assertEqual(n.long_id, int(rid.hex(), 16)) def test_distanceCalculation(self): - ridone = hashlib.sha1(str(random.getrandbits(255))) - ridtwo = hashlib.sha1(str(random.getrandbits(255))) + ridone = hashlib.sha1(str(random.getrandbits(255)).encode()) + ridtwo = hashlib.sha1(str(random.getrandbits(255)).encode()) - shouldbe = long(ridone.hexdigest(), 16) ^ long(ridtwo.hexdigest(), 16) + shouldbe = int(ridone.hexdigest(), 16) ^ int(ridtwo.hexdigest(), 16) none = Node(ridone.digest()) ntwo = Node(ridtwo.digest()) self.assertEqual(none.distanceTo(ntwo), shouldbe) diff --git a/kademlia/tests/test_routing.py b/kademlia/tests/test_routing.py index 460302a..8c77b53 100644 --- a/kademlia/tests/test_routing.py +++ b/kademlia/tests/test_routing.py @@ -1,4 +1,4 @@ -from twisted.trial import unittest +import unittest from kademlia.routing import KBucket from kademlia.tests.utils import mknode, FakeProtocol diff --git a/kademlia/tests/test_server.py b/kademlia/tests/test_server.py new file mode 100644 index 0000000..71de5e8 --- /dev/null +++ b/kademlia/tests/test_server.py @@ -0,0 +1,45 @@ +import unittest + +from kademlia.network import Server +from kademlia.protocol import KademliaProtocol + + +class SwappableProtocolTests(unittest.TestCase): + + 
def test_default_protocol(self): + """ + An ordinary Server object will initially not have a protocol, but will + have a KademliaProtocol object as its protocol after its listen() + method is called. + """ + server = Server() + self.assertIsNone(server.protocol) + server.listen(8469) + self.assertIsInstance(server.protocol, KademliaProtocol) + server.stop() + + def test_custom_protocol(self): + """ + A subclass of Server which overrides the protocol_class attribute will + have an instance of that class as its protocol after its listen() + method is called. + """ + + # Make a custom Protocol and Server to go with hit. + class CoconutProtocol(KademliaProtocol): + pass + + class HuskServer(Server): + protocol_class = CoconutProtocol + + # An ordinary server does NOT have a CoconutProtocol as its protocol... + server = Server() + server.listen(8469) + self.assertNotIsInstance(server.protocol, CoconutProtocol) + server.stop() + + # ...but our custom server does. + husk_server = HuskServer() + husk_server.listen(8469) + self.assertIsInstance(husk_server.protocol, CoconutProtocol) + husk_server.stop() diff --git a/kademlia/tests/test_utils.py b/kademlia/tests/test_utils.py index d667657..073d0c3 100644 --- a/kademlia/tests/test_utils.py +++ b/kademlia/tests/test_utils.py @@ -1,16 +1,15 @@ import hashlib - -from twisted.trial import unittest +import unittest from kademlia.utils import digest, sharedPrefix, OrderedSet class UtilsTest(unittest.TestCase): def test_digest(self): - d = hashlib.sha1('1').digest() + d = hashlib.sha1(b'1').digest() self.assertEqual(d, digest(1)) - d = hashlib.sha1('another').digest() + d = hashlib.sha1(b'another').digest() self.assertEqual(d, digest('another')) def test_sharedPrefix(self): diff --git a/kademlia/tests/utils.py b/kademlia/tests/utils.py index 1c4da17..23ff415 100644 --- a/kademlia/tests/utils.py +++ b/kademlia/tests/utils.py @@ -9,86 +9,20 @@ from kademlia.node import Node from kademlia.routing import RoutingTable -def 
mknode(id=None, ip=None, port=None, intid=None): +def mknode(node_id=None, ip=None, port=None, intid=None): """ Make a node. Created a random id if not specified. """ if intid is not None: - id = pack('>l', intid) - id = id or hashlib.sha1(str(random.getrandbits(255))).digest() - return Node(id, ip, port) + node_id = pack('>l', intid) + if not node_id: + randbits = str(random.getrandbits(255)) + node_id = hashlib.sha1(randbits.encode()).digest() + return Node(node_id, ip, port) -class FakeProtocol(object): +class FakeProtocol: def __init__(self, sourceID, ksize=20): self.router = RoutingTable(self, ksize, Node(sourceID)) self.storage = {} self.sourceID = sourceID - - def getRefreshIDs(self): - """ - Get ids to search for to keep old buckets up to date. - """ - ids = [] - for bucket in self.router.getLonelyBuckets(): - ids.append(random.randint(*bucket.range)) - return ids - - def rpc_ping(self, sender, nodeid): - source = Node(nodeid, sender[0], sender[1]) - self.router.addContact(source) - return self.sourceID - - def rpc_store(self, sender, nodeid, key, value): - source = Node(nodeid, sender[0], sender[1]) - self.router.addContact(source) - self.log.debug("got a store request from %s, storing value" % str(sender)) - self.storage[key] = value - - def rpc_find_node(self, sender, nodeid, key): - self.log.info("finding neighbors of %i in local table" % long(nodeid.encode('hex'), 16)) - source = Node(nodeid, sender[0], sender[1]) - self.router.addContact(source) - node = Node(key) - return map(tuple, self.router.findNeighbors(node, exclude=source)) - - def rpc_find_value(self, sender, nodeid, key): - source = Node(nodeid, sender[0], sender[1]) - self.router.addContact(source) - value = self.storage.get(key, None) - if value is None: - return self.rpc_find_node(sender, nodeid, key) - return { 'value': value } - - def callFindNode(self, nodeToAsk, nodeToFind): - address = (nodeToAsk.ip, nodeToAsk.port) - d = self.find_node(address, self.sourceID, nodeToFind.id) - return 
d.addCallback(self.handleCallResponse, nodeToAsk) - - def callFindValue(self, nodeToAsk, nodeToFind): - address = (nodeToAsk.ip, nodeToAsk.port) - d = self.find_value(address, self.sourceID, nodeToFind.id) - return d.addCallback(self.handleCallResponse, nodeToAsk) - - def callPing(self, nodeToAsk): - address = (nodeToAsk.ip, nodeToAsk.port) - d = self.ping(address, self.sourceID) - return d.addCallback(self.handleCallResponse, nodeToAsk) - - def callStore(self, nodeToAsk, key, value): - address = (nodeToAsk.ip, nodeToAsk.port) - d = self.store(address, self.sourceID, key, value) - return d.addCallback(self.handleCallResponse, nodeToAsk) - - def handleCallResponse(self, result, node): - """ - If we get a response, add the node to the routing table. If - we get no response, make sure it's removed from the routing table. - """ - if result[0]: - self.log.info("got response from %s, adding to router" % node) - self.router.addContact(node) - else: - self.log.debug("no response from %s, removing from router" % node) - self.router.removeContact(node) - return result diff --git a/kademlia/utils.py b/kademlia/utils.py index 63209e8..9732bf5 100644 --- a/kademlia/utils.py +++ b/kademlia/utils.py @@ -3,44 +3,25 @@ General catchall for functions that don't make sense as methods. """ import hashlib import operator +import asyncio -from twisted.internet import defer + +async def gather_dict(d): + cors = list(d.values()) + results = await asyncio.gather(*cors) + return dict(zip(d.keys(), results)) def digest(s): - if not isinstance(s, str): - s = str(s) + if not isinstance(s, bytes): + s = str(s).encode('utf8') return hashlib.sha1(s).digest() -def deferredDict(d): - """ - Just like a :class:`defer.DeferredList` but instead accepts and returns a :class:`dict`. - - Args: - d: A :class:`dict` whose values are all :class:`defer.Deferred` objects. 
- - Returns: - :class:`defer.DeferredList` whose callback will be given a dictionary whose - keys are the same as the parameter :obj:`d` and whose values are the results - of each individual deferred call. - """ - if len(d) == 0: - return defer.succeed({}) - - def handle(results, names): - rvalue = {} - for index in range(len(results)): - rvalue[names[index]] = results[index][1] - return rvalue - - dl = defer.DeferredList(d.values()) - return dl.addCallback(handle, d.keys()) - - class OrderedSet(list): """ - Acts like a list in all ways, except in the behavior of the :meth:`push` method. + Acts like a list in all ways, except in the behavior of the + :meth:`push` method. """ def push(self, thing): @@ -69,3 +50,8 @@ def sharedPrefix(args): break i += 1 return args[0][:i] + + +def bytesToBitString(bites): + bits = [bin(bite)[2:].rjust(8, '0') for bite in bites] + return "".join(bits) diff --git a/setup.py b/setup.py index b7453c3..f62a11c 100755 --- a/setup.py +++ b/setup.py @@ -1,16 +1,15 @@ #!/usr/bin/env python from setuptools import setup, find_packages -from kademlia import version +import kademlia setup( name="kademlia", - version=version, + version=kademlia.__version__, description="Kademlia is a distributed hash table for decentralized peer-to-peer computer networks.", author="Brian Muller", author_email="bamuller@gmail.com", license="MIT", url="http://github.com/bmuller/kademlia", packages=find_packages(), - requires=["twisted", "rpcudp"], - install_requires=['twisted>=14.0', "rpcudp>=1.0"] + install_requires=["rpcudp>=3.0.0"] )