anki_vector.connection¶
Management of the connection to and from Vector.
Functions
|
A decorator generator used internally to denote which functions will run on the connection thread. |
Classes
|
Creates and maintains a aiogrpc connection including managing the connection thread. |
Enum used to specify the priority level for the program. |
-
class
anki_vector.connection.
ControlPriorityLevel
¶ Enum used to specify the priority level for the program.
-
DEFAULT_PRIORITY
= 20¶ Runs below Mandatory Physical Reactions such as tucking Vector’s head and arms during a fall, yet above Trigger-Word Detection. Default for normal operation.
-
OVERRIDE_BEHAVIORS_PRIORITY
= 10¶ Runs above mandatory physical reactions, will drive off table, perform while on a slope, ignore low battery state, work in the dark, etc.
-
RESERVE_CONTROL
= 30¶ Holds control of robot before/after other SDK connections Used to disable idle behaviors. Not to be used for regular behavior control.
-
-
class
anki_vector.connection.
Connection
(name, host, cert_file, guid, behavior_control_level=<ControlPriorityLevel.DEFAULT_PRIORITY: 20>)¶ Creates and maintains a aiogrpc connection including managing the connection thread. The connection thread decouples the actual messaging layer from the user’s main thread, and requires any network requests to be ran using
asyncio.run_coroutine_threadsafe()
to make them run on the other thread. Connection provides two helper functions for running a function on the connection thread:run_coroutine()
andrun_soon()
.This class may be used to bypass the structures of the python sdk handled by
Robot
, and instead talk to aiogrpc more directly.The values for the cert_file location and the guid can be found in your home directory in the sdk_config.ini file.
import anki_vector # Connect to your Vector conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>") conn.connect() # Run your commands async def play_animation(): # Run your commands anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02") anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim) return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop conn.run_coroutine(play_animation()).result() # Close the connection conn.close()
- Parameters
name (
str
) – Vector’s name in the format of “Vector-XXXX”.host (
str
) – The IP address and port of Vector in the format “XX.XX.XX.XX:443”.cert_file (
str
) – The location of the certificate file on disk.guid (
str
) – Your robot’s unique secret key.behavior_control_level (
ControlPriorityLevel
) – pass one ofControlPriorityLevel
priority levels if the connection requires behavior control, or None to decline control.
-
property
behavior_control_level
¶ Returns the specific
ControlPriorityLevel
requested for behavior control.To be able to directly control Vector’s motors, override his screen, play an animation, etc., the
Connection
will need behavior control. This property identifies the enumerated level of behavior control that the SDK will maintain over the robot.For more information about behavior control, see behavior.
import anki_vector with anki_vector.Robot() as robot: print(robot.conn.behavior_control_level) # Will print ControlPriorityLevel.DEFAULT_PRIORITY robot.conn.release_control() print(robot.conn.behavior_control_level) # Will print None
- Return type
-
close
()¶ Cleanup the connection, and shutdown all the event handlers.
Usually this should be invoked by the Robot class when it closes.
import anki_vector # Connect to your Vector conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>") conn.connect() # Run your commands async def play_animation(): # Run your commands anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02") anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim) return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop conn.run_coroutine(play_animation()).result() # Close the connection conn.close()
-
connect
(timeout=10.0)¶ Connect to Vector. This will start the connection thread which handles all messages between Vector and Python.
import anki_vector # Connect to your Vector conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>") conn.connect() # Run your commands async def play_animation(): # Run your commands anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02") anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim) return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop conn.run_coroutine(play_animation()).result() # Close the connection conn.close()
- Parameters
timeout (
float
) – The time allotted to attempt a connection, in seconds.- Return type
None
-
property
control_granted_event
¶ This provides an
asyncio.Event
that a user maywait()
upon to detect when Vector has given control of the behavior system to the SDK program.import anki_vector async def wait_for_control(conn: anki_vector.connection.Connection): await conn.control_granted_event.wait() # Run commands that require behavior control
- Return type
Event
-
property
control_lost_event
¶ This provides an
asyncio.Event
that a user maywait()
upon to detect when Vector has taken control of the behavior system at a higher priority.import anki_vector async def auto_reconnect(conn: anki_vector.connection.Connection): await conn.control_lost_event.wait() conn.request_control()
- Return type
Event
-
property
grpc_interface
¶ A direct reference to the connected aiogrpc interface.
This may be used to directly call grpc messages bypassing
anki_vector.Robot
import anki_vector # Connect to your Vector conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>") conn.connect() # Run your commands async def play_animation(): # Run your commands anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02") anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim) return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop conn.run_coroutine(play_animation()).result() # Close the connection conn.close()
- Return type
ExternalInterfaceStub
-
property
loop
¶ A direct reference to the loop on the connection thread. Can be used to run functions in on thread.
import anki_vector import asyncio async def connection_function(): print("I'm running in the connection thread event loop.") with anki_vector.Robot() as robot: asyncio.run_coroutine_threadsafe(connection_function(), robot.conn.loop)
- Return type
BaseEventLoop
- Returns
The loop running inside the connection thread
-
release_control
(timeout=10.0)¶ Explicitly release control. Typically used after detecting
control_lost_event()
.To be able to directly control Vector’s motors, override his screen, play an animation, etc., the
Connection
will need behavior control. This function will release control of Vector’s behavior system. This will raise aVectorControlTimeoutException
if it fails to receive a control_lost event before the timeout.import anki_vector async def wait_for_control(conn: anki_vector.connection.Connection): await conn.control_granted_event.wait() # Run commands that require behavior control conn.release_control()
- Parameters
timeout (
float
) – The time allotted to attempt to release control, in seconds.
-
request_control
(behavior_control_level=<ControlPriorityLevel.DEFAULT_PRIORITY: 20>, timeout=10.0)¶ Explicitly request behavior control. Typically used after detecting
control_lost_event()
.To be able to directly control Vector’s motors, override his screen, play an animation, etc., the
Connection
will need behavior control. This function will acquire control of Vector’s behavior system. This will raise aVectorControlTimeoutException
if it fails to gain control before the timeout.For more information about behavior control, see behavior
import anki_vector async def auto_reconnect(conn: anki_vector.connection.Connection): await conn.control_lost_event.wait() conn.request_control(timeout=5.0)
- Parameters
timeout (
float
) – The time allotted to attempt a connection, in seconds.behavior_control_level (
ControlPriorityLevel
) – request control of Vector’s behavior system at a specific level of control. SeeControlPriorityLevel
for more information.
-
property
requires_behavior_control
¶ True if the
Connection
requires behavior control.To be able to directly control Vector’s motors, override his screen, play an animation, etc., the
Connection
will need behavior control. This boolean signifies that theConnection
will try to maintain control of Vector’s behavior system even after losing control to higher priority robot behaviors such as returning home to charge a low battery.For more information about behavior control, see behavior.
import time import anki_vector def callback(robot, event_type, event): robot.conn.request_control() print(robot.conn.requires_behavior_control) # Will print True robot.anim.play_animation_trigger('GreetAfterLongTime') robot.conn.release_control() with anki_vector.Robot(behavior_control_level=None) as robot: print(robot.conn.requires_behavior_control) # Will print False robot.events.subscribe(callback, anki_vector.events.Events.robot_observed_face) # Waits 10 seconds. Show Vector your face. time.sleep(10)
- Return type
-
run_coroutine
(coro)¶ Runs a given awaitable on the connection thread’s event loop. Cannot be called from within the connection thread.
import anki_vector async def my_coroutine(): print("Running on the connection thread") return "Finished" with anki_vector.Robot() as robot: result = robot.conn.run_coroutine(my_coroutine())
-
run_soon
(coro)¶ Schedules the given awaitable to run on the event loop for the connection thread.
import anki_vector import time async def my_coroutine(): print("Running on the connection thread") with anki_vector.Robot() as robot: robot.conn.run_soon(my_coroutine()) time.sleep(1)
- Parameters
coro (
Awaitable
) – The coroutine, task or any awaitable to schedule for execution on the connection thread.- Return type
None
-
property
thread
¶ A direct reference to the connection thread. Available to callers to determine if the current thread is the connection thread.
import anki_vector import threading with anki_vector.Robot() as robot: if threading.current_thread() is robot.conn.thread: print("This code is running on the connection thread") else: print("This code is not running on the connection thread")
- Return type
- Returns
The connection thread where all of the grpc messages are being processed.
-
anki_vector.connection.
on_connection_thread
(log_messaging=True, requires_control=True, is_cancellable_behavior=False)¶ A decorator generator used internally to denote which functions will run on the connection thread. This unblocks the caller of the wrapped function and allows them to continue running while the messages are being processed.
import anki_vector class MyComponent(anki_vector.util.Component): @connection._on_connection_thread() async def on_connection_thread(self): # Do work on the connection thread
- Parameters
log_messaging (
bool
) – True if the log output should include the entire message or just the size. Recommended for large binary return values.requires_control (
bool
) – True if the function should wait until behavior control is granted before executing.is_cancellable_behavior – True if the behavior can be cancelled before it has completed.
- Return type
- Returns
A decorator which has 3 possible returns based on context: the result of the decorated function, the
concurrent.futures.Future
which points to the decorated function, or theasyncio.Future
which points to the decorated function. These contexts are: when the robot is aRobot
, when the robot is anAsyncRobot
, and when called from the connection thread respectively.