Particle Filters
State Model
Generates a new state from the current state. A slightly random walk that bounces off walls.
# x, y, v_x, v_y
function next_state(state::NTuple{4, Number})::NTuple{4, Number}
x = state[1]
y = state[2]
v_x = state[3] + (rand(rng)-0.5)/100
v_y = state[4] + (rand(rng)-0.5)/100
if (0 < state[1] < 50) && (0 < state[2] < 50)
# lower wall
elseif (0 < state[1] < 50) && (state[2] <= 0)
v_y = -1 * v_y
# upper wall
elseif (0 < state[1] < 50) && (state[2] >= 50)
v_y = -1 * v_y
# left wall
elseif (state[1] <= 0) && (0 < state[2] < 50)
v_x = -1 * v_x
# right wall
elseif (state[1] >= 50) && (0 < state[2] < 50)
v_x = -1 * v_x
# corner
else
v_x = -1 * v_x
v_y = -1 * v_y
end
v_y = max(-1,min(1, v_y))
v_x = max(-1,min(1, v_x))
return (x + v_x, y + v_y, v_x, v_y)
end;
Data
Generate some data– a random walk using the state function
Animation of random walk:
Observation Model
A circle that “observes” how close the target is to the center of the circle. This is a noisy observation!
function observation(state::NTuple{4,Number}, center=(25,25))::Int
x = state[1]
y = state[2]
dist = sqrt( (state[1]-center[1])^2 + (state[2]-center[2])^2) + rand(rng, (-5:5)) # distance + noise
if dist < 5
return 1
elseif dist < 10
return 2
elseif dist < 15
return 3
elseif dist < 20
return 4
end
return 5
end
observation (generic function with 2 methods)
Animation of Sensor
Code
dimension = 600
centers = [(25, 25) for i in 1:steps]
anim = @animate for i in 1:steps
o = observation(states[i], centers[i])
scatter(data[i], markercolor=colors[24], ms=5, lab="", alpha = 1, xlim=(0,50), ylim=(0, 50))
scatter!(centers[i], markercolor=colors[2], ms=dimension/10, lab="", alpha = (o == 1 ? 0.25 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[i], markercolor=colors[5], ms=dimension/5, lab="", alpha = (o == 2 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[i], markercolor=colors[8], ms=dimension/3.25, lab="", alpha = (o == 3 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[i], markercolor=colors[12], ms=dimension/2.375, lab="", alpha = (o == 4 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
plot!(size=(dimension,dimension))
end
gif(anim, fps=15);
Particles
Each particle is a generated models of how the target behaves.
Particles, animated. They aren’t being filtered, so they’re just random model instances.
Code
particles_data = [ [(particles[j][i][1], particles[j][i][2]) for i in 1:length(particles[j])] for j in 1:length(particles)];
anim = @animate for i in 1:steps
scatter(data[i], markercolor=colors[15], ms=6, lab="", alpha = 1, xlim=(0,50), ylim=(0, 50))
for j in 1:length(particles_data)
scatter!(particles_data[j][i], markercolor=colors[24], ms=3, lab="", alpha = 1, xlim=(0,50), ylim=(0, 50))
end
plot!(size=(dimension,dimension))
end
gif(anim, fps=14)
Observation Probabilities
Recall that the CDF is \(P(X \leq x)\)
We are calcualting \(P(O|S)\)
- \(O\) is the observation
- \(S\) is the state
function p_obs(obs::Int, state::NTuple{2, Number}, center=(25, 25))::Float64
# center = (25,25)
dist = sqrt( (state[1]-center[1])^2 + (state[2]-center[2])^2 ) # true distance
probs = []
for i in 5:5:20 # start:increment:stop (different from Python)
push!(probs,cdf_u5(dist, i))
end
push!(probs, 1)
probs = [i > 1 ? max(j-probs[i-1],0) : j for (i, j) in enumerate(probs)]
return probs[obs]
end;
Generative Model for Filter
The generative model for the filter doesn’t need to be the same as the “true” model, as long as it captures everything that is possible in the true model.
Code
function next_state_filter(state::NTuple{4, Number})::NTuple{4, Number}
x = state[1]
y = state[2]
v_x = state[3] + (rand(rng)-0.5)/30
v_y = state[4] + (rand(rng)-0.5)/30
if (0 < state[1] < 50) && (0 < state[2] < 50)
# lower wall
elseif (0 < state[1] < 50) && (state[2] <= 0)
v_y = -1 * v_y
# upper wall
elseif (0 < state[1] < 50) && (state[2] >= 50)
v_y = -1 * v_y
# left wall
elseif (state[1] <= 0) && (0 < state[2] < 50)
v_x = -1 * v_x
# right wall
elseif (state[1] >= 50) && (0 < state[2] < 50)
v_x = -1 * v_x
# corner
else
v_x = -1 * v_x
v_y = -1 * v_y
end
v_y = max(-1,min(1, v_y))
v_x = max(-1,min(1, v_x))
return (x + v_x, y + v_y, v_x, v_y)
end;
Our first particle filter.
num_particles = 16000
particles = []
particles = [(rand(rng)*50, rand(rng)*50, (rand(rng)-0.5), (rand(rng)-0.5)) for i in 1:num_particles]
particle_steps = []
sensor_center = (25, 25)
new_particles = particles
for step in 1:steps
current_observation = observation(states[step], sensor_center)
weights = [p_obs(current_observation, (particles[i][1], particles[i][2]), sensor_center) for i in 1:num_particles]
while sum(weights) == 0
particles = [(rand(rng)*50, rand(rng)*50, (rand(rng)-0.5), (rand(rng)-0.5)) for i in 1:num_particles]
weights = [p_obs(current_observation, (particles[i][1], particles[i][2]), sensor_center) for i in 1:num_particles]
print('!')
end
new_particles = []
for particle in 1:num_particles
if weights[particle] != 0
new_particle = particles[particle]
else
new_particle = sample(rng, particles, Weights(weights))
end
new_particle = next_state_filter(new_particle)
push!(new_particles, new_particle)
end
particles = new_particles
push!(particle_steps, new_particles)
end;
Animating the particle filter.
Code
particles_data = [[(particle_steps[i][j][1], particle_steps[i][j][2]) for j in 1:length(particles)] for i in 1:steps];
anim = @animate for i in 1:steps # steps
scatter(data[i], markercolor=colors[15], ms=6, lab="", alpha = 1, xlim=(0,50), ylim=(0, 50))
for j in 1:100:length(particles_data[1])
scatter!(particles_data[i][j], markercolor=colors[24], ms=2, lab="", alpha = 0.25, xlim=(0,50), ylim=(0, 50))
end
o = observation(states[i], centers[i])
scatter!(centers[i], markercolor=colors[2], ms=dimension/10, lab="", alpha = (o == 1 ? 0.25 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[i], markercolor=colors[5], ms=dimension/5, lab="", alpha = (o == 2 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[i], markercolor=colors[8], ms=dimension/3.25, lab="", alpha = (o == 3 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[i], markercolor=colors[12], ms=dimension/2.375, lab="", alpha = (o == 4 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
plot!(size=(dimension,dimension))
plot!(size=(dimension,dimension))
end
gif(anim, fps=12)
We can use the same model with more than one sensor.
num_particles = 12000
particles = []
particles = [(rand(rng)*50, rand(rng)*50, (rand(rng)-0.5), (rand(rng)-0.5)) for i in 1:num_particles]
particle_steps = []
centers = []
push!(centers,(10, 15))
push!(centers,(40, 15))
new_particles = particles
for step in 1:steps
current_observations = [observation(states[step], centers[i]) for i in 1:2]
weights= [[p_obs(current_observations[j], (particles[i][1], particles[i][2]), centers[j]) for i in 1:num_particles] for j in 1:2]
weights = exp.(sum([log.(weights[i]) for i in 1:2]))
while sum(weights) == 0
particles = [(rand(rng)*50, rand(rng)*50, (rand(rng)-0.5), (rand(rng)-0.5)) for i in 1:num_particles]
current_observations = [observation(states[step], centers[i]) for i in 1:2]
weights= [[p_obs(current_observations[j], (particles[i][1], particles[i][2]), centers[j])
for i in 1:num_particles] for j in 1:2]
weights = exp.(sum([log.(weights[j]) for j in 1:2]))
print('!')
end
new_particles = []
for particle in 1:num_particles
if weights[particle] != 0
new_particle = particles[particle]
else
new_particle = sample(rng, particles, Weights(weights))
end
new_particle = next_state_filter(new_particle)
push!(new_particles, new_particle)
end
particles = new_particles
push!(particle_steps, new_particles)
end
#| code-fold: true
particles_data = [[(particle_steps[i][j][1], particle_steps[i][j][2]) for j in 1:length(particles)] for i in 1:steps];
anim = @animate for i in 1:steps # steps
scatter(data[i], markercolor=colors[15], ms=6, lab="", alpha = 1, xlim=(0,50), ylim=(0, 50))
for j in 1:50:length(particles_data[1])
scatter!(particles_data[i][j], markercolor=colors[24], ms=2, lab="", alpha = 0.25, xlim=(0,50), ylim=(0, 50))
end
o = [observation(states[i], centers[k]) for k in 1:2]
for k in 1:2
scatter!(centers[k], markercolor=colors[2], ms=dimension/10, lab="", alpha = (o[k] == 1 ? 0.25 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[k], markercolor=colors[5], ms=dimension/5, lab="", alpha = (o[k] == 2 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[k], markercolor=colors[8], ms=dimension/3.25, lab="", alpha = (o[k] == 3 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[k], markercolor=colors[12], ms=dimension/2.375, lab="", alpha = (o[k] == 4 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
end
plot!(size=(dimension,dimension))
end
gif(anim, fps=15)
The model works with arbitrary sensor configurations. Here, a third sensor is added.
num_particles = 16000
particles = []
particles = [(rand(rng)*50, rand(rng)*50, (rand(rng)-0.5), (rand(rng)-0.5)) for i in 1:num_particles]
particle_steps = []
centers = []
push!(centers,(10, 15))
push!(centers,(40, 15))
push!(centers,(25, 35))
new_particles = particles
for step in 1:steps
current_observations = [observation(states[step], centers[i]) for i in 1:3]
weights= [[p_obs(current_observations[j], (particles[i][1], particles[i][2]), centers[j]) for i in 1:num_particles] for j in 1:3]
weights = exp.(sum([log.(weights[i]) for i in 1:3]))
while sum(weights) == 0
particles = [(rand(rng)*50, rand(rng)*50, (rand(rng)-0.5), (rand(rng)-0.5)) for i in 1:num_particles]
current_observations = [observation(states[step], centers[i]) for i in 1:3]
weights= [[p_obs(current_observations[j], (particles[i][1], particles[i][2]), centers[j])
for i in 1:num_particles] for j in 1:3]
weights = exp.(sum([log.(weights[j]) for j in 1:3]))
print('!')
end
new_particles = []
for particle in 1:num_particles
if weights[particle] != 0
new_particle = particles[particle]
else
new_particle = sample(rng, particles, Weights(weights))
end
new_particle = next_state_filter(new_particle)
push!(new_particles, new_particle)
end
particles = new_particles
push!(particle_steps, new_particles)
end
Code
particles_data = [[(particle_steps[i][j][1], particle_steps[i][j][2]) for j in 1:length(particles)] for i in 1:steps];
anim = @animate for i in 1:steps # steps
scatter(data[i], markercolor=colors[15], ms=6, lab="", alpha = 1, xlim=(0,50), ylim=(0, 50))
for j in 1:25:length(particles_data[1])
scatter!(particles_data[i][j], markercolor=colors[24], ms=2, lab="", alpha = 0.25, xlim=(0,50), ylim=(0, 50))
end
o = [observation(states[i], centers[k]) for k in 1:3]
for k in 1:3
scatter!(centers[k], markercolor=colors[2], ms=dimension/10, lab="", alpha = (o[k] == 1 ? 0.25 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[k], markercolor=colors[5], ms=dimension/5, lab="", alpha = (o[k] == 2 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[k], markercolor=colors[8], ms=dimension/3.25, lab="", alpha = (o[k] == 3 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
scatter!(centers[k], markercolor=colors[12], ms=dimension/2.375, lab="", alpha = (o[k] == 4 ? 0.15 : 0.08),
xlim=(0,50), ylim=(0, 50))
end
plot!(size=(dimension,dimension))
end
gif(anim, fps=15)