#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""CI helper: run a packaging script or report a build result.

The ``--signal`` option selects what the script does:

* ``package``        -- run one of the ``pack-*-ci.sh`` scripts.
* ``notify_success`` -- POST the stored build result to the callback URL.
* ``notify_failure`` -- POST a failure record to the callback URL.

Exit codes: 1 for an invalid ``json_argv``, 2 when the result callback
request fails (also argparse's own usage-error code), 3 when the OpenAPI
login request fails; a non-zero return code of the packaging script is
propagated unchanged.
"""

import argparse
import json
import os
import subprocess
import sys
import urllib.error
import urllib.request

OPENAPI_LOGIN_URL = "https://openapi.linkfog.cn/sys/openapi/login"
OPENAPI_PACKAGE_RESULT_CALLBACK_URL = "https://open.dianxinai.com/api/v2/channelPkg/buildQueryResult"

# Seconds to wait on any single HTTP request before giving up.
_HTTP_TIMEOUT = 10

# Values accepted by --signal.
_SIGNALS = ('package', 'notify_success', 'notify_failure')

# Maps --event to the packaging script; any other event uses the default.
_PACKAGE_SCRIPTS = {
    'deploy': './pack-deploy-ci.sh',
    'system-part': './pack-system-part-ci.sh',
}
_DEFAULT_PACKAGE_SCRIPT = './pack-ci.sh'

# Settings forwarded to the packaging script, in command-line order.
# Each key ``some_key`` is passed as the flag ``--some-key``.
_PACKAGE_KEYS = (
    'build_directory',
    'vendor',
    'orgcode',
    'channel_id',
    'package_type',
    'project_id',
    'pipeline_id',
    'url_tag',
    'qrcode_app',
)

_TRUE_WORDS = frozenset(('1', 'true', 't', 'yes', 'y', 'on'))
_FALSE_WORDS = frozenset(('', '0', 'false', 'f', 'no', 'n', 'off'))


def _str_to_bool(value):
    """Convert a command-line word to a bool.

    ``type=bool`` must not be used with argparse: ``bool('false')`` is True
    because every non-empty string is truthy.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognised word.
    """
    word = str(value).strip().lower()
    if word in _TRUE_WORDS:
        return True
    if word in _FALSE_WORDS:
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean (true/false), got {!r}'.format(value))


def make_argument_parser():
    """Build the command-line parser for this script.

    Returns:
        argparse.ArgumentParser: parser whose namespace carries
        ``json_argv``, the packaging settings, ``signal`` and ``event``.
    """
    parser = argparse.ArgumentParser(
        prog='deploy',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        'json_argv', nargs='?', default=None,
        help='json format argv'
    )
    parser.add_argument(
        '--build-directory', default='/build',
        help='build directory'
    )
    parser.add_argument(
        '--vendor', default='AB',
        help='vendor'
    )
    parser.add_argument(
        '--orgcode', default='A03',
        help='organization code'
    )
    parser.add_argument(
        '--channel-id', default='CBV9UHAA0320220905143123262',
        help='channel id'
    )
    parser.add_argument(
        '--package-type', default='owner',
        help='package type: owner or other'
    )
    parser.add_argument(
        # Restricting the choices turns a mistyped signal into a usage
        # error instead of a silent no-op that exits 0.
        '--signal', default='package', choices=_SIGNALS,
        help='signal: package, notify_failure, notify_success'
    )
    parser.add_argument(
        '--pipeline-id', default='',
        help='pipeline id'
    )
    parser.add_argument(
        '--project-id', default='',
        help='project id'
    )
    parser.add_argument(
        '--url-tag', default='',
        help='url tag'
    )
    parser.add_argument(
        '--qrcode-app', default=False, type=_str_to_bool,
        help='install qrcode app'
    )
    parser.add_argument(
        '--event', default='',
        help='package event'
    )
    return parser


def _load_json_argv(raw):
    """Parse the optional positional JSON argument into a dict.

    Exits with status 1 (after printing the reason to stderr) when *raw*
    is not valid JSON or does not decode to a JSON object.
    """
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        print('invalid json_argv: {}'.format(e), file=sys.stderr)
        sys.exit(1)
    if not isinstance(parsed, dict):
        print('invalid json_argv: expected a JSON object', file=sys.stderr)
        sys.exit(1)
    return parsed


def _build_package_command(script, overrides, options):
    """Return the argument list used to invoke the packaging script.

    A key present in *overrides* with a non-null value wins over the
    matching attribute of *options*. Every value is converted to ``str``
    because ``subprocess.run`` rejects non-string arguments.
    """
    command = [script]
    for key in _PACKAGE_KEYS:
        value = overrides.get(key)
        if value is None:
            value = getattr(options, key)
        text = str(value)
        if key == 'qrcode_app':
            # The script receives lowercase 'true' / 'false'.
            text = text.lower()
        command.extend(['--' + key.replace('_', '-'), text])
    return command


def package(options):
    """Run the packaging script selected by ``options.event``.

    Settings come from the JSON object in ``options.json_argv`` when
    present, falling back to the individual command-line options.

    Args:
        options: namespace produced by ``make_argument_parser()``.

    Returns:
        int: the packaging script's return code.
    """
    overrides = _load_json_argv(options.json_argv)
    script = _PACKAGE_SCRIPTS.get(options.event, _DEFAULT_PACKAGE_SCRIPT)
    completed = subprocess.run(
        _build_package_command(script, overrides, options))
    return completed.returncode


def _post_json(url, body, failure_exit_code):
    """POST *body* (already-encoded JSON bytes) to *url*.

    Returns the raw response bytes. On any network error the reason is
    printed and the process exits with *failure_exit_code*.
    """
    request = urllib.request.Request(url, data=body, method='POST')
    request.add_header('Content-Type', 'application/json')
    try:
        with urllib.request.urlopen(request, timeout=_HTTP_TIMEOUT) as response:
            return response.read()
    except OSError as e:
        # URLError/HTTPError are OSError subclasses and carry ``reason``;
        # a timeout while reading the body is a plain OSError without it.
        print(getattr(e, 'reason', e))
        # sys.exit, unlike os._exit, flushes stdio so the line above is
        # not lost when stdout is a pipe.
        sys.exit(failure_exit_code)


def __login_openapi():
    """Log in to the OpenAPI service and return the decoded JSON reply.

    Exits with status 3 when the request fails.
    """
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to CI secrets / environment variables.
    data = {
        "username": "open_admin",
        "password": "open_admin123"
    }
    body = json.dumps(data).encode('utf-8')
    return json.loads(_post_json(OPENAPI_LOGIN_URL, body, 3))


def _send_build_result(data):
    """Print and POST a build-result record, then print the response.

    Exits with status 2 when the callback request fails.
    """
    body = json.dumps(data).encode('utf-8')
    print(body)
    print(_post_json(OPENAPI_PACKAGE_RESULT_CALLBACK_URL, body, 2))


def notify_success(pipeline_id, project_id):
    """Report a successful build to the result callback URL.

    The record is merged with the JSON object stored in
    ``.result/<project_id>-<pipeline_id>.json``; keys from that file
    override the base record.

    Raises:
        OSError: if the result file cannot be read.
        ValueError: if the result file does not contain valid JSON.
    """
    data = {
        "pipelineId": pipeline_id,
        "projectId": project_id,
        "code": 0,
        "msg": "success",
    }
    result_path = os.path.join(
        '.result', '{}-{}.json'.format(project_id, pipeline_id))
    # JSON files are UTF-8; do not depend on the locale's default encoding.
    with open(result_path, 'r', encoding='utf-8') as f:
        data.update(json.load(f))
    _send_build_result(data)


def notify_failure(pipeline_id, project_id):
    """Report a failed build to the result callback URL."""
    _send_build_result({
        "pipelineId": pipeline_id,
        "projectId": project_id,
        "code": 1,
        "msg": "failure"
    })


def main():
    """Parse the command line and dispatch on ``--signal``."""
    parser = make_argument_parser()
    options = parser.parse_args()
    if options.signal == 'package':
        returncode = package(options)
        if returncode:
            # Surface a failed packaging script to the CI pipeline.
            sys.exit(returncode)
    elif options.signal == 'notify_success':
        notify_success(options.pipeline_id, options.project_id)
    elif options.signal == 'notify_failure':
        notify_failure(options.pipeline_id, options.project_id)


if __name__ == '__main__':
    main()