|
0
|
1 |
import os, unittest |
|
|
2 |
from django.contrib.gis.geos import * |
|
|
3 |
from django.contrib.gis.db.models import Collect, Count, Extent, F, Union |
|
29
|
4 |
from django.contrib.gis.geometry.backend import Geometry |
|
|
5 |
from django.contrib.gis.tests.utils import mysql, oracle, postgis, spatialite, no_mysql, no_oracle, no_spatialite |
|
0
|
6 |
from django.conf import settings |
|
|
7 |
from models import City, Location, DirectoryEntry, Parcel, Book, Author |
|
|
8 |
|
|
|
9 |
# Reference fixture rows as (name, state, longitude, latitude); test01_setup
# creates one Location/City pair per entry and later tests compare against it.
cities = (
    ('Aurora', 'TX', -97.516111, 33.058333),
    ('Roswell', 'NM', -104.528056, 33.387222),
    ('Kecksburg', 'PA', -79.460734, 40.18476),
)
|
|
13 |
|
|
|
14 |
class RelatedGeoModelTest(unittest.TestCase):
    """
    Tests for geographic queries that traverse model relations.

    The test methods are numbered because they build on one another's
    database rows: test01 creates the reference cities, test09 adds Dallas
    and Houston, test12a adds Fort Worth (looked up via the Dallas row), and
    test14 expects all of the Texas cities to be present.  They therefore
    rely on being executed in name order.
    """

    def test01_setup(self):
        "Setting up for related model tests."
        for name, state, lon, lat in cities:
            loc = Location.objects.create(point=Point(lon, lat))
            City.objects.create(name=name, state=state, location=loc)

    def test02_select_related(self):
        "Testing `select_related` on geographic models (see #7126)."
        qs1 = City.objects.all()
        qs2 = City.objects.select_related()
        qs3 = City.objects.select_related('location')

        # Every flavor of the queryset should yield the reference data.
        for qs in (qs1, qs2, qs3):
            for ref, c in zip(cities, qs):
                nm, st, lon, lat = ref
                self.assertEqual(nm, c.name)
                self.assertEqual(st, c.state)
                self.assertEqual(Point(lon, lat), c.location.point)

    @no_mysql
    def test03_transform_related(self):
        "Testing the `transform` GeoQuerySet method on related geographic models."
        # All the transformations are to state plane coordinate systems using
        # US Survey Feet (thus a tolerance of 0 implies error w/in 1 survey foot).
        tol = 0

        def check_pnt(ref, pnt):
            # Coordinates are compared to `tol` decimal places; the SRID
            # must match exactly.
            self.assertAlmostEqual(ref.x, pnt.x, tol)
            self.assertAlmostEqual(ref.y, pnt.y, tol)
            self.assertEqual(ref.srid, pnt.srid)

        # Each city transformed to the SRID of their state plane coordinate system.
        transformed = (('Kecksburg', 2272, 'POINT(1490553.98959621 314792.131023984)'),
                       ('Roswell', 2257, 'POINT(481902.189077221 868477.766629735)'),
                       ('Aurora', 2276, 'POINT(2269923.2484839 7069381.28722222)'),
                       )

        for name, srid, wkt in transformed:
            # Doing this implicitly sets `select_related` to select the location.
            # TODO: Fix why this breaks on Oracle.
            qs = list(City.objects.filter(name=name).transform(srid, field_name='location__point'))
            check_pnt(GEOSGeometry(wkt, srid), qs[0].location.point)

    @no_mysql
    @no_spatialite
    def test04a_related_extent_aggregate(self):
        "Testing the `extent` GeoQuerySet aggregates on related geographic models."
        # The Extent aggregate requested through the `aggregate()` API; it is
        # compared below with the `extent()` GeoQuerySet method.
        aggs = City.objects.aggregate(Extent('location__point'))

        # One for all locations, one that excludes Roswell.
        all_extent = (-104.528060913086, 33.0583305358887,-79.4607315063477, 40.1847610473633)
        txpa_extent = (-97.51611328125, 33.0583305358887,-79.4607315063477, 40.1847610473633)
        e1 = City.objects.extent(field_name='location__point')
        e2 = City.objects.exclude(name='Roswell').extent(field_name='location__point')
        e3 = aggs['location__point__extent']

        # The tolerance value is to four decimal places because of differences
        # between the Oracle and PostGIS spatial backends on the extent calculation.
        tol = 4
        for ref, e in [(all_extent, e1), (txpa_extent, e2), (all_extent, e3)]:
            for ref_val, e_val in zip(ref, e):
                self.assertAlmostEqual(ref_val, e_val, tol)

    @no_mysql
    def test04b_related_union_aggregate(self):
        "Testing the `unionagg` GeoQuerySet aggregates on related geographic models."
        # The Union aggregate requested through the `aggregate()` API; it is
        # compared below with the `unionagg()` GeoQuerySet method.
        aggs = City.objects.aggregate(Union('location__point'))

        # These are the points that are components of the aggregate geographic
        # union that is returned.
        p1 = Point(-104.528056, 33.387222)
        p2 = Point(-97.516111, 33.058333)
        p3 = Point(-79.460734, 40.18476)

        # Creating the reference union geometry depending on the spatial backend,
        # as Oracle will have a different internal ordering of the component
        # geometries than PostGIS.  The second union aggregate is for a union
        # query that includes limiting information in the WHERE clause (in other
        # words a `.filter()` precedes the call to `.unionagg()`).
        if oracle:
            ref_u1 = MultiPoint(p3, p1, p2, srid=4326)
            ref_u2 = MultiPoint(p3, p2, srid=4326)
        else:
            ref_u1 = MultiPoint(p1, p2, p3, srid=4326)
            ref_u2 = MultiPoint(p2, p3, srid=4326)

        u1 = City.objects.unionagg(field_name='location__point')
        u2 = City.objects.exclude(name='Roswell').unionagg(field_name='location__point')
        u3 = aggs['location__point__union']

        self.assertEqual(ref_u1, u1)
        self.assertEqual(ref_u2, u2)
        self.assertEqual(ref_u1, u3)

    def test05_select_related_fk_to_subclass(self):
        "Testing that calling select_related on a query over a model with an FK to a model subclass works"
        # Regression test for #9752.  Only evaluating the queryset matters
        # here, so the resulting list is not kept.
        list(DirectoryEntry.objects.all().select_related())

    def test06_f_expressions(self):
        "Testing F() expressions on GeometryFields."
        # Constructing a dummy parcel border and getting the City instance for
        # assigning the FK.
        b1 = GEOSGeometry('POLYGON((-97.501205 33.052520,-97.501205 33.052576,-97.501150 33.052576,-97.501150 33.052520,-97.501205 33.052520))', srid=4326)
        pcity = City.objects.get(name='Aurora')

        # First parcel has incorrect center point that is equal to the City;
        # it also has a second border that is different from the first as a
        # 100ft buffer around the City.
        c1 = pcity.location.point
        c2 = c1.transform(2276, clone=True)
        b2 = c2.buffer(100)
        Parcel.objects.create(name='P1', city=pcity, center1=c1, center2=c2, border1=b1, border2=b2)

        # Now creating a second Parcel where the borders are the same, just
        # in different coordinate systems.  The center points are also the
        # same (but in different coordinate systems), and this time they
        # actually correspond to the centroid of the border.
        c1 = b1.centroid
        c2 = c1.transform(2276, clone=True)
        Parcel.objects.create(name='P2', city=pcity, center1=c1, center2=c2, border1=b1, border2=b1)

        # Should return the second Parcel, which has the center within the
        # border.
        qs = Parcel.objects.filter(center1__within=F('border1'))
        self.assertEqual(1, len(qs))
        self.assertEqual('P2', qs[0].name)

        if not mysql:
            # This time center2 is in a different coordinate system and needs
            # to be wrapped in transformation SQL.
            qs = Parcel.objects.filter(center2__within=F('border1'))
            self.assertEqual(1, len(qs))
            self.assertEqual('P2', qs[0].name)

        # Should return the first Parcel, which has the center point equal
        # to the point in the City ForeignKey.
        qs = Parcel.objects.filter(center1=F('city__location__point'))
        self.assertEqual(1, len(qs))
        self.assertEqual('P1', qs[0].name)

        if not mysql:
            # This time the city column should be wrapped in transformation SQL.
            qs = Parcel.objects.filter(border2__contains=F('city__location__point'))
            self.assertEqual(1, len(qs))
            self.assertEqual('P1', qs[0].name)

    def test07_values(self):
        "Testing values() and values_list() and GeoQuerySets."
        # GeoQuerySet and GeoValuesQuerySet, and GeoValuesListQuerySet respectively.
        gqs = Location.objects.all()
        gvqs = Location.objects.values()
        gvlqs = Location.objects.values_list()

        # Incrementing through each of the models, dictionaries, and tuples
        # returned by the different types of GeoQuerySets.
        for m, d, t in zip(gqs, gvqs, gvlqs):
            # The values should be Geometry objects and not raw strings returned
            # by the spatial database.
            self.assertTrue(isinstance(d['point'], Geometry))
            self.assertTrue(isinstance(t[1], Geometry))
            self.assertEqual(m.point, d['point'])
            self.assertEqual(m.point, t[1])

    def test08_defer_only(self):
        "Testing defer() and only() on Geographic models."
        qs = Location.objects.all()
        def_qs = Location.objects.defer('point')
        # Accessing the deferred field must still produce the same geometry.
        for loc, def_loc in zip(qs, def_qs):
            self.assertEqual(loc.point, def_loc.point)

    def test09_pk_relations(self):
        "Ensuring correct primary key column is selected across relations. See #10757."
        # Adding two more cities, but this time making sure that their location
        # ID values do not match their City ID values.
        loc1 = Location.objects.create(point='POINT (-95.363151 29.763374)')
        loc2 = Location.objects.create(point='POINT (-96.801611 32.782057)')
        City.objects.create(name='Dallas', state='TX', location=loc2)
        City.objects.create(name='Houston', state='TX', location=loc1)

        # The expected ID values -- notice the last two location IDs
        # are out of order.  We want to make sure that the related
        # location ID column is selected instead of ID column for
        # the city.
        city_ids = (1, 2, 3, 4, 5)
        loc_ids = (1, 2, 3, 5, 4)
        ids_qs = City.objects.order_by('id').values('id', 'location__id')
        for val_dict, c_id, l_id in zip(ids_qs, city_ids, loc_ids):
            self.assertEqual(val_dict['id'], c_id)
            self.assertEqual(val_dict['location__id'], l_id)

    def test10_combine(self):
        "Testing the combination of two GeoQuerySets. See #10807."
        buf1 = City.objects.get(name='Aurora').location.point.buffer(0.1)
        buf2 = City.objects.get(name='Kecksburg').location.point.buffer(0.1)
        qs1 = City.objects.filter(location__point__within=buf1)
        qs2 = City.objects.filter(location__point__within=buf2)
        combined = qs1 | qs2
        names = [c.name for c in combined]
        self.assertEqual(2, len(names))
        self.assertTrue('Aurora' in names)
        self.assertTrue('Kecksburg' in names)

    def test11_geoquery_pickle(self):
        "Ensuring GeoQuery objects are unpickled correctly. See #10839."
        import pickle
        from django.contrib.gis.db.models.sql import GeoQuery
        qs = City.objects.all()
        # Round-trip the query through pickle and check its class survives.
        q_str = pickle.dumps(qs.query)
        q = pickle.loads(q_str)
        self.assertEqual(GeoQuery, q.__class__)

    # TODO: fix on Oracle -- get the following error because the SQL is ordered
    # by a geometry object, which Oracle apparently doesn't like:
    #  ORA-22901: cannot compare nested table or VARRAY or LOB attributes of an object type
    @no_oracle
    def test12a_count(self):
        "Testing `Count` aggregate use with the `GeoManager` on geo-fields."
        # Creating a new City, 'Fort Worth', that uses the same location
        # as Dallas.
        dallas = City.objects.get(name='Dallas')
        City.objects.create(name='Fort Worth', state='TX', location=dallas.location)

        # Count annotation should be 2 for the Dallas location now.
        loc = Location.objects.annotate(num_cities=Count('city')).get(id=dallas.location.id)
        self.assertEqual(2, loc.num_cities)

    def test12b_count(self):
        "Testing `Count` aggregate use with the `GeoManager` on non geo-fields. See #11087."
        # Creating some data for the Book/Author non-geo models that
        # use GeoManager.  See #11087.
        tp = Author.objects.create(name='Trevor Paglen')
        Book.objects.create(title='Torture Taxi', author=tp)
        Book.objects.create(title='I Could Tell You But Then You Would Have to be Destroyed by Me', author=tp)
        Book.objects.create(title='Blank Spots on the Map', author=tp)
        wp = Author.objects.create(name='William Patry')
        Book.objects.create(title='Patry on Copyright', author=wp)

        # Should only be one author (Trevor Paglen) returned by this query, and
        # the annotation should have 3 for the number of books.  Also testing
        # with a `GeoValuesQuerySet` (see #11489).
        qs = Author.objects.annotate(num_books=Count('books')).filter(num_books__gt=1)
        vqs = Author.objects.values('name').annotate(num_books=Count('books')).filter(num_books__gt=1)
        self.assertEqual(1, len(qs))
        self.assertEqual(3, qs[0].num_books)
        self.assertEqual(1, len(vqs))
        self.assertEqual(3, vqs[0]['num_books'])

    # TODO: The phantom model does appear on Oracle.
    @no_oracle
    def test13_select_related_null_fk(self):
        "Testing `select_related` on a nullable ForeignKey via `GeoManager`. See #11381."
        Book.objects.create(title='Without Author')
        b = Book.objects.select_related('author').get(title='Without Author')
        # Should be `None`, and not a 'dummy' model.
        self.assertEqual(None, b.author)

    @no_mysql
    @no_oracle
    @no_spatialite
    def test14_collect(self):
        "Testing the `collect` GeoQuerySet method and `Collect` aggregate."
        # Reference query:
        # SELECT AsText(ST_Collect("relatedapp_location"."point")) FROM "relatedapp_city" LEFT OUTER JOIN
        #    "relatedapp_location" ON ("relatedapp_city"."location_id" = "relatedapp_location"."id")
        #    WHERE "relatedapp_city"."state" = 'TX';
        ref_geom = fromstr('MULTIPOINT(-97.516111 33.058333,-96.801611 32.782057,-95.363151 29.763374,-96.801611 32.782057)')

        c1 = City.objects.filter(state='TX').collect(field_name='location__point')
        c2 = City.objects.filter(state='TX').aggregate(Collect('location__point'))['location__point__collect']

        for coll in (c1, c2):
            # Even though Dallas and Ft. Worth share same point, Collect doesn't
            # consolidate -- that's why 4 points in MultiPoint.
            self.assertEqual(4, len(coll))
            self.assertEqual(ref_geom, coll)

    # TODO: Related tests for KML, GML, and distance lookups.
|
|
295 |
|
|
|
296 |
def suite():
    """
    Return a `unittest.TestSuite` containing every test method of
    `RelatedGeoModelTest`.
    """
    s = unittest.TestSuite()
    # `TestLoader.loadTestsFromTestCase` is the supported replacement for the
    # deprecated `unittest.makeSuite` helper and collects the same `test*`
    # methods in the same (name-sorted) order.
    s.addTest(unittest.TestLoader().loadTestsFromTestCase(RelatedGeoModelTest))
    return s