1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
import os import string import random import socketserver import signal from hashlib import sha256
from ocb.aes import AES from ocb import OCB
FLAG =
# Upper bound on the username length, and the number of random bytes drawn
# for the per-connection key.
BLOCKSIZE = 16

# Menu shown before every command prompt; it is sent with newline=False, so
# the literal carries its own line breaks.
# NOTE(review): the line breaks were reconstructed from a whitespace-collapsed
# copy of this file -- confirm against the deployed service.
MENU = br"""
[1] Encrypt
[2] Decrypt
[3] Get Flag
[4] Exit
"""
class Task(socketserver.BaseRequestHandler):
    """Per-connection handler for the OCB encrypt/decrypt oracle.

    Every connection gets a fresh random key.  After the proof of work and a
    username prompt, the client can:

    * encrypt a message under associated data ``b'from ' + username``;
    * decrypt any (nonce, ciphertext, tag, associated data) tuple;
    * receive the flag by presenting a ciphertext that authenticates under
      ``b'from Alice'`` and decrypts to ``b'please_give_me_the_flag'``.
    """

    # NOTE(review): the six helpers below have empty bodies in the reviewed
    # copy of this file; their bodies are kept exactly as shown.

    def _recvall(self):
        pass

    def send(self, msg, newline=True):
        # Callers in this class pass bytes, and use newline=False for the
        # pre-formatted menu.
        pass

    def recv(self, prompt=b'> '):
        # presumably returns the client's reply as bytes (callers compare it
        # with b'Alice' and feed it to int()) -- confirm.
        pass

    def recvhex(self, prompt=b'> '):
        # presumably returns the hex-decoded reply as bytes (callers hand it
        # to bytearray()) -- confirm.
        pass

    def proof_of_work(self):
        # handle() aborts the session when this returns a falsy value.
        pass

    def timeout_handler(self, signum, frame):
        # Installed by handle() as the SIGALRM handler.
        pass

    def encrypt(self, nonce, message, associate_data=b''):
        """Encrypt ``message`` under ``nonce``, refusing any repeated nonce.

        Args:
            nonce: nonce for this encryption; must not have been passed to
                ``encrypt`` before on this connection.
            message: plaintext bytes.
            associate_data: data that is authenticated but not encrypted.

        Returns:
            ``(ciphertext, tag)``, both as ``bytes``.

        Raises:
            ValueError: if ``nonce`` was already used for encryption.
        """
        # An explicit raise instead of ``assert``: assertions are removed
        # under ``python -O``, which would silently disable this check.
        if nonce in self.NONCEs:
            raise ValueError('nonce already used')
        self.NONCEs.add(nonce)

        self.ocb.setNonce(nonce)
        tag, cipher = self.ocb.encrypt(
            bytearray(message), bytearray(associate_data)
        )
        return (bytes(cipher), bytes(tag))

    def decrypt(self, nonce, cipher, tag, associate_data=b''):
        """Decrypt and authenticate ``cipher``.

        Args:
            nonce: nonce the ciphertext was produced with.
            cipher: ciphertext bytes.
            tag: authentication tag bytes.
            associate_data: associated data to authenticate alongside.

        Returns:
            The plaintext as ``bytes``.

        Raises:
            ValueError: ``'REJECT'`` when authentication fails.
        """
        self.ocb.setNonce(nonce)
        authenticated, message = self.ocb.decrypt(
            bytearray(associate_data), bytearray(cipher), bytearray(tag)
        )
        if not authenticated:
            raise ValueError('REJECT')
        return bytes(message)

    def _read_username(self):
        """Prompt until the client gives an acceptable username; return it."""
        while True:
            username = self.recv(prompt=b'Enter username > ')
            if len(username) > BLOCKSIZE:
                self.send(b"I can't remember long names")
                continue
            if username == b'Alice':
                self.send(b'Name already used')
                continue
            return username

    def _serve_encrypt(self, username):
        """Menu option 1: encrypt a client message as ``b'from ' + username``."""
        nonce = self.recvhex(prompt=b'Enter nonce > ')
        message = self.recvhex(prompt=b'Enter message > ')
        associate_data = b'from ' + username
        ciphertext, tag = self.encrypt(nonce, message, associate_data)
        self.send(str.encode(f"ciphertext: {ciphertext.hex()}"))
        self.send(str.encode(f"tag: {tag.hex()}"))

    def _serve_decrypt(self):
        """Menu option 2: decrypt with client-chosen associated data."""
        nonce = self.recvhex(prompt=b'Enter nonce > ')
        ciphertext = self.recvhex(prompt=b'Enter ciphertext > ')
        tag = self.recvhex(prompt=b'Enter tag > ')
        associate_data = self.recvhex(prompt=b'Enter associate data > ')
        message = self.decrypt(nonce, ciphertext, tag, associate_data)
        self.send(str.encode(f"message: {message.hex()}"))

    def _serve_flag(self):
        """Menu option 3: send the flag for a valid request from Alice."""
        nonce = self.recvhex(prompt=b'Enter nonce > ')
        ciphertext = self.recvhex(prompt=b'Enter ciphertext > ')
        tag = self.recvhex(prompt=b'Enter tag > ')
        message = self.decrypt(nonce, ciphertext, tag, b'from Alice')
        # A valid ciphertext with any other plaintext is ignored silently.
        if message == b'please_give_me_the_flag':
            self.send(FLAG)

    def handle(self):
        """Run one client session: proof of work, username, then the menu."""
        signal.signal(signal.SIGALRM, self.timeout_handler)
        signal.alarm(60)
        if not self.proof_of_work():
            return

        # Fresh cipher state and key for every connection.
        aes = AES(128)
        self.ocb = OCB(aes)
        self.ocb.setKey(os.urandom(BLOCKSIZE))
        self.NONCEs = set()

        username = self._read_username()

        # Restart the 60-second budget for the interactive phase.
        signal.alarm(60)
        while True:
            self.send(MENU, newline=False)
            try:
                choice = int(self.recv(prompt=b'Enter option > '))
                if choice == 1:
                    self._serve_encrypt(username)
                elif choice == 2:
                    self._serve_decrypt()
                elif choice == 3:
                    self._serve_flag()
                else:
                    # Option 4 (Exit) and any unknown number end the session.
                    break
            except Exception:
                # Malformed input, a reused nonce or a rejected tag all end
                # the session.  SystemExit / KeyboardInterrupt propagate.
                self.send(b'Error!')
                break
        signal.alarm(0)

        self.send(b'Bye!')
        self.request.close()
class ForkedServer(socketserver.ForkingMixIn, socketserver.TCPServer):
    """TCP server that forks one child process per client connection."""

    # Must be a class attribute: TCPServer binds the listening socket inside
    # its constructor, so setting the flag on an instance afterwards is too
    # late to affect the bind.
    allow_reuse_address = True
if __name__ == "__main__":
    HOST, PORT = '0.0.0.0', 10000
    print(HOST, PORT)
    # Defer binding: allow_reuse_address is only consulted while the socket is
    # being bound, so it has to be set before server_bind() runs.
    server = ForkedServer((HOST, PORT), Task, bind_and_activate=False)
    server.allow_reuse_address = True
    try:
        server.server_bind()
        server.server_activate()
    except BaseException:
        # Same cleanup TCPServer's constructor performs when binding fails.
        server.server_close()
        raise
    server.serve_forever()
|