anki_vector.connection

Management of the connection to and from Vector.

Functions

on_connection_thread([log_messaging, …])

A decorator generator used internally to denote which functions will run on the connection thread.

Classes

Connection(name, host, cert_file, guid[, …])

Creates and maintains an aiogrpc connection, including managing the connection thread.

ControlPriorityLevel

Enum used to specify the priority level for the program.

class anki_vector.connection.ControlPriorityLevel

Enum used to specify the priority level for the program.

DEFAULT_PRIORITY = 20

Runs below Mandatory Physical Reactions such as tucking Vector’s head and arms during a fall, yet above Trigger-Word Detection. Default for normal operation.

OVERRIDE_BEHAVIORS_PRIORITY = 10

Runs above mandatory physical reactions, will drive off table, perform while on a slope, ignore low battery state, work in the dark, etc.

RESERVE_CONTROL = 30

Holds control of the robot before/after other SDK connections. Used to disable idle behaviors. Not to be used for regular behavior control.
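As a rough illustration of how the three levels relate, the sketch below mirrors the documented values in a local IntEnum. PriorityLevelSketch is a stand-in and not the SDK enum, and the reading that a smaller value outranks a larger one is inferred from the descriptions above rather than stated by the SDK.

```python
from enum import IntEnum


class PriorityLevelSketch(IntEnum):
    """Local stand-in mirroring the documented values; not the SDK enum."""
    OVERRIDE_BEHAVIORS_PRIORITY = 10
    DEFAULT_PRIORITY = 20
    RESERVE_CONTROL = 30


# Assumption (inferred, not confirmed by the SDK): a smaller value outranks a larger one.
levels_by_precedence = sorted(PriorityLevelSketch)
names = [level.name for level in levels_by_precedence]
print(names)
```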

class anki_vector.connection.Connection(name, host, cert_file, guid, behavior_control_level=<ControlPriorityLevel.DEFAULT_PRIORITY: 20>)

Creates and maintains an aiogrpc connection, including managing the connection thread. The connection thread decouples the actual messaging layer from the user’s main thread, and requires any network requests to be run using asyncio.run_coroutine_threadsafe() so that they execute on the connection thread. Connection provides two helper functions for running a function on the connection thread: run_coroutine() and run_soon().
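The threading arrangement described above can be sketched with the standard library alone. LoopThreadSketch below is a hypothetical stand-in, not the SDK’s Connection: it only shows an event loop living on a background thread, with one blocking helper and one fire-and-forget helper built on asyncio.run_coroutine_threadsafe().

```python
import asyncio
import threading


class LoopThreadSketch:
    """Hypothetical stand-in for a connection thread; not the SDK's Connection."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run_loop, daemon=True)

    def _run_loop(self):
        # The event loop lives entirely on the background thread.
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def start(self):
        self.thread.start()

    def run_coroutine(self, coro):
        # Hand the coroutine to the background loop and block until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()

    def run_soon(self, coro):
        # Schedule the coroutine and return immediately without waiting.
        asyncio.run_coroutine_threadsafe(coro, self.loop)

    def close(self):
        # Stop the loop from the calling thread, then wait for the thread to exit.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


async def which_thread():
    return threading.current_thread()


worker = LoopThreadSketch()
worker.start()
ran_on = worker.run_coroutine(which_thread())
worker.close()
print(ran_on is worker.thread)
```

The caller’s thread never runs the coroutine itself; it only submits work and, in the blocking helper, waits for the outcome.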

This class may be used to bypass the structures of the Python SDK handled by Robot, and instead talk to aiogrpc more directly.

The values for the cert_file location and the guid can be found in your home directory in the sdk_config.ini file.

import anki_vector

# Connect to your Vector
conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>")
conn.connect()
# Run your commands
async def play_animation():
    # Run your commands
    anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02")
    anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim)
    return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop
conn.run_coroutine(play_animation()).result()
# Close the connection
conn.close()
Parameters
  • name (str) – Vector’s name in the format of “Vector-XXXX”.

  • host (str) – The IP address and port of Vector in the format “XX.XX.XX.XX:443”.

  • cert_file (str) – The location of the certificate file on disk.

  • guid (str) – Your robot’s unique secret key.

  • behavior_control_level (ControlPriorityLevel) – Pass one of the ControlPriorityLevel priority levels if the connection requires behavior control, or None to decline control.

property behavior_control_level

Returns the specific ControlPriorityLevel requested for behavior control.

To be able to directly control Vector’s motors, override his screen, play an animation, etc., the Connection will need behavior control. This property identifies the enumerated level of behavior control that the SDK will maintain over the robot.

For more information about behavior control, see behavior.

import anki_vector

with anki_vector.Robot() as robot:
    print(robot.conn.behavior_control_level) # Will print ControlPriorityLevel.DEFAULT_PRIORITY
    robot.conn.release_control()
    print(robot.conn.behavior_control_level) # Will print None
Return type

ControlPriorityLevel

close()

Clean up the connection and shut down all the event handlers.

Usually this should be invoked by the Robot class when it closes.

import anki_vector

# Connect to your Vector
conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>")
conn.connect()
# Run your commands
async def play_animation():
    # Run your commands
    anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02")
    anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim)
    return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop
conn.run_coroutine(play_animation()).result()
# Close the connection
conn.close()
connect(timeout=10.0)

Connect to Vector. This will start the connection thread which handles all messages between Vector and Python.

import anki_vector

# Connect to your Vector
conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>")
conn.connect()
# Run your commands
async def play_animation():
    # Run your commands
    anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02")
    anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim)
    return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop
conn.run_coroutine(play_animation()).result()
# Close the connection
conn.close()
Parameters

timeout (float) – The time allotted to attempt a connection, in seconds.

Return type

None

property control_granted_event

This provides an asyncio.Event that a user may wait() upon to detect when Vector has given control of the behavior system to the SDK program.

import anki_vector

async def wait_for_control(conn: anki_vector.connection.Connection):
    await conn.control_granted_event.wait()
    # Run commands that require behavior control
Return type

Event
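Waiting on an asyncio.Event has no built-in time limit. The sketch below shows one way to bound the wait with asyncio.wait_for(), using a plain asyncio.Event as a stand-in for control_granted_event. The helper wait_for_event and the delays are illustrative and not part of the SDK.

```python
import asyncio


async def wait_for_event(event: asyncio.Event, timeout: float) -> bool:
    """Return True if the event was set within timeout seconds, else False."""
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    granted = asyncio.Event()  # stand-in for conn.control_granted_event
    # Simulate control being granted shortly after the wait begins.
    asyncio.get_running_loop().call_later(0.05, granted.set)
    got_control = await wait_for_event(granted, timeout=1.0)

    never_set = asyncio.Event()  # an event that nobody sets
    timed_out = not await wait_for_event(never_set, timeout=0.05)
    return got_control, timed_out


print(asyncio.run(demo()))
```

With a real Connection, the event presumably belongs to the connection thread’s loop, so a coroutine like this would be submitted through run_coroutine() rather than started with asyncio.run().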

property control_lost_event

This provides an asyncio.Event that a user may wait() upon to detect when Vector has taken control of the behavior system at a higher priority.

import anki_vector

async def auto_reconnect(conn: anki_vector.connection.Connection):
    await conn.control_lost_event.wait()
    conn.request_control()
Return type

Event

property grpc_interface

A direct reference to the connected aiogrpc interface.

This may be used to call gRPC messages directly, bypassing anki_vector.Robot.

import anki_vector

# Connect to your Vector
conn = anki_vector.connection.Connection("Vector-XXXX", "XX.XX.XX.XX:443", "/path/to/file.cert", "<guid>")
conn.connect()
# Run your commands
async def play_animation():
    # Run your commands
    anim = anki_vector.messaging.protocol.Animation(name="anim_pounce_success_02")
    anim_request = anki_vector.messaging.protocol.PlayAnimationRequest(animation=anim)
    return await conn.grpc_interface.PlayAnimation(anim_request) # This needs to be run in an asyncio loop
conn.run_coroutine(play_animation()).result()
# Close the connection
conn.close()
Return type

ExternalInterfaceStub

property loop

A direct reference to the loop on the connection thread. Can be used to run functions on the connection thread.

import anki_vector
import asyncio

async def connection_function():
    print("I'm running in the connection thread event loop.")

with anki_vector.Robot() as robot:
    asyncio.run_coroutine_threadsafe(connection_function(), robot.conn.loop)
Return type

BaseEventLoop

Returns

The loop running inside the connection thread
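asyncio.run_coroutine_threadsafe() returns a concurrent.futures.Future, which the example above discards. The following sketch uses a locally created loop on a background thread in place of robot.conn.loop, and shows how a caller could collect the return value with a bounded wait.

```python
import asyncio
import threading

# A locally created loop on a daemon thread stands in for robot.conn.loop.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def connection_function():
    await asyncio.sleep(0.01)
    return "done"


# The returned object is a concurrent.futures.Future, usable from this thread.
future = asyncio.run_coroutine_threadsafe(connection_function(), loop)
value = future.result(timeout=2.0)  # raises a timeout error if the wait is exceeded
print(value)

# Shut the stand-in loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```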

release_control(timeout=10.0)

Explicitly release control. Typically used after detecting control_lost_event().

To be able to directly control Vector’s motors, override his screen, play an animation, etc., the Connection will need behavior control. This function will release control of Vector’s behavior system. This will raise a VectorControlTimeoutException if it fails to receive a control_lost event before the timeout.

import anki_vector

async def wait_for_control(conn: anki_vector.connection.Connection):
    await conn.control_granted_event.wait()
    # Run commands that require behavior control
    conn.release_control()
Parameters

timeout (float) – The time allotted to attempt to release control, in seconds.

request_control(behavior_control_level=<ControlPriorityLevel.DEFAULT_PRIORITY: 20>, timeout=10.0)

Explicitly request behavior control. Typically used after detecting control_lost_event().

To be able to directly control Vector’s motors, override his screen, play an animation, etc., the Connection will need behavior control. This function will acquire control of Vector’s behavior system. This will raise a VectorControlTimeoutException if it fails to gain control before the timeout.

For more information about behavior control, see behavior.

import anki_vector

async def auto_reconnect(conn: anki_vector.connection.Connection):
    await conn.control_lost_event.wait()
    conn.request_control(timeout=5.0)
Parameters
  • timeout (float) – The time allotted to attempt to gain behavior control, in seconds.

  • behavior_control_level (ControlPriorityLevel) – request control of Vector’s behavior system at a specific level of control. See ControlPriorityLevel for more information.
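Because the request raises an exception when the timeout expires, a caller may want to retry a bounded number of times. The sketch below shows that pattern with stand-ins only: ControlTimeoutSketch plays the role of VectorControlTimeoutException, and request_with_retries and flaky_request are hypothetical helpers rather than SDK functions.

```python
class ControlTimeoutSketch(Exception):
    """Stand-in for VectorControlTimeoutException."""


def request_with_retries(request, attempts=3):
    """Call request() until it succeeds or the attempts are used up."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            request()
            return attempt  # number of attempts it took
        except ControlTimeoutSketch as error:
            last_error = error
    raise last_error


calls = {"count": 0}


def flaky_request():
    # Simulated request that times out twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ControlTimeoutSketch("control not granted in time")


attempts_used = request_with_retries(flaky_request)
print(attempts_used)
```

With a real connection, request could be something like lambda: conn.request_control(timeout=5.0), with the except clause naming the SDK’s own exception instead of the stand-in.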

property requires_behavior_control

True if the Connection requires behavior control.

To be able to directly control Vector’s motors, override his screen, play an animation, etc., the Connection will need behavior control. This boolean signifies that the Connection will try to maintain control of Vector’s behavior system even after losing control to higher priority robot behaviors such as returning home to charge a low battery.

For more information about behavior control, see behavior.

import time

import anki_vector

def callback(robot, event_type, event):
    robot.conn.request_control()
    print(robot.conn.requires_behavior_control) # Will print True
    robot.anim.play_animation_trigger('GreetAfterLongTime')
    robot.conn.release_control()

with anki_vector.Robot(behavior_control_level=None) as robot:
    print(robot.conn.requires_behavior_control) # Will print False
    robot.events.subscribe(callback, anki_vector.events.Events.robot_observed_face)

    # Waits 10 seconds. Show Vector your face.
    time.sleep(10)
Return type

bool

run_coroutine(coro)

Runs a given awaitable on the connection thread’s event loop. Cannot be called from within the connection thread.

import anki_vector

async def my_coroutine():
    print("Running on the connection thread")
    return "Finished"

with anki_vector.Robot() as robot:
    result = robot.conn.run_coroutine(my_coroutine())
Parameters

coro (Awaitable) – The coroutine, task or any other awaitable which should be executed.

Return type

Any

Returns

The result of the awaitable’s execution.
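The restriction makes sense if the call blocks until the result is ready: blocking on the loop’s own thread would stall the loop that has to produce that result. A minimal sketch of such a guard follows, using a local loop thread as a stand-in and a hypothetical run_blocking helper; it is not the SDK’s implementation.

```python
import asyncio
import threading

# Local loop on a daemon thread, standing in for the connection thread.
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()


def run_blocking(coro):
    """Hypothetical helper: run coro on the loop thread and wait for its result."""
    if threading.current_thread() is loop_thread:
        coro.close()  # avoid a "never awaited" warning for the rejected coroutine
        raise RuntimeError("would deadlock: already on the loop thread")
    return asyncio.run_coroutine_threadsafe(coro, loop).result()


async def answer():
    return 42


async def misuse():
    # Calling the blocking helper from inside the loop thread trips the guard.
    try:
        run_blocking(answer())
    except RuntimeError as error:
        return str(error)


from_main = run_blocking(answer())
from_loop = run_blocking(misuse())
print(from_main, "|", from_loop)

# Shut the stand-in loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```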

run_soon(coro)

Schedules the given awaitable to run on the event loop for the connection thread.

import anki_vector
import time

async def my_coroutine():
    print("Running on the connection thread")

with anki_vector.Robot() as robot:
    robot.conn.run_soon(my_coroutine())
    time.sleep(1)
Parameters

coro (Awaitable) – The coroutine, task or any awaitable to schedule for execution on the connection thread.

Return type

None

property thread

A direct reference to the connection thread. Available to callers to determine if the current thread is the connection thread.

import anki_vector
import threading

with anki_vector.Robot() as robot:
    if threading.current_thread() is robot.conn.thread:
        print("This code is running on the connection thread")
    else:
        print("This code is not running on the connection thread")
Return type

Thread

Returns

The connection thread where all of the grpc messages are being processed.

anki_vector.connection.on_connection_thread(log_messaging=True, requires_control=True, is_cancellable_behavior=False)

A decorator generator used internally to denote which functions will run on the connection thread. This unblocks the caller of the wrapped function and allows them to continue running while the messages are being processed.

import anki_vector

class MyComponent(anki_vector.util.Component):
    @anki_vector.connection.on_connection_thread()
    async def on_connection_thread(self):
        # Do work on the connection thread
        pass
Parameters
  • log_messaging (bool) – True if the log output should include the entire message, False if it should include only the size. False is recommended for large binary return values.

  • requires_control (bool) – True if the function should wait until behavior control is granted before executing.

  • is_cancellable_behavior (bool) – True if the behavior can be cancelled before it has completed.

Return type

Callable[[Coroutine[Component, Any, None]], Any]

Returns

A decorator whose wrapped function returns one of three things depending on context: the result of the decorated function when the robot is a Robot, a concurrent.futures.Future for the decorated function when the robot is an AsyncRobot, or an asyncio.Future for the decorated function when called from the connection thread.
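The three return contexts can be illustrated with a small standard-library sketch. on_loop_thread_sketch is a hypothetical decorator generator, not the SDK’s implementation, and its wait_for_result flag is a simplification standing in for the Robot versus AsyncRobot distinction.

```python
import asyncio
import concurrent.futures
import functools
import threading

# Local loop on a daemon thread, standing in for the connection thread.
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()


def on_loop_thread_sketch(wait_for_result=True):
    """Hypothetical decorator generator; wait_for_result mimics Robot vs AsyncRobot."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            coro = func(*args, **kwargs)
            if threading.current_thread() is loop_thread:
                # Already on the loop thread: hand back an asyncio future to await.
                return loop.create_task(coro)
            future = asyncio.run_coroutine_threadsafe(coro, loop)
            # Synchronous style blocks for the value; asynchronous style returns the future.
            return future.result() if wait_for_result else future
        return wrapper
    return decorator


@on_loop_thread_sketch(wait_for_result=True)
async def add_sync(a, b):
    return a + b


@on_loop_thread_sketch(wait_for_result=False)
async def add_async(a, b):
    return a + b


async def nested():
    inner = add_sync(1, 2)  # called on the loop thread, so this is an asyncio future
    return isinstance(inner, asyncio.Future), await inner


direct = add_sync(1, 2)
pending = add_async(3, 4)
inner_is_asyncio_future, inner_value = asyncio.run_coroutine_threadsafe(nested(), loop).result()
print(direct, pending.result(), inner_is_asyncio_future, inner_value)

# Shut the stand-in loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```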