blob: a91d0fe92f779836c535ddfb6c9be2bee339dbad [file] [log] [blame]
# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014 Regents of the University of California.
# Author: Teng Liang <philoliang2011@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU General Public License is in the file COPYING.
from pyndn.encoding import WireFormat
from pyndn.util import Blob
from pyndn.digest_sha256_signature import DigestSha256Signature
from pyndn import Data, KeyLocatorType, Interest, Name
from hashlib import sha256
from random import SystemRandom
from time import time as timestamp
import hmac
class HmacHelper(object):
def __init__(self, raw_key, wireFormat=None):
super(HmacHelper, self).__init__()
self.key = sha256(raw_key).digest()
self.random = SystemRandom()
if wireFormat is None:
self.wireFormat = WireFormat.getDefaultWireFormat()
else:
self.wireFormat = wireFormat
@classmethod
def generatePin(cls):
"""
Generate a pin to be entered into another device.
Restricting this to 8 bytes (16 hex chars) for now.
"""
pin = bytearray(8)
random = SystemRandom()
for i in range(8):
pin[i] = random.randint(0,0xff)
return str(pin).encode('hex')
@classmethod
def extractInterestSignature(cls, interest, wireFormat=None):
if wireFormat is None:
wireFormat = WireFormat.getDefaultWireFormat()
try:
signature = wireFormat.decodeSignatureInfoAndValue(
interest.getName().get(-2).getValue().buf(),
interest.getName().get(-1).getValue().buf())
except:
signature = None
return signature
def signData(self, data, keyName=None, wireFormat=None):
data.setSignature(DigestSha256Signature())
s = data.getSignature()
#s.getKeyLocator().setType(KeyLocatorType.KEYNAME)
#s.getKeyLocator().setKeyName(keyName)
if wireFormat is None:
wireFormat = WireFormat.getDefaultWireFormat()
encoded = data.wireEncode(wireFormat)
signer = hmac.new(self.key, bytearray(encoded.toSignedBuffer()), sha256)
s.setSignature(Blob(signer.digest()))
data.wireEncode(wireFormat)
def verifyData(self, data, wireFormat=None):
# clear out old signature so encoding does not include it
if wireFormat is None:
wireFormat = WireFormat.getDefaultWireFormat()
encoded = data.wireEncode(wireFormat)
hasher = hmac.new(self.key, bytearray(encoded.toSignedBuffer()), sha256)
sigBytes = data.getSignature().getSignature()
return sigBytes.toRawStr() == hasher.digest()
def signInterest(self, interest, keyName=None, wireFormat=None):
# Adds the nonce and timestamp here, because there is no
# 'makeCommandInterest' call for this yet
nonceValue = bytearray(8)
for i in range(8):
nonceValue[i] = self.random.randint(0,0xff)
timestampValue = bytearray(8)
ts = int(timestamp()*1000)
for i in range(8):
byte = ts & 0xff
timestampValue[-(i+1)] = byte
ts = ts >> 8
if wireFormat is None:
wireFormat = WireFormat.getDefaultWireFormat()
s = DigestSha256Signature()
#s.getKeyLocator().setType(KeyLocatorType.KEYNAME)
#s.getKeyLocator().setKeyName(keyName)
interestName = interest.getName()
interestName.append(nonceValue).append(timestampValue)
interestName.append(wireFormat.encodeSignatureInfo(s))
interestName.append(Name.Component())
encoding = interest.wireEncode(wireFormat)
signer = hmac.new(self.key, encoding.toSignedBuffer(), sha256)
s.setSignature(Blob(signer.digest()))
interest.setName(interestName.getPrefix(-1).append(
wireFormat.encodeSignatureValue(s)))
def verifyInterest(self, interest, wireFormat=None):
if wireFormat is None:
wireFormat = WireFormat.getDefaultWireFormat()
signature = self.extractInterestSignature(interest, wireFormat)
encoding = interest.wireEncode(wireFormat)
hasher = hmac.new(self.key, encoding.toSignedBuffer(), sha256)
return signature.getSignature().toRawStr() == hasher.digest()