aboutsummaryrefslogtreecommitdiffstats
path: root/src/cryptography/hazmat/primitives/padding.py
diff options
context:
space:
mode:
authorDonald Stufft <donald@stufft.io>2014-11-07 19:17:08 -0500
committerDonald Stufft <donald@stufft.io>2014-11-13 07:56:31 -0500
commitc62a78c015cf7aeb0c05bce82ef14cd86fe0b0fc (patch)
tree55482d6f2e98ff65f2174294f64ad96b7be68717 /src/cryptography/hazmat/primitives/padding.py
parentd9f137db78d451ecb6ef7925b7dec0139ca59898 (diff)
downloadcryptography-c62a78c015cf7aeb0c05bce82ef14cd86fe0b0fc.tar.gz
cryptography-c62a78c015cf7aeb0c05bce82ef14cd86fe0b0fc.tar.bz2
cryptography-c62a78c015cf7aeb0c05bce82ef14cd86fe0b0fc.zip
Move the cryptography package into a src/ subdirectory
Due to differences in how py.test determines which module to ``import`` the test suite actually runs against the cryptography which is in the *current* directory instead of the cryptography which is installed. The problem essentially boils down to when there is a tests/__init__.py then py.test adds the current directory to the front of the sys.path, causing it to take precedence over the installed location. This means that running the tests relies on the implicit compile that CFFI does instead of testing against what people will actually be runnning, which is the module compiled by setup.py.
Diffstat (limited to 'src/cryptography/hazmat/primitives/padding.py')
-rw-r--r--src/cryptography/hazmat/primitives/padding.py164
1 files changed, 164 insertions, 0 deletions
diff --git a/src/cryptography/hazmat/primitives/padding.py b/src/cryptography/hazmat/primitives/padding.py
new file mode 100644
index 00000000..7aeeff7c
--- /dev/null
+++ b/src/cryptography/hazmat/primitives/padding.py
@@ -0,0 +1,164 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import six
+
+from cryptography import utils
+from cryptography.exceptions import AlreadyFinalized
+from cryptography.hazmat.bindings.utils import LazyLibrary, build_ffi
+from cryptography.hazmat.primitives import interfaces
+
+
+TYPES = """
+uint8_t Cryptography_check_pkcs7_padding(const uint8_t *, uint8_t);
+"""
+
+FUNCTIONS = """
+/* Returns the value of the input with the most-significant-bit copied to all
+ of the bits. */
+static uint8_t Cryptography_DUPLICATE_MSB_TO_ALL(uint8_t a) {
+ return (1 - (a >> (sizeof(uint8_t) * 8 - 1))) - 1;
+}
+
+/* This returns 0xFF if a < b else 0x00, but does so in a constant time
+ fashion */
+static uint8_t Cryptography_constant_time_lt(uint8_t a, uint8_t b) {
+ a -= b;
+ return Cryptography_DUPLICATE_MSB_TO_ALL(a);
+}
+
+uint8_t Cryptography_check_pkcs7_padding(const uint8_t *data,
+ uint8_t block_len) {
+ uint8_t i;
+ uint8_t pad_size = data[block_len - 1];
+ uint8_t mismatch = 0;
+ for (i = 0; i < block_len; i++) {
+ unsigned int mask = Cryptography_constant_time_lt(i, pad_size);
+ uint8_t b = data[block_len - 1 - i];
+ mismatch |= (mask & (pad_size ^ b));
+ }
+
+ /* Check to make sure the pad_size was within the valid range. */
+ mismatch |= ~Cryptography_constant_time_lt(0, pad_size);
+ mismatch |= Cryptography_constant_time_lt(block_len, pad_size);
+
+ /* Make sure any bits set are copied to the lowest bit */
+ mismatch |= mismatch >> 4;
+ mismatch |= mismatch >> 2;
+ mismatch |= mismatch >> 1;
+ /* Now check the low bit to see if it's set */
+ return (mismatch & 1) == 0;
+}
+"""
+
+
+_ffi = build_ffi(cdef_source=TYPES, verify_source=FUNCTIONS)
+_lib = LazyLibrary(_ffi)
+
+
+class PKCS7(object):
+ def __init__(self, block_size):
+ if not (0 <= block_size < 256):
+ raise ValueError("block_size must be in range(0, 256).")
+
+ if block_size % 8 != 0:
+ raise ValueError("block_size must be a multiple of 8.")
+
+ self.block_size = block_size
+
+ def padder(self):
+ return _PKCS7PaddingContext(self.block_size)
+
+ def unpadder(self):
+ return _PKCS7UnpaddingContext(self.block_size)
+
+
+@utils.register_interface(interfaces.PaddingContext)
+class _PKCS7PaddingContext(object):
+ def __init__(self, block_size):
+ self.block_size = block_size
+ # TODO: more copies than necessary, we should use zero-buffer (#193)
+ self._buffer = b""
+
+ def update(self, data):
+ if self._buffer is None:
+ raise AlreadyFinalized("Context was already finalized.")
+
+ if not isinstance(data, bytes):
+ raise TypeError("data must be bytes.")
+
+ self._buffer += data
+
+ finished_blocks = len(self._buffer) // (self.block_size // 8)
+
+ result = self._buffer[:finished_blocks * (self.block_size // 8)]
+ self._buffer = self._buffer[finished_blocks * (self.block_size // 8):]
+
+ return result
+
+ def finalize(self):
+ if self._buffer is None:
+ raise AlreadyFinalized("Context was already finalized.")
+
+ pad_size = self.block_size // 8 - len(self._buffer)
+ result = self._buffer + six.int2byte(pad_size) * pad_size
+ self._buffer = None
+ return result
+
+
+@utils.register_interface(interfaces.PaddingContext)
+class _PKCS7UnpaddingContext(object):
+ def __init__(self, block_size):
+ self.block_size = block_size
+ # TODO: more copies than necessary, we should use zero-buffer (#193)
+ self._buffer = b""
+
+ def update(self, data):
+ if self._buffer is None:
+ raise AlreadyFinalized("Context was already finalized.")
+
+ if not isinstance(data, bytes):
+ raise TypeError("data must be bytes.")
+
+ self._buffer += data
+
+ finished_blocks = max(
+ len(self._buffer) // (self.block_size // 8) - 1,
+ 0
+ )
+
+ result = self._buffer[:finished_blocks * (self.block_size // 8)]
+ self._buffer = self._buffer[finished_blocks * (self.block_size // 8):]
+
+ return result
+
+ def finalize(self):
+ if self._buffer is None:
+ raise AlreadyFinalized("Context was already finalized.")
+
+ if len(self._buffer) != self.block_size // 8:
+ raise ValueError("Invalid padding bytes.")
+
+ valid = _lib.Cryptography_check_pkcs7_padding(
+ self._buffer, self.block_size // 8
+ )
+
+ if not valid:
+ raise ValueError("Invalid padding bytes.")
+
+ pad_size = six.indexbytes(self._buffer, -1)
+ res = self._buffer[:-pad_size]
+ self._buffer = None
+ return res