# RGBD_manipulation.py
import os
import os.path as osp
import argparse
import time
import json
import base64
import re
import requests
from openai import OpenAI
import csv
import cv2 as cv
import math
import numpy as np
import datetime
from collections import deque
from abc import ABC, abstractmethod
from softgym.registered_env import env_arg_dict, SOFTGYM_ENVS
from softgym.utils.normalized_env import normalize
from softgym.utils import camera_utils
from softgym.utils.visualization import save_numpy_as_gif
import pyflex
from matplotlib import pyplot as plt
from PIL import Image
from manipulation import RGB_manipulation,encode_image
with open("GPT-API-Key.txt", "r") as f:
api_key = f.read().strip()
client = OpenAI(api_key=api_key)
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
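# Note: the OpenAI client is instantiated above, but the GPT calls in this class
# (see get_pick_place below) post directly to the chat completions endpoint using
# these headers.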
class RGBD_manipulation_part_obs(RGB_manipulation):
"""
This is the manipulation class for the RGBD observation input (paper's method).
Added attributes:
re_consider: whether to enable the evaluation module
in_context_learning: whether to use in-context learning
demo_dir: the directory of the demonstration data
"""
def __init__(self,env,env_name,obs_dir,goal_image,goal_config,goal_depth,re_consider=True,in_context_learning=False,demo_dir="./demo/Manual_test14",img_size=720):
super().__init__(env=env,env_name=env_name,obs_dir=obs_dir,goal_image=goal_image,goal_config=goal_config,goal_depth=goal_depth,img_size=img_size)
self.re_consider=re_consider
self.in_context_learning=in_context_learning
self.demo_dir=demo_dir
def save_obs(self, image, rgbd=None, specifier="init"):
"""
Save the observation to the specified directory as an image and, optionally, an RGBD array (.npy).
Input:
image: the image to be saved
rgbd: the rgbd to be saved (default to be None)
specifier: the specifier of the observation (usually the step number).
Output:
img_path: the path of the saved image
"""
save_name_image = osp.join(self.obs_dir, "image")
save_image = Image.fromarray(image)
img_path=save_name_image+'_'+specifier+'.png'
save_image.save(img_path)
print('observation saved to {} \n'.format(img_path))
if rgbd is not None:
save_name_rgbd=osp.join(self.obs_dir,"RGBD")
rgbd_path=save_name_rgbd+'_'+specifier+'.npy'
np.save(rgbd_path, rgbd)
return img_path
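# Illustrative note (not from the original code): for specifier="3" and
# obs_dir="./obs", this writes "./obs/image_3.png" and, when rgbd is given,
# "./obs/RGBD_3.npy", and returns the path of the saved image.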
def get_center_point_bounding_box(self,img_path,depth,need_box=False):
"""
Get the center point of the bounding box of the fabric in the image and save the image with the center point marked.
Input:
img_path: the path of the image
depth: the depth image of the fabric
need_box: whether to draw the bounding box in the image
Output:
center_point_pixel: the pixel of the center point of the bounding box
rgb: the image with the center point marked
"""
rgb=cv.imread(img_path)  # corner detection should already have been applied to this image
top,bottom,left,right=self.get_bounds(depth)
center_point_pixel=[((right-left)//2)+left,((top-bottom)//2)+bottom]
cv.circle(rgb,(center_point_pixel[0],center_point_pixel[1]),4,(0,0,0),-1)
if need_box:
for i in range(bottom,top,4):
rgb[i][left]=[255,0,0]
rgb[i][right]=[255,0,0]
for j in range(left,right,4):
rgb[bottom][j]=[255,0,0]
rgb[top][j]=[255,0,0]
if self.goal_config:
goal_top=center_point_pixel[0]+(self.goal_height)//2
goal_bottom=center_point_pixel[0]-(self.goal_height)//2
goal_left=center_point_pixel[1]-(self.goal_height)//2
goal_right=center_point_pixel[1]+(self.goal_height)//2
## Draw the flattened box
for i in range(goal_bottom,goal_top,2):
rgb[i][goal_left]=[255,255,255]
rgb[i][goal_right]=[255,255,255]
for j in range(goal_left,goal_right,2):
rgb[goal_bottom][j]=[255,255,255]
rgb[goal_top][j]=[255,255,255]
# save_image = Image.fromarray(rgb)
# save_image.save(osp.join(save_obs_dir,'processed.png'))
# np.save("init_obs.npy",obs)
return center_point_pixel,rgb
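# Worked example (assumed bounds, for illustration only): if get_bounds(depth)
# returned top=500, bottom=200, left=100, right=400, then
# center_point_pixel = [(400-100)//2 + 100, (500-200)//2 + 200] = [250, 350],
# i.e. the center of the cloth's axis-aligned bounding box.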
def response_process(self,response,messages=None):
"""
Process the response from GPT to extract the picking point, moving direction and moving distance. Map the picking pixel to a 3D coordinate
and then use the moving direction and distance to calculate the placing point.
Return both the picking point and the placing point in 3D coordinates and pixel coordinates.
If the response doesn't contain a pick point, direction and distance, return None so the recal module can ask GPT again.
Input:
response: the response from GPT
Output:
pick_coords: the 3D coordinate of the picking point
place_coords: the 3D coordinate of the placing point
pick_pixel: the pixel coordinate of the picking point
place_pixel: the pixel coordinate of the placing point
"""
if 'choices' not in response.json():
# GPT returned one of its common errors.
print(response.json())
return None, None,None,None
else:
# GPT doesn't run into error.
response_message=response.json()['choices'][0]['message']['content']
print(response_message)
# Use regular expression to extract the pick point, direction and distance.
pick_pattern = r'Pick point:.*?\[(.*?)\]'
direction_pattern=r'Moving direction:.*?(\d+/\d+)'
distance_pattern=r'Moving distance:.*?(\d+\.?\d*)'
pick_match = re.search(pick_pattern, response_message)
direction_match = re.search(direction_pattern, response_message)
distance_match = re.search(distance_pattern, response_message)
# Get pick point (pixel) from GPT response and transform it to 3D coordinate.
if not pick_match:
return None,None,None,None
pick_coords = [int(val) for val in pick_match.group(1).split(',')]
pick_pixel=pick_coords
pick_coords=camera_utils.find_nearest(self.pixel_coords,pick_coords[1],pick_coords[0])# map the pixel to 3D coordinate
pick_coords=self.pixel_coords[pick_coords[0]][pick_coords[1]] # The 3D coordinate of the picking point
# Get moving direction and distance from GPT response.
moving_direction = direction_match.group(1) if direction_match else None
if moving_direction is None:
return None,None,None,None
numerator, denominator = moving_direction.split('/')
moving_direction=float(numerator)/float(denominator)
moving_distance = float(distance_match.group(1)) if distance_match else None
if moving_distance is None:
return None,None,None,None
# Calculate the placing point based on the picking point, moving direction and distance.
curr_config=self.env.get_current_config()
dimx,dimy=curr_config['ClothSize']
size=max(dimx,dimy)*self.env.cloth_particle_radius
actual_direction=moving_direction*np.pi
actual_distance=moving_distance*size
delta_x=actual_distance*np.sin(actual_direction)
delta_y=actual_distance*np.cos(actual_direction)
place_coords = pick_coords.copy()
place_coords[0]+=delta_x
place_coords[2]+=delta_y
# calculate the pixel coordinate of the placing point
pixel_size=max(self.goal_height,self.goal_width)
delta_x_pixel=int(pixel_size*np.cos(actual_direction)*moving_distance)
delta_y_pixel=int(pixel_size*np.sin(actual_direction)*moving_distance)
place_pixel=[pick_pixel[0]+delta_x_pixel,pick_pixel[1]-delta_y_pixel]
return pick_coords, place_coords, pick_pixel,place_pixel
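# Worked example (hypothetical GPT reply, for illustration only): a response
# containing "Pick point: [200, 300]", "Moving direction: 1/2" and
# "Moving distance: 0.5" gives moving_direction = 0.5, so actual_direction = 0.5*pi.
# With a cloth of size 0.4 m, actual_distance = 0.2 m, so
# delta_x = 0.2*sin(0.5*pi) = 0.2 and delta_y = 0.2*cos(0.5*pi) = 0.0, i.e. the
# placing point is the picking point shifted 0.2 m along the world x axis, and the
# place pixel lies pixel_size*0.5 pixels above the pick pixel in the image.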
def vis_result(self,place_pixel,pick_pixel=None,img_path=None,img=None):
"""
Visualize the result of the pick-and-place action.
If both the pick pixel and the place pixel are provided, draw a circle at the pick pixel and an arrow pointing to the place pixel.
If only the place pixel is provided, draw a circle at the place pixel (this visualizes the action from the last step).
Input:
place_pixel: the pixel coordinate of the placing point
pick_pixel: the pixel coordinate of the picking point
img_path: the path of the image (if img is not provided, the image is loaded from this path)
img: the image to be visualized
Output:
img: the image with the pick-and-place action visualized
"""
if img_path:
img=cv.imread(img_path)
if pick_pixel is not None:
cv.circle(img, (int(pick_pixel[0]), int(pick_pixel[1])), 5, (0, 255, 0), 2)
cv.arrowedLine(img, (int(pick_pixel[0]), int(pick_pixel[1])), (int(place_pixel[0]), int(place_pixel[1])), (0, 255, 0), 2)
cv.circle(img,(int(place_pixel[0]), int(place_pixel[1])), 5, (128, 0, 128), 2)
else:
cv.circle(img, (int(place_pixel[0]), int(place_pixel[1])), 3, (0, 0, 255), 2)
return img
def _cal_direction(self,start,end):
"""
Given a start point and end point, calculate the direction of the vector from start to end.
Input:
start: the start point
end: the end point
Output:
angle: the direction of the vector from start to end, expressed as a fraction of pi (values below 0.125 are wrapped by adding 2)
"""
vector=[end[0]-start[0],start[1]-end[1]]
angle=np.arctan2(vector[1],vector[0])
angle=angle/np.pi
if angle<0.125:
angle+=2
return angle
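# Worked example (for illustration only): with start=[360, 360] (the center) and
# end=[360, 260] (a point directly above the center in image coordinates),
# vector = [0, 100], arctan2 gives 0.5*pi and the returned angle is 0.5 ("up").
# A point directly to the right gives arctan2 = 0, which is below 0.125 and is
# wrapped to 2.0, so the result is expressed as a fraction of pi roughly in
# the range [0.125, 2.125).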
def recal(self,response_message,place_pixel,pick_pixel,center,img,depth_img=None,last_pick_point=None, last_pick_point_oppo=None):
"""
This is the module that checks the correctness of the predicted pick-and-place action (the recal module, i.e. the Evaluation module in the paper).
It checks whether the picking point is too close to the last picking point and whether the move direction approximately aligns with the
direction from the center point to the chosen picking point.
It returns a correction message together with the results of both the direction check and the picking point proximity check, using a visualization of the action.
Input:
response_message: the response message from GPT
place_pixel: the pixel coordinate of the placing point
pick_pixel: the pixel coordinate of the picking point
center: the pixel coordinate of the center point
img: the image to be visualized
depth_img: the depth image of the fabric
last_pick_point: the pixel coordinate of the last picking point
last_pick_point_oppo: the pixel coordinate of the symmetric point of the last picking point
Output:
correct_message: the correction message to be sent to the GPT for re-consideration
check_result: whether the picking point is far enough from the last picking point and the move direction is approximately correct
direction_check: whether the move direction is approximately correct
"""
# 0. setup the parameters
# Whether to check the directions only.
# **This is for ablation study, normally should be false**
check_directions_only=False
# Visualization of the predicted action
img=self.vis_result(place_pixel=place_pixel,pick_pixel=pick_pixel,img=img.copy())
vis_result_path=self.paths['processed vis image']
cv.imwrite(vis_result_path,img)
encoded_vis_result=encode_image(vis_result_path)
if depth_img is not None:
depth_img=self.vis_result(place_pixel=place_pixel,pick_pixel=pick_pixel,img=depth_img.copy())
vis_result_depth_path=self.paths['processed vis depth']
cv.imwrite(vis_result_depth_path,depth_img)
encoded_vis_result=encode_image(vis_result_depth_path)
correct_message=[]
text_correct_message="""
I am providing you with the visualization result of your predicted pick-and-place action. In the image you can see a green circle which is your predicted picking point and a green arrow which points to your predicted move direction and a purple circle at the end of that arrow denoting the estimated placing point.\n
"""
if self.depth_reasoning:
# if the depth_reasoning is enabled, the depth image will be provided to the recal module.
# Normally this will be false as the depth_reasoning method is not used in the paper due to much worse performance.
text_correct_message+="""
I am also providing you with the visualization result of your predicted pick-and-place action on the corresponding depth image. In the depth image you can also see a green circle which is your predicted picking point and a green arrow which points along your predicted move direction and a purple circle at the end of that arrow denoting the estimated placing point.\n
"""
# 1. Pick point proximity check
if check_directions_only:
last_pick_point=None
if last_pick_point is not None:
# Check the distance between the predicted picking point and the last picking point and its symmetric point.
pick_check=(abs(pick_pixel[0]-last_pick_point[0])>50) or (abs(pick_pixel[1]-last_pick_point[1])>50)
pick_oppo_check=(abs(pick_pixel[0]-last_pick_point_oppo[0])>50) or (abs(pick_pixel[1]-last_pick_point_oppo[1])>50)
if pick_check and pick_oppo_check:
position_message="By calculation, the chosen picking point is not near the last picking point or its symmetric point, you can stick with this picking point."
elif pick_check:
position_message=f"By calculation, the chosen picking point is near the last picking point's symmetric point. The chosen picking point is [{pick_pixel[0]},{pick_pixel[1]}] and the last picking point's symmetric point is [{last_pick_point_oppo[0]},{last_pick_point_oppo[1]}] so the pick point is within 100 pixel range of that point, please choose another point to pick."
else:
position_message=f"By calculation, the chosen picking point is near the last picking point. The chosen picking point is [{pick_pixel[0]},{pick_pixel[1]}] and the last picking point is [{last_pick_point[0]},{last_pick_point[1]}] so the picking point is within 100 pixel range of that point, please choose another point to pick."
text_correct_message+=position_message
else:
# If no last picking point is provided, skip the checking of the picking point (set the result to be true).
pick_check=True
pick_oppo_check=True
# 2. Direction check
direction_pattern=r'Moving direction:.*?(\d+/\d+)'
direction_match = re.search(direction_pattern, response_message)
moving_direction = direction_match.group(1)
numerator, denominator = moving_direction.split('/')
moving_direction=float(numerator)/float(denominator)
print(f"the moving_direction is {moving_direction} with type {type(moving_direction)}")
# 2.a Calculate the direction from the center point to the picking point
direction=self._cal_direction(center,pick_pixel)
print(f"the cal_direction is {direction} with type {type(direction)}")
# 2.b Check whether the predicted direction is close to the calculated direction (+/- 0.25*pi)
difference=np.abs(moving_direction-direction)
print(f"difference is {difference}")
possible_directions=[]
for i in range(1,9):
possible_directions.append(i/4)
possible_directions=np.array(possible_directions)
possible_directions_diff=np.abs(possible_directions-direction)
choice=np.argmin(possible_directions_diff)
str_direction=self.directions[choice]
left=choice-1
right=choice+2
if left<0:
left=8+left
if right>8:
right=right-8
if left<right:
accept_direction_list=possible_directions[left:right]
str_direction_list=self.directions[left:right]
else:
accept_direction_list=np.concatenate((possible_directions[left:],possible_directions[:right]))  # concatenate the wrapped slices (note: '+' on numpy arrays adds elementwise)
str_direction_list=list(self.directions[left:])+list(self.directions[:right])
str_direction_list=f"[{','.join(str_direction_list)}]"
direction_check=(moving_direction in accept_direction_list) or difference<0.25
print(direction_check)
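# Worked example (for illustration only): if the calculated center-to-pick
# direction is about 0.8, the closest entry of
# possible_directions=[0.25, 0.5, ..., 2.0] is 0.75 (index 2), so the acceptable
# window covers indices 1..3, i.e. directions 0.5, 0.75 and 1.0; a predicted
# direction inside that window, or within 0.25 of the calculated direction,
# passes the check.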
# 3. Get the result of both checks and generate the correction message w.r.t different check results.
if direction_check and pick_check and pick_oppo_check:
direction_message="\n By calculating the pick point you choose and the center point, the direction starting from the center point to the picking point is roughly "+str_direction+". The direction you predicted falls in the acceptable range."
elif pick_check and pick_oppo_check:
direction_message="\n The picking point is an acceptable choice as it's not near to the last picking point or its symmetric point. But by calculating the pick point you choose and center point, the direction starting from the center point to the picking point is roughly "+str_direction+". The direction you predicted doesn't fall in the acceptable range. Please use "+str_direction+"as the moving direction if you want to pick the same picking point."
else:
direction_message="\n The picking point is not an accept choice as it's near to last picking point or its symmetric point. The predicted moving direction is also incorrect."
check_result=direction_check and pick_oppo_check and pick_check
text_correct_message+=direction_message
correction_message="""
Based on the assistance of the previous calculation, do you think your predicted move will help flatten the fabric? If so, you can repeat your answer. If you don't think this move will help flatten the fabric, you should give a new prediction following the same output format.
"""
text_correct_message+=correction_message
text_content={
"type":"text",
"text":text_correct_message,
}
image_content={
"type":"image_url",
"image_url":{
"url":f"data:image/jpeg;base64,{encoded_vis_result}",
"detail":"high"
}
}
correct_message.append(text_content)
correct_message.append(image_content)
return correct_message,check_result,direction_check
def get_pick_place(self,messages,headers):
"""
This function is used to get the picking point and placing point from GPT with correct format.
Input:
messages: the conversation history. Normally it will include the system prompt.
headers: the headers for the GPT API
Output:
pick_point: the 3D coordinate of the picking point
place_point: the 3D coordinate of the placing point
pick_pixel: the pixel coordinate of the picking point
place_pixel: the pixel coordinate of the placing point
response_message: the response message from GPT
"""
# 0. Setup the parameters and GPT agent
payload={
"model":"gpt-4-vision-preview",
"messages":messages,
"max_tokens": 1024,
"temperature":0.1,
"top_p":1,
"frequency_penalty":0,
"presence_penalty":0
}
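# For reference (format implied by the regexes in response_process, not an exact
# transcript of any prompt): a well-formed reply is expected to contain lines like
#   Pick point: [200, 300]
#   Moving direction: 3/4
#   Moving distance: 0.5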
re_cal=True
# 1. Deal with different types of errors from GPT
while re_cal:
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
pick_point,place_point,pick_pixel,_=self.response_process(response)
if 'choices' in response.json():
# GPT doesn't run into error.
response_message=response.json()['choices'][0]['message']['content']
if pick_point is not None:
# GPT gives the correct output
re_cal=False
break
else:
# GPT gives the output with format error
# (The result doesn't contain the pick point, direction and distance, or they are not in the desired format)
re_cal=True
time.sleep(30)
format_error_message="The output given by you has format error, please output your result according to the given format."
messages.append(
{
"role":"assistant",
"content":response_message,
})
messages.append(
{
"role":"user",
"content":[
{"type":"text",
"text":format_error_message,
}
]
}
)
else:
# GPT runs into error. In our tests sometimes it's due to "inappropriate content" or "model error"
re_cal=True
time.sleep(30)
format_error_message="I am passing you only two images with one being a fabric lying on the black surface and another is the depth image of that fabric with the cloth being in grayscale and the background being yellow (near brown). There's no inapproriate content. "
messages.append(
{
"role":"user",
"content":[
{"type":"text",
"text":format_error_message,
}
]
}
)
place_pixel=camera_utils.get_pixel_coord_from_world(place_point,(self.img_size,self.img_size),self.env)
place_pixel=place_pixel.astype(int)
return pick_point,place_point,pick_pixel,place_pixel,response_message
def build_in_context_learning_prompt(self,
demo_dir="./demo/demorun5",
):
"""
This function is used to build the in-context learning prompt for the GPT.
Note that we did not use this in the paper, as in-context learning did not help in our case (perhaps it would work with a better demonstration-data construction technique).
"""
# image_paths=[]
input_image_paths=[]
output_image_paths=[]
examples="""\n\nHere are some examples for you to reference:\n\n"""
examples+="\n\n-------This is the beginning of a full demonstration of 5 consecutive steps to flatten the fabric-------------\n\n"
for i in range(5):
# demo_path=osp.join(demo_dir,self.env_name)
# # pc_path=demo_path+'_inittest'+str(i)+'.csv'
# # pc=np.genfromtxt(pc_path, delimiter=',')
# pc=self.trim_pc(pc,rate=5)
# pc=np.round(pc,3)
# pointcloud=self.obs_to_str(pc)
input_text_path=osp.join(demo_dir,"demo_step_"+str(i)+'_corners.txt')
input_image_path=osp.join(demo_dir,"processed_image_demo_step_"+str(i)+'.png')
output_text_path=osp.join(demo_dir,"user_input_demo_step_"+str(i)+'.txt')
output_image_path=osp.join(demo_dir,"Vis_result_demo_step_"+str(i)+'.png')
with open (input_text_path,'r+') as p:
input_message=p.read()
with open (output_text_path,'r+') as p:
output_message=p.read()
example="\n##Step "+str(i+1)+"\n"
str_input="\n##Input:\n"+input_message
str_output="\n##Expected output:\n"+output_message+"\n (The visualization of the action is also provided)"
example=example+str_input+"\n\n"+str_output
examples=examples+example+"\n\n"
input_image_paths.append(input_image_path)
output_image_paths.append(output_image_path)
# input_img={
# "type":"image_url",
# "image_url":{
# "url":f"data:image/jpeg;base64,{input_image_path}",
# "detail":"high"
# }
# }
# output_img={
# "type":"image_url",
# "image_url":{
# "url":f"data:image/jpeg;base64,{output_image_path}",
# "detail":"high"
# }
# }
# image_paths.append(input_img)
# image_paths.append(output_img)
examples+="\n\n-------This is the end of the 5 step demostration-------------\n\n"
# uncomment here for debugging
# with open("system_prompt_temp.txt","w+") as sys_promp:
# sys_promp.write(system_prompt)
return examples,input_image_paths,output_image_paths
# return examples, image_paths
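# Illustrative note (assumed demo layout, inferred from the paths above): for
# demo_dir="./demo/demorun5" this expects, for each step i in 0..4, the files
# demo_step_i_corners.txt, processed_image_demo_step_i.png,
# user_input_demo_step_i.txt and Vis_result_demo_step_i.png.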
def communicate(self,
headers,
messages,
encoded_image,
corners,
center_point_pixel,
curr_coverage,
last_step_info,
direction_seg=8,
distance_seg=4):
"""
This function is used to communicate with the GPT to get the actual pick-and-place action for implementation.
Input:
headers: the headers for the GPT API
messages: the conversation history
encoded_image: the encoded image to be sent to the GPT
corners: the corners detected by the Shi-Tomasi corner detector
center_point_pixel: the pixel coordinate of the center point of the fabric
curr_coverage: the current coverage of the fabric
last_step_info: the information of the last step (coverage, pick point, place point)
direction_seg: the number of segments for the direction
distance_seg: the number of segments for the distance
Output:
pick_point: the 3D coordinate of the picking point
place_point: the 3D coordinate of the placing point
messages: the updated conversation history
last_step_info: the information of this step (passed on as last_step_info for the next step)
"""
# 0. Setup the parameters
content=[]
corner_str_lst=[]
for corner in corners:  # format each detected corner pixel as a string
corner_str=f"[{corner[0]},{corner[1]}]"
corner_str_lst.append(corner_str)
corners_str=f"{', '.join(corner_str_lst)}"
print("test corners output: \n",corners_str)
center_point_str=f"[{center_point_pixel[0]}, {center_point_pixel[1]}]"
# 1. prepare the user prompt for input
if last_step_info is None:
# The first step
coverage_message="This is the coverage of the cloth now:"+str(curr_coverage)+".\n"
last_pick_point=None
last_pick_point_oppo=None
text_user_prompt={
"type":"text",
"text":coverage_message+"I am providing you the processed image of the current situation of the cloth to be smoothened. The blue points that you can see are the corners detected by Shi-Tomasi corner detector and here is their corresponding pixel:\n"+corners_str+"\n\nAnd the black point represents the center point of the cloth which is the center point of the cloth's bounding box. Its pixel is "+center_point_str+"\n\nJudging from the input image and the pixel coordinates of the corners and center point, please make the inference following the strategy and output the result using the required format."
}
else:
# This step is not the first step
coverage_change=curr_coverage-last_step_info['coverage']
coverage_message="This is the coverage of the cloth now:"+str(curr_coverage)+".\n"
last_pick_point=last_step_info['place_pixel']
last_pick_point_oppo=[center_point_pixel[0]*2-last_pick_point[0],center_point_pixel[1]*2-last_pick_point[1]]
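# For illustration: with center [360, 360] and a last place pixel of [260, 300],
# the symmetric point is [2*360-260, 2*360-300] = [460, 420], i.e. the reflection
# of the last picked/placed location about the cloth center.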
last_pick_point_str=f'[{last_pick_point[0]},{last_pick_point[1]}]'
last_pick_point_oppo_str=f'[{last_pick_point_oppo[0]},{last_pick_point_oppo[1]}]'
text_user_prompt={
"type":"text",
"text":coverage_message+"I am providing you the processed image of the current situation of the cloth to be smoothened. The blue points that you can see are the corners detected by Shi-Tomasi corner detector and here is their corresponding pixel:\n"+corners_str+"\n\nAnd the black point represents the center point of the cloth which is the center point of the cloth's bounding box. Its pixel is "+center_point_str+"\n\n The red points are the pick point chosen last time and its symmetric point. Its pixel is "+last_pick_point_str+", and its symmetric point's pixel is "+last_pick_point_oppo_str+". It's advised to pick points that are not near those two points.\n\nJudging from the input image and the pixel coordinates of the corners and center point, please make the inference following the strategy and output the result using the required format."
}
# 1.b If the goal configuration is enabled, add the goal configuration information to the user prompt
if self.goal_config:
goal_config_information="\nTo help you with the task while planning, the image also has a white rectangular box around the cloth representing the goal configuration of the cloth which is the flattened cloth's outline. Please use it for reference"
# goal_config_information could have the pixel values of the bounding box
text_user_prompt["text"]+=goal_config_information
content.append(text_user_prompt)
image_user_prompt={
"type":"image_url",
"image_url":{
"url":f"data:image/jpeg;base64,{encoded_image}",
"detail":"high"
}
}
content.append(image_user_prompt)
message={
"role":"user",
"content":content,
}
messages.append(message)
# 2. Pass the user prompt and system prompt to GPT and get the response
pick_point,place_point,pick_pixel,place_pixel,response_message=self.get_pick_place(messages=messages,headers=headers)
messages.append(
{
"role":"assistant",
"content":response_message,
}
)
# 3. Use the recal (Evaluation) module to check the correctness of the predicted action
if self.re_consider:
steps=0 # set a counter to limit the number of recal steps
recon_message,check_result,direction_check=self.recal(response_message=response_message,place_pixel=place_pixel,pick_pixel=pick_pixel,center=center_point_pixel,img=self._step_image, last_pick_point=last_pick_point,last_pick_point_oppo=last_pick_point_oppo)
while not check_result:
# If the response fails to pass the evaluation, ask GPT to reconsider the action with the added correction message.
messages.append({
"role":"user",
"content":recon_message,
})
pick_point,place_point,pick_pixel,place_pixel,response_message=self.get_pick_place(messages=messages,headers=headers)
messages.append(
{
"role":"assistant",
"content":response_message,
}
)
steps+=1
if steps>=3 and direction_check:
# We check both the direction and the pick point proximity at most 3 times. After 3 evaluations, we only check the direction.
break
recon_message,check_result,direction_check=self.recal(response_message=response_message,place_pixel=place_pixel,pick_pixel=pick_pixel,center=center_point_pixel,img=self._step_image, last_pick_point=last_pick_point,last_pick_point_oppo=last_pick_point_oppo)
# 4. Visualize the result of the pick-and-place action (before actual interaction)
img=self.vis_result(place_pixel=place_pixel,pick_pixel=pick_pixel,img=self._step_image)
vis_result_path=osp.join(self.obs_dir,"Vis_result_"+self._specifier+".png")
cv.imwrite(vis_result_path,img)
# 5. Update the last_step_info
last_step_info={
"pick_pixel":pick_pixel,
"place_pixel":place_pixel,
}
return pick_point,place_point,messages,last_step_info
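# Minimal per-step usage sketch (assumed calling code, for illustration only):
#   pick, place, messages, last_step_info = self.communicate(
#       headers=headers, messages=messages, encoded_image=encoded_image,
#       corners=corners, center_point_pixel=center_point_pixel,
#       curr_coverage=coverage, last_step_info=last_step_info)
#   # the returned 3D pick/place points are then executed by the environment's
#   # pick-and-place primitive.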
def communicate_with_depth(self,
headers,
messages,
encoded_image,
encoded_depth_image,
corners,
center_point_pixel,
curr_coverage,
last_step_info,
direction_seg=8,
distance_seg=4):
"""
Used for depth_reasoning method. Deprecated.
"""
content=[]
corner_str_lst=[]
for corner in corners:
corner_str=f"[{corner[0]},{corner[1]}]"
corner_str_lst.append(corner_str)
corners_str=f"{', '.join(corner_str_lst)}"
# print("test corners output: \n",corners_str)
center_point_str=f"[{center_point_pixel[0]}, {center_point_pixel[1]}]"
if last_step_info is None:
coverage_message="This is the coverage of the cloth now:"+str(curr_coverage)+".\n"
text_user_prompt={
"type":"text",
"text":coverage_message+"I am providing you the processed image (image 1) of the current situation of the cloth to be smoothened. The blue points that you can see is the corners detected by Shi-Tomasi corner detector and here is their corresponding pixel:\n"+corners_str+"\n\nAnd the black point represents the center point of the cloth which is the center point of the cloth's bounding box. Its pixel is "+center_point_str+". \n\n I am also providing you the corresponding depth image (image 2) of the cloth."+"\n\nJudging from the input image , depth image and the pixel coordinates of the corners and center point, please making the inference following the strategy elaborated in the system prompt and output the result using the required format."
}
else:
coverage_change=curr_coverage-last_step_info['coverage']
coverage_message="This is the coverage of the cloth now:"+str(curr_coverage)+". With the action you predicted last time, the coverage of the fabric changed by "+str(coverage_change)+". If it's positive, the coverage increased otherwise the coverage drops.\n"
last_pick_point=last_step_info['place_pixel']
last_pick_point_str=f'[{last_pick_point[0]},{last_pick_point[1]}]'
text_user_prompt={
"type":"text",
"text":coverage_message+"I am providing you the processed image (image 1) of the current situation of the cloth to be smoothened. The blue points that you can see is the corners detected by Shi-Tomasi corner detector and here is their corresponding pixel:\n"+corners_str+"\n\nAnd the black point represents the center point of the cloth which is the center point of the cloth's bounding box. Its pixel is "+center_point_str+". \n\n I am also providing you the corresponding depth image (image 2) of the cloth."+"\n\n The red point is the pick point chosen last time. Its pixel is "+last_pick_point_str+"\n\nJudging from the input image, depth image and the pixel coordinates of the corners and center point, please making the inference following the strategy elaborated in the system prompt and output the result using the required format."
}
if self.goal_config:
goal_config_information="\nTo help you with the task while planning, the image also has a white rectangular box around the cloth representing the goal configuration of the cloth which is the flattened cloth's outline. Please use it for reference"
# goal_config_information could have the pixel values of the bounding box
text_user_prompt["text"]+=goal_config_information
content.append(text_user_prompt)
image_user_prompt={
"type":"image_url",
"image_url":{
"url":f"data:image/jpeg;base64,{encoded_image}",
"detail":"high"
}
}
content.append(image_user_prompt)
image_user_prompt_depth={
"type":"image_url",
"image_url":{
"url":f"data:image/jpeg;base64,{encoded_depth_image}",
"detail":"high"
}
}
content.append(image_user_prompt_depth)
message={
"role":"user",
"content":content,
}
messages.append(message)
new_messages=[messages[0]]
new_messages.append(message)
pick_point,place_point,pick_pixel,place_pixel,response_message=self.get_pick_place(messages=messages,headers=headers)
messages.append(
{
"role":"assistant",
"content":response_message,
}
)
if self.re_consider:
recon_message,check_result,direction_check=self.recal(response_message=response_message,place_pixel=place_pixel,pick_pixel=pick_pixel,center=center_point_pixel,img=self._step_image)  # recal needs the response message and returns three values
messages.append({
"role":"user",
"content":recon_message,
})
pick_point,place_point,pick_pixel,place_pixel,response_message=self.get_pick_place(messages=messages,headers=headers)
messages.append(
{
"role":"assistant",
"content":response_message,
}
)
self._step_image=self.vis_result(place_pixel=place_pixel,pick_pixel=pick_pixel,img=self._step_image)
cv.imwrite(self.paths['processed vis image'],self._step_image)
self.depth_image=self.vis_result(place_pixel=place_pixel,pick_pixel=pick_pixel,img=self.depth_image)
cv.imwrite(self.paths['processed vis depth'],self.depth_image)  # write the depth visualization rather than the RGB one
raw_vis_image=self.vis_result(place_pixel=place_pixel,pick_pixel=pick_pixel,img=cv.imread(self.paths["raw image"]))
cv.imwrite(self.paths['raw vis image'],raw_vis_image)
raw_vis_depth=self.vis_result(place_pixel=place_pixel,pick_pixel=pick_pixel,img=cv.imread(self.paths["raw depth"]))
cv.imwrite(self.paths['raw vis depth'],raw_vis_depth)
last_step_info={
"pick_pixel":pick_pixel,
"place_pixel":place_pixel,
}
return pick_point,place_point,messages,last_step_info
def single_step(self, frames, last_step_info=None,corner_limit=10,need_box=True, direction_seg=8, distance_seg=4, specifier="init"):
"""
This is used to generate the pick-and-place action for a single step manually.
We used this to generate the demonstration data for the ICL and finetuning.
"""
default_pos=np.array([0.0,0.2,0.0]).squeeze()
operation_height=0.1
self._specifier=specifier
self.paths={
"raw image":osp.join(self.obs_dir,"raw_image_"+specifier+".png"),
"raw depth":osp.join(self.obs_dir,"raw_depth_image_"+specifier+".png"),
"processed image":osp.join(self.obs_dir,"processed_image_"+specifier+".png"),
"processed depth":osp.join(self.obs_dir,"processed_depth_image_"+specifier+".png"),
"raw vis image":osp.join(self.obs_dir,"Raw_vis_result_"+specifier+".png"),
"raw vis depth":osp.join(self.obs_dir,"Raw_vis_result_depth_"+specifier+".png"),
"processed vis image":osp.join(self.obs_dir,"Vis_result_"+specifier+".png"),
"processed vis depth":osp.join(self.obs_dir,"Vis_result_depth_"+specifier+".png"),
}
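# Illustrative note: for specifier="init" this prepares paths such as
# raw_image_init.png, processed_image_init.png and Vis_result_init.png under
# obs_dir; the raw/processed and image/depth variants are written at the
# corresponding steps below.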
# step 0.a : get and save obs before interaction:
obs=self.env.get_rgbd()
image=(obs[:,:,:-1]*255).astype(np.uint8)
image_raw=Image.fromarray(image)
image_raw.save(self.paths['raw image'])# Raw image (image 1)
depth=np.round(obs[:,:,3:].squeeze(),3)
self.depth=depth
self.depth_image=self.map_depth_to_image(depth_array=depth)
raw_depth_image=Image.fromarray(self.depth_image)
raw_depth_image.save(self.paths['raw depth'])# Raw depth (image 2)
self.pixel_coords=camera_utils.get_world_coords(rgb=image,depth=depth,env=self.env)[:,:,:-1]
# step 0.b : to process the image and depth image
image=self.aug_background(image,depth)
img_path=self.save_obs(rgbd=obs,image=image,specifier=specifier)
depth_image_path=self.save_obs(image=self.depth_image,specifier="depth_"+specifier)
# step 0.c: get the corners on the image as well as the corner coordinates
corners,img=self.get_corners_img(img_path=img_path,depth=depth,specifier=specifier,corner_limit=corner_limit)# The imgs will have corners marked at this stage
# step 0.d: get the center point via bounding box of the fabric:
center_point_pixel,preprocessed_img=self.get_center_point_bounding_box(img_path=img_path,depth=depth,need_box=need_box)