为构建并部署在Docker容器中的FastAPI应用设计一个离线的软件许可证颁发方案。
前言 FastAPI是一个现代、快速(高性能)的Web框架,用于基于标准Python类型提示构建API。 在商业化场景中,需要为FastAPI添加软件许可验证机制,而现代化的FastAPI应用通常部署在Docker容器中(与宿主机隔离)。 为此,本文设计了一种基于Docker的应用软件离线许可证颁发方案,使用非对称加密技术,无需通过联网环境即可完成软件/产品的许可验证。
许可证生成架构图
基本流程 私钥与公钥生成 使用非对称加密算法,生成一组私钥和公钥。其中私钥用于生成License.lic,用户的FastAPI应用里只用公钥验证,整个过程完全离线。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from cryptography.hazmat.primitives import serializationfrom cryptography.hazmat.primitives.asymmetric import rsaprivate_key = rsa.generate_private_key(public_exponent=65537 , key_size=2048 ) public_key = private_key.public_key() with open ("private_key.pem" , "wb" ) as f: f.write(private_key.private_bytes( encoding=serialization.Encoding.PEM, format =serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() )) with open ("public_key.pem" , "wb" ) as f: f.write(public_key.public_bytes( encoding=serialization.Encoding.PEM, format =serialization.PublicFormat.SubjectPublicKeyInfo ))
运行后你将得到private_key.pem与public_key.pem两个文件。
许可证信息准备 在配置文件license_info.json中配置机器码machine_id和许可证过期时间expire_days。
1 2 3 4 { "machine_id" : "e29633fade0fc20cbf6a4ff8144f8b09ea97db1e83830e4a9551cef7ae482b94" , "expire_days" : 365 }
其中machine_id可用跨平台机器ID库py-machineid获取。
pip install py-machineid
该库在被调用时,会直接尝试读取宿主机的硬件信息(如Windows注册表、Linux的/etc/machine-id或macOS的硬件序列号)。这部分操作完全由Python代码执行,绕过了Kubernetes的环境注入机制。 由于容器内的Python进程看到的是独立的文件系统和注册表,py-machineid会读取容器内部的machine-id。这个ID通常是Docker为容器生成的虚拟ID,会随容器的重建而发生改变。
为了使py-machineid读取的始终为宿主机的ID,常规方法是通过挂载将宿主机的machine-id文件覆盖容器内的同名文件。例如宿主机为Linux系统,在运行容器时添加-v参数:
docker run -d -v /etc/machine-id:/etc/machine-id:ro your-image:latest
由于不同操作系统文件路径差异较大,最稳妥的方式还是通过环境变量传递machine-id,可在运行容器时添加-e参数:
docker run -d -e HOST_MACHINE_ID="your-actual-host-machine-id-here" your-image:latest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import osimport machineidimport hashlibdef get_machine_fingerprint () -> str : env_id = os.environ.get("HOST_MACHINE_ID" ) if env_id: print ("Using machine ID from environment variable" ) return hashlib.sha256(env_id.encode()).hexdigest() machine_id = machineid.id () hashed_id = machineid.hashed_id('your_app_name' ) print ("Using direct machine ID from system" ) return hashlib.sha256(hashed_id.encode()).hexdigest()
技术人员可使用license_info_generate.py脚本在docker应用程序的宿主机上自动生成license_info.json文件,并用私钥private_key.pem生成对应的软件许可证license.lic颁发给用户。
用户将license.lic文件放在相应目录后,每次FastAPI应用启动时都将使用public_key.pem对其解析并验证,验证过程中同样使用get_machine_fingerprint方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import hashlibimport jsonimport machineiddef get_machine_fingerprint () -> str : machine_id = machineid.id () hashed_id = machineid.hashed_id('your_app_name' ) print ("Using direct machine ID from system" ) return hashlib.sha256(hashed_id.encode()).hexdigest() machine_id = get_machine_fingerprint() data = { "machine_id" : machine_id, "expire_days" : 365 } with open ('license_info.json' , 'w' , encoding='utf-8' ) as f: json.dump(data, f, indent=4 ) print ("license_info.json file generated" )
许可证颁发 使用前面生成的私钥private_key.pem生成license.lic文件,用于后续的FastAPI应用许可验证。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import jsonimport base64from cryptography.hazmat.primitives import hashes, serializationfrom cryptography.hazmat.primitives.asymmetric import paddingfrom datetime import datetime, timedelta, timezonedef generate_license (customer_machine_id: str , expire_days: int = 365 ): with open ("private_key.pem" , "rb" ) as f: private_key = serialization.load_pem_private_key(f.read(), password=None ) payload = { "machine_id" : customer_machine_id, "expire" : (datetime.now(timezone.utc) + timedelta(days=expire_days)).isoformat(), } payload_bytes = json.dumps(payload).encode('utf-8' ) signature = private_key.sign( payload_bytes, padding.PKCS1v15(), hashes.SHA256() ) license_data = base64.b64encode(payload_bytes).decode('utf-8' ) license_signature = base64.b64encode(signature).decode('utf-8' ) final_license = f"{license_data} .{license_signature} " print ("This license string is provided to the customer:" ) print (final_license) return final_license def write_license_simple (license_content ): """最简单的写入方式""" with open ("license.lic" , "w" ) as f: f.write(license_content) print ("License file created: license.lic" ) with open ('license_info.json' , 'r' ) as f: config = json.load(f) machine_id = config['machine_id' ] expire_days = config['expire_days' ] lic = generate_license(machine_id, expire_days) write_license_simple(lic)
应用许可验证 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 import platformimport uuidfrom fastapi import FastAPI, Depends, HTTPException, Requestfrom cryptography.hazmat.primitives import hashes, serializationfrom cryptography.hazmat.primitives.asymmetric import paddingimport jsonimport base64from datetime import datetime, timezoneapp = FastAPI() with open ("public_key.pem" , "rb" ) as f: PUBLIC_KEY = serialization.load_pem_public_key(f.read()) import subprocessimport hashlibimport hashlibimport uuiddef get_machine_fingerprint () -> str : def verify_license (license_str: str ): """验证License字符串是否有效、是否过期、是否匹配本机""" try : data_b64, signature_b64 = license_str.split('.' ) payload_bytes = base64.b64decode(data_b64) signature = base64.b64decode(signature_b64) PUBLIC_KEY.verify( signature, payload_bytes, padding.PKCS1v15(), hashes.SHA256() ) payload = json.loads(payload_bytes) expire_time = datetime.fromisoformat(payload["expire" ]) if expire_time < datetime.now(timezone.utc): raise HTTPException(status_code=403 , detail="License is expired" ) current_machine_id = get_machine_fingerprint() print ("current machine id:" + current_machine_id) if payload["machine_id" ] != current_machine_id: raise HTTPException(status_code=403 , detail="The license does not match the current device" ) return payload except Exception as e: raise HTTPException(status_code=403 , detail=f"License validate failed: {str (e)} " ) with open ("license.lic" , "r" ) as f: lic = f.read() print (f"read content: '{lic} '" ) res = verify_license(lic) print (res)
参考文档