diff --git a/actions/run.py b/actions/run.py index 51aa452d5da07dc10400605f546b5024bff50924..a960f3be201a0b2626b0823be6dd101b126daa64 100644 --- a/actions/run.py +++ b/actions/run.py @@ -18,6 +18,7 @@ from common.communication.messages_pb2 import PerceptionRequest, PerceptionRespo WINDOW = 64 +# Actions: https://rose1.ntu.edu.sg/dataset/actionRecognition/ - resulting actions start from 1 as well def to_pose_25(pose_18): # 'root', 'RHip', 'RKnee', 'RAnkle', 'LHip', 'LKnee', 'LAnkle', 'torso', 'neck', 'nose', 'head', 'LShoulder', @@ -194,9 +195,10 @@ def process_distance_requests(requests: Iterable[PerceptionRequest], results: It people_last_frame_ind = {p_id: -1 for p_id in people_ids} for frame_ind, p_id_and_poses in enumerate(actions_info[0]): for p_id, pose in list(people_poses.items()): - if p_id not in p_id_and_poses and frame_ind - people_last_frame_ind[p_id] > max_occlusion_dur_frames: - del people_poses[p_id] - del people_last_frame_ind[p_id] + if p_id not in p_id_and_poses: + if frame_ind - people_last_frame_ind[p_id] > max_occlusion_dur_frames: + del people_poses[p_id] + del people_last_frame_ind[p_id] continue pose[frame_ind, :, :] = p_id_and_poses[p_id] people_last_frame_ind[p_id] = frame_ind @@ -291,7 +293,7 @@ def run(parser: argparse.ArgumentParser): 'missing and still be taken into ' 'account for actions classification') parser.add_argument('--detection_threshold', type=float, default=5, help='all actions with a greater score will ' - 'be returned') + 'be returned') args = parser.parse_args() @@ -340,9 +342,3 @@ def run(parser: argparse.ArgumentParser): if __name__ == '__main__': run(argparse.ArgumentParser()) - -# todo: План: -# 1) Посмотреть, из чего составлен датасет - длительность в кадках и секундах, как описаны позы с одним/двумя людьми, -# могут ли быть несколько действий, есть ли специальное null действие -# 4) Тест, где мы возьмем из тестового набора приеры и предскажим -# 5) Медленно работает, может быть, интерполяция? Попробовать удалить деетктирование и посмотреть, как без него работает \ No newline at end of file diff --git a/common/communication/kafka_common.py b/common/communication/kafka_common.py index c35087a5f79f203f9feec84930397ea8fdf53729..d163197a92b05ce26f29cff7c88c28a1e6d4b141 100644 --- a/common/communication/kafka_common.py +++ b/common/communication/kafka_common.py @@ -7,6 +7,7 @@ from typing import Callable, Iterable, Deque, Dict # noinspection PyProtectedMember from kafka import KafkaConsumer, KafkaProducer, TopicPartition, OffsetAndMetadata, ConsumerRebalanceListener +from kafka.errors import NoBrokersAvailable from common.communication.messages_pb2 import * @@ -80,7 +81,16 @@ class KafkaRequestProcessor: producer_config['sasl_plain_username'] = username producer_config['sasl_plain_password'] = password - self.consumer = KafkaConsumer(**consumer_config) + # На случай старта kafka вместе с остальными контейнерами ожидаем ее инициализацию + initialization_limit_time = time() + 60 + while True: + try: + self.consumer = KafkaConsumer(**consumer_config) + break + except NoBrokersAvailable as e: + if time() > initialization_limit_time: + raise e + sleep(1) self.producer = KafkaProducer(**producer_config) @staticmethod diff --git a/detection_cbnet/run.py b/detection_cbnet/run.py index 82815a0d6a1a78a798a0f115f6dcc0ad2463d215..1ab0b39297c3dc07a47100c6f098f4a169eac223 100644 --- a/detection_cbnet/run.py +++ b/detection_cbnet/run.py @@ -1,5 +1,6 @@ import argparse import logging +import math import os from typing import Iterable @@ -110,9 +111,9 @@ def predict_image(images, model, max_size): ori_h, ori_w = img.shape[:2] cur_size = ori_h * ori_w if cur_size > max_size: - size_scale = max_size / (ori_h * ori_w) + size_scale = math.sqrt(max_size / (ori_h * ori_w)) img = mmcv.imrescale(img, size_scale, return_scale=False, backend='cv2') - logging.info('The image is too large, scaling down ' + str(size_scale)) + logging.info('The image is too large, scaling down ' + str(size_scale) + '. New shape: ' + str(img.shape)) new_h, new_w = img.shape[:2] w_scale = new_w / ori_w h_scale = new_h / ori_h diff --git a/test/actions_test/actions_general_test.py b/test/actions_test/actions_general_test.py index d2c62c4dc5b1e636348e21e79f96554874c86938..171532d7cc61a806c93362d08ed58c47352eb172 100644 --- a/test/actions_test/actions_general_test.py +++ b/test/actions_test/actions_general_test.py @@ -3,7 +3,6 @@ import os import shutil import unittest -# noinspection PyProtectedMember import numpy as np from kafka import KafkaProducer, KafkaConsumer, TopicPartition from kafka.producer.future import FutureRecordMetadata @@ -17,6 +16,9 @@ from test.pose3d_test import pose3d_utils from test.pose3d_test.pose3d_general_test import Pose3DServiceTest +# noinspection PyProtectedMember + + # noinspection DuplicatedCode @@ -153,6 +155,7 @@ class ActionsServiceTest(unittest.TestCase): actions[p_key].append(action_info.action) f.write('{} {} {}\n'.format(frame_ind+1, frame_result.actions.latency_frames, actions)) + print("Building skeletons animation") scene_file_path = os.path.join(result_dir_path, poses_file_name + '.html') pose3d_utils.build_3d_skeletons_animation(poses_sequence, VHM_PARENT_IDS, scene_file_path) diff --git a/test/common_test/kafka_common_test.py b/test/common_test/kafka_common_test.py index 5e376241bfa83b375f7001fd9ff4cbb6844d4d40..117f6de55bde167a9dc1be893cd9b31d446fef0b 100644 --- a/test/common_test/kafka_common_test.py +++ b/test/common_test/kafka_common_test.py @@ -1,3 +1,4 @@ +import collections import logging import socket import threading @@ -199,19 +200,39 @@ class KafkaRequestProcessorTest(unittest.TestCase): return timeout_ms @staticmethod - def wait_for_results(consumer, expected_count, timeout_ms): - received_results = {} + def wait_for_results(consumer, expected_count, timeout_ms, results_storage=None): + no_leftovers = results_storage is None + results_storage = {} if results_storage is None else results_storage + results = {} res_count = 0 - while timeout_ms > 0 and res_count < expected_count: - start_time = time.time() - for tp, res_list in consumer.poll(timeout_ms).items(): - res_count += len(res_list) - if tp not in received_results: - received_results[tp] = [] - for r in res_list: - received_results[tp].append(r.value) - timeout_ms -= round((time.time() - start_time) * 1000) - return received_results + start_time = time.time() + while res_count < expected_count: + if len(results_storage) == 0: + left_time_ms = timeout_ms - round((time.time() - start_time) * 1000) + if left_time_ms <= 0: + break + + poll_data = consumer.poll(left_time_ms) + if len(poll_data) == 0: + continue + + for tp, res_list in poll_data.items(): + res_queue = collections.deque() + res_queue.extend(res_list) + results_storage[tp] = res_queue + + tp, res_queue = next(iter(results_storage.items())) + if tp not in results: + results[tp] = [] + results[tp].append(res_queue.popleft().value) + res_count += 1 + if len(res_queue) == 0: + del results_storage[tp] + + if no_leftovers and len(results_storage) > 0: + raise RuntimeError('There are left records ' + str(results_storage)) + + return results def send(self, topic_name, partition, manager_id, video_id, frame_id=None, finished=None): r = PerceptionRequest(manager_id=manager_id, video_id=video_id) diff --git a/test/complete_perception_test/complete-perception-test-env-start.sh b/test/complete_perception_test/complete-perception-test-env-start.sh index 5eb7b5c1753b567e5f6c8893fa0462c4ee531e19..6c7620000226068e44e5ab8899a676530d155184 100755 --- a/test/complete_perception_test/complete-perception-test-env-start.sh +++ b/test/complete_perception_test/complete-perception-test-env-start.sh @@ -1,5 +1,5 @@ #!/bin/bash current_dir=$(pwd) cd "$(dirname "${BASH_SOURCE[0]}")" || exit 1 -docker-compose up -d --force-recreate --build +docker-compose up -d --force-recreate --build "$@" cd "$current_dir" || exit 1 \ No newline at end of file diff --git a/test/complete_perception_test/complete-perception-test-env-stop.sh b/test/complete_perception_test/complete-perception-test-env-stop.sh index 8ab874984076ec390d48eca6aff1de54aeef804b..2aa765b6993926d29327b856f7b71d30a0c626f1 100755 --- a/test/complete_perception_test/complete-perception-test-env-stop.sh +++ b/test/complete_perception_test/complete-perception-test-env-stop.sh @@ -1,6 +1,5 @@ #!/bin/bash current_dir=$(pwd) cd "$(dirname "${BASH_SOURCE[0]}")" || exit 1 -docker-compose down -docker-compose rm +echo "y" | docker-compose rm -s -v "$@" cd "$current_dir" || exit 1 \ No newline at end of file diff --git a/test/complete_perception_test/complete_perception_general_test.py b/test/complete_perception_test/complete_perception_general_test.py index b384b04d20e448345e38dc564717cc3dfbf68d96..caa8c040475d342a2275def7f2951bb80faffa90 100644 --- a/test/complete_perception_test/complete_perception_general_test.py +++ b/test/complete_perception_test/complete_perception_general_test.py @@ -53,55 +53,31 @@ class CompletePerceptionTest(unittest.TestCase): """Для выполнения данного теста необходимо запустить тестовую Kafka на порту 9097, тестовые detection, pose3d и distance сервисы, работающие с ней. Это можно сделать запустив скрипт distance-test-env-start.sh""" - bootstrap_servers = 'localhost:9097' + self.bootstrap_servers = 'localhost:9097' self.detection_topic_name = 'detection-requests' - detection_result_topic_prefix = 'detection-results-' + self.detection_result_topic_prefix = 'detection-results-' self.tracking_topic_name = 'tracking-requests' - tracking_result_topic_prefix = 'tracking-results-' + self.tracking_result_topic_prefix = 'tracking-results-' self.pose3d_topic_name = 'pose3d-requests' - pose3d_result_topic_prefix = 'pose3d-results-' + self.pose3d_result_topic_prefix = 'pose3d-results-' self.distance_topic_name = 'distance-requests' - distance_result_topic_prefix = 'distance-results-' + self.distance_result_topic_prefix = 'distance-results-' self.actions_topic_name = 'actions-requests' - actions_result_topic_prefix = 'actions-results-' + self.actions_result_topic_prefix = 'actions-results-' - Pose3DServiceTest.ensure_topic_empty(bootstrap_servers, self.detection_topic_name, 0) - Pose3DServiceTest.ensure_topic_empty(bootstrap_servers, self.tracking_topic_name, 0) - Pose3DServiceTest.ensure_topic_empty(bootstrap_servers, self.pose3d_topic_name, 0) - Pose3DServiceTest.ensure_topic_empty(bootstrap_servers, self.distance_topic_name, 0) - Pose3DServiceTest.ensure_topic_empty(bootstrap_servers, self.actions_topic_name, 0) + Pose3DServiceTest.ensure_topic_empty(self.bootstrap_servers, self.detection_topic_name, 0) + Pose3DServiceTest.ensure_topic_empty(self.bootstrap_servers, self.tracking_topic_name, 0) + Pose3DServiceTest.ensure_topic_empty(self.bootstrap_servers, self.pose3d_topic_name, 0) + Pose3DServiceTest.ensure_topic_empty(self.bootstrap_servers, self.distance_topic_name, 0) + Pose3DServiceTest.ensure_topic_empty(self.bootstrap_servers, self.actions_topic_name, 0) - self.producer: KafkaProducer = KafkaRequestProcessorTest.get_test_producer(bootstrap_servers) + self.detection_tp = TopicPartition(self.detection_result_topic_prefix + '1', 0) + self.tracking_tp = TopicPartition(self.tracking_result_topic_prefix + '1', 0) + self.pose3d_tp = TopicPartition(self.pose3d_result_topic_prefix + '1', 0) + self.distance_tp = TopicPartition(self.distance_result_topic_prefix + '1', 0) + self.actions_tp = TopicPartition(self.actions_result_topic_prefix + '1', 0) - detection_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(bootstrap_servers, - client_id='test-det-c') - detection_tp = TopicPartition(detection_result_topic_prefix + '1', 0) - detection_consumer.assign([detection_tp]) - detection_consumer.seek(detection_tp, 0) - - tracking_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(bootstrap_servers, - client_id='test-track-c') - tracking_tp = TopicPartition(tracking_result_topic_prefix + '1', 0) - tracking_consumer.assign([tracking_tp]) - tracking_consumer.seek(tracking_tp, 0) - - pose3d_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(bootstrap_servers, - client_id='test-pose3d-c') - pose3d_tp = TopicPartition(pose3d_result_topic_prefix + '1', 0) - pose3d_consumer.assign([pose3d_tp]) - pose3d_consumer.seek(pose3d_tp, 0) - - distance_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(bootstrap_servers, - client_id='test-dist-c') - distance_tp = TopicPartition(distance_result_topic_prefix + '1', 0) - distance_consumer.assign([distance_tp]) - distance_consumer.seek(distance_tp, 0) - - actions_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(bootstrap_servers, - client_id='test-act-c') - actions_tp = TopicPartition(actions_result_topic_prefix + '1', 0) - actions_consumer.assign([actions_tp]) - actions_consumer.seek(actions_tp, 0) + self.producer: KafkaProducer = KafkaRequestProcessorTest.get_test_producer(self.bootstrap_servers) videos_dir_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'videos/') # Следует указать список видеозаписей и соответствующих им параметров камеры @@ -117,14 +93,33 @@ class CompletePerceptionTest(unittest.TestCase): 'S001C001P001R001A057_rgb.avi': {'camera_height_mm': 2524-160, 'focal_length_mm': 2.8, 'pixel_size_mm': 5.376 / 1920, 'window_size_frames': 0}, 'S001C001P001R001A058_rgb.avi': {'camera_height_mm': 2524-160, 'focal_length_mm': 2.8, 'pixel_size_mm': 5.376 / 1920, 'window_size_frames': 0}, 'S001C001P001R001A059_rgb.avi': {'camera_height_mm': 2524-160, 'focal_length_mm': 2.8, 'pixel_size_mm': 5.376 / 1920, 'window_size_frames': 0}, - 'S001C001P001R001A060_rgb.avi': {'camera_height_mm': 2524-160, 'focal_length_mm': 2.8, 'pixel_size_mm': 5.376 / 1920, 'window_size_frames': 0}} + 'S001C001P001R001A060_rgb.avi': {'camera_height_mm': 2524-160, 'focal_length_mm': 2.8, 'pixel_size_mm': 5.376 / 1920, 'window_size_frames': 0} + } result_dir_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'results/') if os.path.exists(result_dir_path): shutil.rmtree(result_dir_path) os.mkdir(result_dir_path) - for i, (video_name, params) in enumerate(videos.items()): + self.run_stage('detection and tracking', videos, videos_dir_path, result_dir_path, + self.detection_and_tracking_stage) + self.run_stage('pose3d', videos, videos_dir_path, result_dir_path, self.pose3d_stage) + self.run_stage('distance and actions', videos, videos_dir_path, result_dir_path, + self.distance_and_actions_stage) + + def run_stage(self, stage_name, videos, videos_dir_path, result_dir_path, stage_video_executor): + print('Stage ' + stage_name) + detection_consumer = self.create_detection_consumer() + tracking_consumer = self.create_tracking_consumer() + pose3d_consumer = self.create_pose3d_consumer() + distance_consumer = self.create_distance_consumer() + actions_consumer = self.create_actions_consumer() + detection_storage = {} + tracking_storage = {} + pose3d_storage = {} + distance_storage = {} + actions_storage = {} + for i, (video_name, params) in enumerate(sorted(videos.items())): print('Processing ' + video_name) video_id = i+1 video_path = os.path.join(videos_dir_path, video_name) @@ -135,173 +130,297 @@ class CompletePerceptionTest(unittest.TestCase): frames_count = int(video_src.get(cv2.CAP_PROP_FRAME_COUNT)) print("Frames count: " + str(frames_count)) - video_tracking_res_path = os.path.join(result_dir_path, os.path.splitext(video_name)[0] + '-track.avi') - video_tracking_res = cv2.VideoWriter(video_tracking_res_path, cv2.VideoWriter_fourcc(*'XVID'), fps, - (video_width, video_height)) - - actions_file_path = os.path.join(result_dir_path, video_name + '-actions.txt') - actions_f = open(actions_file_path, 'w') - - resulting_poses = [] - normal_vectors = [] - frame_id = 0 - while True: - success, frame = video_src.read() - if not success: - break - frame_id += 1 - frame_bytes = cv2.imencode('.jpg', frame)[1].tobytes() - - # Detection - print("Detection: " + str(frame_id)) - self.send_detection(manager_id=1, video_id=video_id, frame_id=frame_id, image=frame_bytes) - self.producer.flush() - detection_tp_results = KafkaRequestProcessorTest.wait_for_results(detection_consumer, 1, 60_000) - self.assertEqual(1, len(detection_tp_results)) - detection_results = detection_tp_results[detection_tp] - self.assertEqual(1, len(detection_results)) - if detection_results[0].HasField("err"): - raise RuntimeError(detection_results[0].err.msg) - - # Tracking - print("Tracking: " + str(frame_id)) - self.send_tracking(manager_id=1, video_id=video_id, frame_id=frame_id, image=frame_bytes, - detections=detection_results[0].detection.entries) - self.producer.flush() - tracking_tp_results = KafkaRequestProcessorTest.wait_for_results(tracking_consumer, 1, 60_000) - self.assertEqual(1, len(tracking_tp_results)) - tracking_results = tracking_tp_results[tracking_tp] - self.assertEqual(1, len(tracking_results)) - if tracking_results[0].HasField("err"): - raise RuntimeError(tracking_results[0].err.msg) - - # Drawing boxes and track ids - boxes = [] - for tracking_entry in tracking_results[0].tracking.entries: - boxes.append({'id': tracking_entry.id, - 'class': tracking_entry.detection.class_id, - 'score': tracking_entry.detection.score, - 'box': [tracking_entry.detection.box_top_left_x, - tracking_entry.detection.box_top_left_y, - tracking_entry.detection.box_bottom_right_x, - tracking_entry.detection.box_bottom_right_y]}) - frame_with_detections = draw_boxes(frame, boxes) - - # Pose estimation - patches_offsets = [] - patches_bytes = [] - poses_ids = [] - for tracking_entry in tracking_results[0].tracking.entries: - if tracking_entry.detection.class_id == 1 and tracking_entry.detection.score > 0.5: - # noinspection PyTypeChecker - patch = cut_patch(frame, (tracking_entry.detection.box_top_left_x, - tracking_entry.detection.box_top_left_y, - tracking_entry.detection.box_bottom_right_x, - tracking_entry.detection.box_bottom_right_y)) - patch_bytes = cv2.imencode('.jpg', patch)[1].tobytes() - poses_ids.append(tracking_entry.id) - patches_bytes.append(patch_bytes) - patches_offsets.append((tracking_entry.detection.box_top_left_x, - tracking_entry.detection.box_top_left_y)) - - print("Pose estimation: " + str(frame_id)) - self.send_pose3d(manager_id=1, video_id=video_id, frame_id=frame_id, patches=patches_bytes) - self.producer.flush() - - pose3d_tp_results = KafkaRequestProcessorTest.wait_for_results(pose3d_consumer, 1, 60_000) - self.assertEqual(1, len(pose3d_tp_results)) - pose3d_results = pose3d_tp_results[pose3d_tp] - self.assertEqual(1, len(pose3d_results)) - self.assertEqual(len(pose3d_results[0].pose3d.entries), len(patches_bytes)) - if pose3d_results[0].HasField("err"): - raise RuntimeError(pose3d_results[0].err.msg) - - pose_entries = [] - for entry, offset in zip(pose3d_results[0].pose3d.entries, patches_offsets): - pose = [] - for kp_ind, joint in enumerate(entry.joints): - pose.append([joint.x + offset[0], joint.y + offset[1], joint.z]) - pose_entries.append(pose) - pose_entries = np.array(pose_entries) - - # Drawing poses - for pose in pose_entries: - frame_with_detections = draw_skeleton(pose, VHM_PARENT_IDS, frame_with_detections) - video_tracking_res.write(frame_with_detections) - - # Estimating distance - print("Distance estimation: " + str(frame_id)) - self.send_distance(manager_id=1, video_id=video_id, frame_id=frame_id, image=frame_bytes, - camera_height_mm=params['camera_height_mm'], - focal_length_mm=params['focal_length_mm'], - pixel_size_mm=params['pixel_size_mm'], entries=pose_entries) - self.producer.flush() - - distance_tp_results = KafkaRequestProcessorTest.wait_for_results(distance_consumer, 1, 60_000) - self.assertEqual(1, len(distance_tp_results)) - distance_results = distance_tp_results[distance_tp] - self.assertEqual(1, len(distance_results)) - self.assertEqual(len(pose_entries), len(distance_results[0].distance.entries)) - if distance_results[0].HasField("err"): - raise RuntimeError(distance_results[0].err.msg) - - nv_obj = distance_results[0].distance.normal_vector - normal_vector = (nv_obj.x, nv_obj.y, nv_obj.z) - - corrected_pose_entries = {} - for p_id, entry in zip(poses_ids, distance_results[0].distance.entries): - pose = [] - for p in entry.points: - pose.append([p.x, p.y, p.z]) - corrected_pose_entries[p_id] = np.array(pose) - print("Corrected poses: " + str(corrected_pose_entries)) - - resulting_poses.append(corrected_pose_entries) - normal_vectors.append(normal_vector) - - # Actions detection - print("Actions detection: " + str(frame_id)) - self.send_actions(manager_id=1, video_id=video_id, frame_id=frame_id, poses=corrected_pose_entries, - window_size_frames=frames_count) - self.producer.flush() - - actions_tp_results = KafkaRequestProcessorTest.wait_for_results(actions_consumer, frames_count + 1, - 60_000) - self.assertEqual(len(actions_tp_results), 1) - actions_results = actions_tp_results[actions_tp] - self.assertEqual(len(actions_results), 1) - if actions_results[0].HasField("err"): - raise RuntimeError(actions_results[0].err.msg) - - actions = {} - for action_info in actions_results[0].actions.entries: - p_key = (action_info.first_person_id,) if action_info.second_person_id == 0 else \ - (action_info.first_person_id, action_info.second_person_id) - if p_key not in actions: - actions[p_key] = [] - actions[p_key].append(action_info.action) - actions_f.write('{} {} {}\n'.format(frame_id, actions_results[0].actions.latency_frames, actions)) - - scene_file_path = os.path.join(result_dir_path, video_name + '.html') - pose3d_utils.build_3d_skeletons_animation(resulting_poses, VHM_PARENT_IDS, scene_file_path, - normal_vec_list=normal_vectors, - camera_height_mm=params['camera_height_mm']) - - actions_f.close() - - self.send_detection(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) - self.send_tracking(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) - self.send_pose3d(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) - self.send_distance(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) - self.send_actions(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) + stage_video_executor(video_id, video_name, params, fps, video_width, video_height, frames_count, video_src, + result_dir_path, detection_consumer, tracking_consumer, pose3d_consumer, + distance_consumer, actions_consumer, detection_storage, tracking_storage, + pose3d_storage, distance_storage, actions_storage) + detection_consumer.close() + tracking_consumer.close() + pose3d_consumer.close() + distance_consumer.close() + actions_consumer.close() + + def detection_and_tracking_stage(self, video_id, video_name, params, fps, video_width, video_height, frames_count, + video_src, result_dir_path, detection_consumer, tracking_consumer, pose3d_consumer, + distance_consumer, actions_consumer, detection_storage, tracking_storage, + pose3d_storage, distance_storage, actions_storage): + frame_id = 0 + while True: + success, frame = video_src.read() + if not success: + break + frame_id += 1 + frame_bytes = cv2.imencode('.jpg', frame)[1].tobytes() + + # Detection + print("Detection: " + str(frame_id)) + self.send_detection(manager_id=1, video_id=video_id, frame_id=frame_id, image=frame_bytes) + self.producer.flush() + detection_tp_results = KafkaRequestProcessorTest.wait_for_results(detection_consumer, 1, 60_000) + self.assertEqual(1, len(detection_tp_results)) + detection_results = detection_tp_results[self.detection_tp] + self.assertEqual(1, len(detection_results)) + if detection_results[0].HasField("err"): + raise RuntimeError(detection_results[0].err.msg) + + # Tracking + print("Tracking: " + str(frame_id)) + self.send_tracking(manager_id=1, video_id=video_id, frame_id=frame_id, image=frame_bytes, + detections=detection_results[0].detection.entries) + self.producer.flush() + tracking_tp_results = KafkaRequestProcessorTest.wait_for_results(tracking_consumer, 1, 60_000) + self.assertEqual(1, len(tracking_tp_results)) + tracking_results = tracking_tp_results[self.tracking_tp] + self.assertEqual(1, len(tracking_results)) + if tracking_results[0].HasField("err"): + raise RuntimeError(tracking_results[0].err.msg) + + self.send_detection(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) + self.send_tracking(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) + self.producer.flush() + + self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(detection_consumer, 1, 60_000)), 1) + self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(tracking_consumer, 1, 60_000)), 1) + + def pose3d_stage(self, video_id, video_name, params, fps, video_width, video_height, frames_count, video_src, + result_dir_path, detection_consumer, tracking_consumer, pose3d_consumer, distance_consumer, + actions_consumer, detection_storage, tracking_storage, pose3d_storage, distance_storage, + actions_storage): + video_tracking_res_path = os.path.join(result_dir_path, os.path.splitext(video_name)[0] + '-track.avi') + video_tracking_res = cv2.VideoWriter(video_tracking_res_path, cv2.VideoWriter_fourcc(*'XVID'), fps, + (video_width, video_height)) + first_request = True + frame_id = 0 + while True: + success, frame = video_src.read() + if not success: + break + frame_id += 1 + + tracking_tp_results = KafkaRequestProcessorTest.wait_for_results(tracking_consumer, 1, 60_000, + tracking_storage) + tracking_results = tracking_tp_results[self.tracking_tp] + + # Drawing boxes and track ids + boxes = [] + for tracking_entry in tracking_results[0].tracking.entries: + boxes.append({'id': tracking_entry.id, + 'class': tracking_entry.detection.class_id, + 'score': tracking_entry.detection.score, + 'box': [tracking_entry.detection.box_top_left_x, + tracking_entry.detection.box_top_left_y, + tracking_entry.detection.box_bottom_right_x, + tracking_entry.detection.box_bottom_right_y]}) + frame_with_detections = draw_boxes(frame, boxes) + + # Pose estimation + patches_offsets = [] + patches_bytes = [] + poses_ids = [] + for tracking_entry in tracking_results[0].tracking.entries: + if tracking_entry.detection.class_id == 1 and tracking_entry.detection.score > 0.5: + # noinspection PyTypeChecker + patch = cut_patch(frame, (tracking_entry.detection.box_top_left_x, + tracking_entry.detection.box_top_left_y, + tracking_entry.detection.box_bottom_right_x, + tracking_entry.detection.box_bottom_right_y)) + patch_bytes = cv2.imencode('.jpg', patch)[1].tobytes() + poses_ids.append(tracking_entry.id) + patches_bytes.append(patch_bytes) + patches_offsets.append((tracking_entry.detection.box_top_left_x, + tracking_entry.detection.box_top_left_y)) + + print("Pose estimation: " + str(frame_id)) + self.send_pose3d(manager_id=1, video_id=video_id, frame_id=frame_id, patches=patches_bytes) self.producer.flush() - self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(detection_consumer, 1, 60_000)), 1) - self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(tracking_consumer, 1, 60_000)), 1) - self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(pose3d_consumer, 1, 60_000)), 1) - self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(distance_consumer, 1, 60_000)), 1) - self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(actions_consumer, 1, 60_000)), 1) + if first_request: + print("Waiting for the pose3d estimation service to response...") + pose3d_tp_results = KafkaRequestProcessorTest.wait_for_results(pose3d_consumer, 1, + 300_000 if first_request else 60_000) + self.assertEqual(1, len(pose3d_tp_results)) + pose3d_results = pose3d_tp_results[self.pose3d_tp] + self.assertEqual(1, len(pose3d_results)) + self.assertEqual(len(pose3d_results[0].pose3d.entries), len(patches_bytes)) + if pose3d_results[0].HasField("err"): + raise RuntimeError(pose3d_results[0].err.msg) + + pose_entries = [] + for entry, offset in zip(pose3d_results[0].pose3d.entries, patches_offsets): + pose = [] + for kp_ind, joint in enumerate(entry.joints): + pose.append([joint.x + offset[0], joint.y + offset[1], joint.z]) + pose_entries.append(pose) + pose_entries = np.array(pose_entries) + + # Drawing poses + for pose in pose_entries: + frame_with_detections = draw_skeleton(pose, VHM_PARENT_IDS, frame_with_detections) + video_tracking_res.write(frame_with_detections) + + first_request = False + + video_tracking_res.release() + + self.send_pose3d(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) + self.producer.flush() + + self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(tracking_consumer, 1, 60_000, + tracking_storage)), 1) + self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(pose3d_consumer, 1, 60_000)), 1) + + def distance_and_actions_stage(self, video_id, video_name, params, fps, video_width, video_height, frames_count, + video_src, result_dir_path, detection_consumer, tracking_consumer, pose3d_consumer, + distance_consumer, actions_consumer, detection_storage, tracking_storage, + pose3d_storage, distance_storage, actions_storage): + actions_file_path = os.path.join(result_dir_path, video_name + '-actions.txt') + actions_f = open(actions_file_path, 'w') + + resulting_poses = [] + normal_vectors = [] + + first_request = True + frame_id = 0 + while True: + success, frame = video_src.read() + if not success: + break + frame_id += 1 + frame_bytes = cv2.imencode('.jpg', frame)[1].tobytes() + + tracking_tp_results = KafkaRequestProcessorTest.wait_for_results(tracking_consumer, 1, 60_000, + tracking_storage) + tracking_results = tracking_tp_results[self.tracking_tp] + patches_offsets = [] + poses_ids = [] + for tracking_entry in tracking_results[0].tracking.entries: + if tracking_entry.detection.class_id == 1 and tracking_entry.detection.score > 0.5: + poses_ids.append(tracking_entry.id) + patches_offsets.append((tracking_entry.detection.box_top_left_x, + tracking_entry.detection.box_top_left_y)) + + pose3d_tp_results = KafkaRequestProcessorTest.wait_for_results(pose3d_consumer, 1, 60_000, pose3d_storage) + pose3d_results = pose3d_tp_results[self.pose3d_tp] + + pose_entries = [] + for entry, offset in zip(pose3d_results[0].pose3d.entries, patches_offsets): + pose = [] + for kp_ind, joint in enumerate(entry.joints): + pose.append([joint.x + offset[0], joint.y + offset[1], joint.z]) + pose_entries.append(pose) + pose_entries = np.array(pose_entries) + + # Estimating distance + print("Distance estimation: " + str(frame_id)) + self.send_distance(manager_id=1, video_id=video_id, frame_id=frame_id, image=frame_bytes, + camera_height_mm=params['camera_height_mm'], + focal_length_mm=params['focal_length_mm'], + pixel_size_mm=params['pixel_size_mm'], entries=pose_entries) + self.producer.flush() + if first_request: + print("Waiting for the distance estimation service to response...") + distance_tp_results = KafkaRequestProcessorTest.wait_for_results(distance_consumer, 1, + 300_000 if first_request else 60_000) + self.assertEqual(1, len(distance_tp_results)) + distance_results = distance_tp_results[self.distance_tp] + self.assertEqual(1, len(distance_results)) + self.assertEqual(len(pose_entries), len(distance_results[0].distance.entries)) + if distance_results[0].HasField("err"): + raise RuntimeError(distance_results[0].err.msg) + + nv_obj = distance_results[0].distance.normal_vector + normal_vector = (nv_obj.x, nv_obj.y, nv_obj.z) + + corrected_pose_entries = {} + for p_id, entry in zip(poses_ids, distance_results[0].distance.entries): + pose = [] + for p in entry.points: + pose.append([p.x, p.y, p.z]) + corrected_pose_entries[p_id] = np.array(pose) + print("Corrected poses: " + str(corrected_pose_entries)) + + resulting_poses.append(corrected_pose_entries) + normal_vectors.append(normal_vector) + + # Actions detection + print("Actions detection: " + str(frame_id)) + self.send_actions(manager_id=1, video_id=video_id, frame_id=frame_id, poses=corrected_pose_entries, + window_size_frames=frames_count) + self.producer.flush() + + actions_tp_results = KafkaRequestProcessorTest.wait_for_results(actions_consumer, 1, 60_000) + self.assertEqual(len(actions_tp_results), 1) + actions_results = actions_tp_results[self.actions_tp] + self.assertEqual(len(actions_results), 1) + if actions_results[0].HasField("err"): + raise RuntimeError(actions_results[0].err.msg) + + actions = {} + for action_info in actions_results[0].actions.entries: + p_key = (action_info.first_person_id,) if action_info.second_person_id == 0 else \ + (action_info.first_person_id, action_info.second_person_id) + if p_key not in actions: + actions[p_key] = [] + actions[p_key].append(action_info.action) + actions_f.write('{} {} {}\n'.format(frame_id, actions_results[0].actions.latency_frames, actions)) + + first_request = False + + scene_file_path = os.path.join(result_dir_path, video_name + '.html') + print("Building skeletons animation") + pose3d_utils.build_3d_skeletons_animation(resulting_poses, VHM_PARENT_IDS, scene_file_path, + normal_vec_list=normal_vectors, + camera_height_mm=params['camera_height_mm']) + + actions_f.close() + + print("Waiting for finish confirmations") + self.send_distance(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) + self.send_actions(manager_id=1, video_id=video_id, frame_id=frame_id, finished=True) + self.producer.flush() + + self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(tracking_consumer, 1, 60_000, + tracking_storage)), 1) + self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(pose3d_consumer, 1, 60_000, + pose3d_storage)), 1) + self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(distance_consumer, 1, 60_000)), 1) + self.assertEqual(len(KafkaRequestProcessorTest.wait_for_results(actions_consumer, 1, 60_000)), 1) + + def create_detection_consumer(self): + detection_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(self.bootstrap_servers, + client_id='test-det-c') + detection_consumer.assign([self.detection_tp]) + detection_consumer.seek(self.detection_tp, 0) + return detection_consumer + + def create_tracking_consumer(self): + tracking_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(self.bootstrap_servers, + client_id='test-track-c') + tracking_consumer.assign([self.tracking_tp]) + tracking_consumer.seek(self.tracking_tp, 0) + return tracking_consumer + + def create_pose3d_consumer(self): + pose3d_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(self.bootstrap_servers, + client_id='test-pose3d-c') + pose3d_consumer.assign([self.pose3d_tp]) + pose3d_consumer.seek(self.pose3d_tp, 0) + return pose3d_consumer + + def create_distance_consumer(self): + distance_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(self.bootstrap_servers, + client_id='test-dist-c') + distance_consumer.assign([self.distance_tp]) + distance_consumer.seek(self.distance_tp, 0) + return distance_consumer + + def create_actions_consumer(self): + actions_consumer: KafkaConsumer = KafkaRequestProcessorTest.get_test_consumer(self.bootstrap_servers, + client_id='test-act-c') + actions_consumer.assign([self.actions_tp]) + actions_consumer.seek(self.actions_tp, 0) + return actions_consumer def send_detection(self, manager_id, video_id, frame_id=None, finished=None, image=None): r = PerceptionRequest(manager_id=manager_id, video_id=video_id) diff --git a/test/complete_perception_test/docker-compose.yml b/test/complete_perception_test/docker-compose.yml index 3f23861bbe77a7223c14305c6eb0126434282fd6..b4a4dd48c8421be521222ffdac1e89085424dd72 100644 --- a/test/complete_perception_test/docker-compose.yml +++ b/test/complete_perception_test/docker-compose.yml @@ -28,100 +28,85 @@ services: detection_cbnet: build: ../../detection_cbnet/docker-build-context image: devbeh/detection_cbnet - depends_on: - - kafka volumes: - type: bind source: ../../ target: /opt/detection_cbnet/src - networks: - - complete_perception_net + network_mode: host runtime: nvidia environment: - NVIDIA_VISIBLE_DEVICES=${DEVBEH_DETECTION_NVIDIA_VISIBLE_DEVICES:-0} command: 'python run.py detection_cbnet --log_dir /opt/detection_cbnet/logs --weights_url https://www.dropbox.com/s/0vsk5zld23tgrka/htc_cbv2_swin_large22k_patch4_window7_mstrain_400-1400_giou_4conv1f_adamw_1x_coco.pth?dl=1 - --bootstrap_servers kafka:29097' + --bootstrap_servers localhost:9097 --max_size 518400' # 720 * 720 tracking: build: ../../tracking/docker-build-context image: devbeh/tracking - depends_on: - - kafka volumes: - type: bind # Подключаем директорию с кодом. Данный volume не является read_only, поскольку программа может автоматически # скачать и сохранить в нем файлы весов моделей source: ../../ target: /opt/tracking/src - networks: - - complete_perception_net + network_mode: host runtime: nvidia environment: - NVIDIA_VISIBLE_DEVICES=${DEVBEH_TRACKING_NVIDIA_VISIBLE_DEVICES:-0} command: 'python run.py tracking --log_dir /opt/tracking/logs --fairmot_weights_url ${DEVBEH_FAIRMOT_WEIGHTS_URL:-https://drive.google.com/u/0/uc?export=download&confirm=VHcw&id=1iqRQjsG9BawIl8SlFomMg5iwkb6nqSpi} - --bootstrap_servers kafka:29097' + --bootstrap_servers localhost:9097' pose3d_vhm: build: ../../pose3d_vhm/docker-build-context image: devbeh/pose3d_vhm - depends_on: - - kafka volumes: - type: bind # Подключаем директорию с кодом. Данный volume не является read_only, поскольку программа может автоматически # скачать и сохранить в нем файлы весов моделей source: ../../ target: /opt/pose3d_vhm/src - networks: - - complete_perception_net + network_mode: host runtime: nvidia environment: - NVIDIA_VISIBLE_DEVICES=${DEVBEH_POSE3D_VHM_NVIDIA_VISIBLE_DEVICES:-0} command: 'python run.py pose3d_vhm --log_dir /opt/pose3d_vhm/logs --weights_url ${DEVBEH_POSE3D_WEIGHTS_URL:-https://www.dropbox.com/s/5889lels808tm0v/model_chall_train_152ft_384x288.pth.tar?dl=1} - --bootstrap_servers kafka:29097' + --bootstrap_servers localhost:9097' distance: # Текущей директорией в путях является директория, в которой расположен данный docker-compose.yml файл build: ../../distance/docker-build-context image: devbeh/distance - depends_on: - - kafka volumes: - type: bind # Подключаем директорию с кодом. Данный volume не является read_only, поскольку программа может автоматически # скачать и сохранить в нем файлы весов моделей source: ../../ target: /opt/distance/src - networks: - - complete_perception_net + network_mode: host runtime: nvidia environment: - NVIDIA_VISIBLE_DEVICES=${DEVBEH_DISTANCE_NVIDIA_VISIBLE_DEVICES:-0} command: 'python run.py distance --log_dir /opt/distance/logs --weights_url ${DEVBEH_DISTANCE_WEIGHTS_URL:-https://www.dropbox.com/s/yjcg6s57n581sk0/checkpoint.zip?dl=1} - --bootstrap_servers kafka:29097 + --bootstrap_servers localhost:9097 --with_mask --estimation_period_frames 1' actions: # Текущей директорией в путях является директория, в которой расположен данный docker-compose.yml файл build: ../../actions/docker-build-context image: devbeh/actions - depends_on: - - kafka volumes: - type: bind # Подключаем директорию с кодом. Данный volume не является read_only, поскольку программа может автоматически # скачать и сохранить в нем файлы весов моделей source: ../../ target: /opt/actions/src - networks: - - complete_perception_net + network_mode: host runtime: nvidia environment: - NVIDIA_VISIBLE_DEVICES=${DEVBEH_ACTIONS_NVIDIA_VISIBLE_DEVICES:-0} command: 'python run.py actions --log_dir /opt/actions/logs --weights_url ${DEVBEH_ACTIONS_WEIGHTS_URL:-https://drive.google.com/u/0/uc?id=11LiAv52Y4Ye_KdEpcxHHdaBuG7hxgHh-&export=download} - --bootstrap_servers kafka:29097 --window_size_frames 30 --window_step_frames 10 + --bootstrap_servers localhost:9097 --window_size_frames 30 --window_step_frames 10 --max_interaction_dist_mm 6000 --max_occlusion_dur_frames 10 --detection_threshold 5' networks: complete_perception_net: