1use crate::io::watermark::pq_writer;
20use crate::io::{ArrowSnafu, ExportCfg, ParquetSnafu, StdIOSnafu};
21use crate::linalg::allocator::Allocator;
22use crate::linalg::{DefaultAllocator, DimName};
23use crate::md::trajectory::Interpolatable;
24use crate::md::StateParameter;
25use crate::od::estimate::*;
26use crate::State;
27use crate::{od::*, Spacecraft};
28use anise::ephemerides::ephemeris::{Covariance, Ephemeris, EphemerisRecord};
29use arrow::array::{Array, BooleanBuilder, Float64Builder, StringBuilder};
30use arrow::datatypes::{DataType, Field, Schema};
31use arrow::record_batch::RecordBatch;
32use hifitime::TimeScale;
33use log::{info, warn};
34use msr::sensitivity::TrackerSensitivity;
35use nalgebra::Const;
36use parquet::arrow::ArrowWriter;
37use snafu::prelude::*;
38use std::collections::HashMap;
39use std::fs::File;
40use std::path::{Path, PathBuf};
41
42use super::ODSolution;
43
44impl<MsrSize: DimName, Trk: TrackerSensitivity<Spacecraft, Spacecraft>>
45 ODSolution<Spacecraft, KfEstimate<Spacecraft>, MsrSize, Trk>
46where
47 DefaultAllocator: Allocator<MsrSize>
48 + Allocator<MsrSize, <Spacecraft as State>::Size>
49 + Allocator<Const<1>, MsrSize>
50 + Allocator<<Spacecraft as State>::Size>
51 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>
52 + Allocator<MsrSize, MsrSize>
53 + Allocator<MsrSize, <Spacecraft as State>::Size>
54 + Allocator<<Spacecraft as State>::Size, MsrSize>
55 + Allocator<<Spacecraft as State>::Size>
56 + Allocator<<Spacecraft as State>::VecLength>
57 + Allocator<<Spacecraft as State>::Size, <Spacecraft as State>::Size>,
58{
59 pub fn to_parquet<P: AsRef<Path>>(&self, path: P, cfg: ExportCfg) -> Result<PathBuf, ODError> {
61 ensure!(
62 !self.estimates.is_empty(),
63 TooFewMeasurementsSnafu {
64 need: 1_usize,
65 action: "exporting OD results"
66 }
67 );
68
69 if self.estimates.len() != self.residuals.len() {
70 return Err(ODError::ODConfigError {
71 source: ConfigError::InvalidConfig {
72 msg: format!(
73 "Estimates ({}) and residuals ({}) are not aligned.",
74 self.estimates.len(),
75 self.residuals.len()
76 ),
77 },
78 });
79 }
80
81 if self.estimates.len() != self.gains.len() {
82 return Err(ODError::ODConfigError {
83 source: ConfigError::InvalidConfig {
84 msg: format!(
85 "Estimates ({}) and filter gains ({}) are not aligned.",
86 self.estimates.len(),
87 self.gains.len()
88 ),
89 },
90 });
91 }
92
93 if self.estimates.len() != self.filter_smoother_ratios.len() {
94 return Err(ODError::ODConfigError {
95 source: ConfigError::InvalidConfig {
96 msg: format!(
97 "Estimates ({}) and filter-smoother ratios ({}) are not aligned.",
98 self.estimates.len(),
99 self.filter_smoother_ratios.len()
100 ),
101 },
102 });
103 }
104
105 let tick = Epoch::now().unwrap();
106 info!("Exporting orbit determination result to parquet file...");
107
108 if cfg.step.is_some() {
109 warn!("The `step` parameter in the export is not supported for orbit determination exports.");
110 }
111
112 let path_buf = cfg.actual_path(path);
114
115 let mut hdrs = vec![Field::new("Epoch (UTC)", DataType::Utf8, false)];
117
118 let frame = self.estimates[0].state().frame();
119
120 let more_meta = Some(vec![
121 (
122 "Frame".to_string(),
123 serde_dhall::serialize(&frame)
124 .static_type_annotation()
125 .to_string()
126 .map_err(|e| ODError::ODIOError {
127 source: InputOutputError::SerializeDhall {
128 what: format!("frame `{frame}`"),
129 err: e.to_string(),
130 },
131 })?,
132 ),
133 (
134 "SRP Area (m2)".to_string(),
135 self.estimates
136 .first()
137 .as_ref()
138 .unwrap()
139 .nominal_state()
140 .srp
141 .area_m2
142 .to_string(),
143 ),
144 (
145 "Drag Area (m2)".to_string(),
146 self.estimates
147 .first()
148 .as_ref()
149 .unwrap()
150 .nominal_state()
151 .drag
152 .area_m2
153 .to_string(),
154 ),
155 ]);
156
157 let mut fields = match cfg.fields {
158 Some(fields) => fields,
159 None => Spacecraft::export_params(),
160 };
161
162 fields.retain(|param| match self.estimates[0].state().value(*param) {
164 Ok(_) => param != &StateParameter::GuidanceMode(),
165 Err(_) => false,
166 });
167
168 for field in &fields {
169 hdrs.push(field.to_field(more_meta.clone()));
170 }
171
172 let mut sigma_fields = fields.clone();
173 sigma_fields.retain(|param| matches!(param, StateParameter::Element(_oe)));
175
176 for field in &sigma_fields {
177 hdrs.push(field.to_cov_field(more_meta.clone()));
178 }
179
180 let state_items = ["X", "Y", "Z", "Vx", "Vy", "Vz", "Cr", "Cd", "Mass"];
181 let state_units = [
182 "km", "km", "km", "km/s", "km/s", "km/s", "unitless", "unitless", "kg",
183 ];
184 let mut cov_units = vec![];
185
186 for i in 0..state_items.len() {
187 for j in i..state_items.len() {
188 let cov_unit = if i < 3 {
189 if j < 3 {
190 "km^2"
191 } else if (3..6).contains(&j) {
192 "km^2/s"
193 } else if j == 8 {
194 "km*kg"
195 } else {
196 "km"
197 }
198 } else if (3..6).contains(&i) {
199 if (3..6).contains(&j) {
200 "km^2/s^2"
201 } else if j == 8 {
202 "km/s*kg"
203 } else {
204 "km/s"
205 }
206 } else if i == 8 || j == 8 {
207 "kg^2"
208 } else {
209 "unitless"
210 };
211
212 cov_units.push(cov_unit);
213 }
214 }
215
216 let est_size = <Spacecraft as State>::Size::dim();
217
218 let mut idx = 0;
219 for i in 0..state_items.len() {
220 for j in i..state_items.len() {
221 hdrs.push(Field::new(
222 format!(
223 "Covariance {}*{} ({frame:x}) ({})",
224 state_items[i], state_items[j], cov_units[idx]
225 ),
226 DataType::Float64,
227 false,
228 ));
229 idx += 1;
230 }
231 }
232
233 for (i, coord) in state_items.iter().enumerate() {
235 hdrs.push(Field::new(
236 format!("Sigma {coord} ({frame:x}) ({})", state_units[i]),
237 DataType::Float64,
238 false,
239 ));
240 }
241
242 for (i, coord) in state_items.iter().enumerate().take(6) {
244 hdrs.push(Field::new(
245 format!("Sigma {coord} (RIC) ({})", state_units[i]),
246 DataType::Float64,
247 false,
248 ));
249 }
250
251 let mut msr_fields = Vec::new();
253 for f in &self.measurement_types {
254 msr_fields.push(
255 f.to_field()
256 .with_nullable(true)
257 .with_name(format!("Prefit residual: {f:?} ({})", f.unit())),
258 );
259 }
260 for j in 0..MsrSize::DIM {
261 msr_fields.push(Field::new(
262 format!("Whitened residual #{j}"),
263 DataType::Float64,
264 true,
265 ));
266 }
267 for f in &self.measurement_types {
268 msr_fields.push(
269 f.to_field()
270 .with_nullable(true)
271 .with_name(format!("Postfit residual: {f:?} ({})", f.unit())),
272 );
273 }
274 for f in &self.measurement_types {
275 msr_fields.push(
276 f.to_field()
277 .with_nullable(true)
278 .with_name(format!("Measurement noise: {f:?} ({})", f.unit())),
279 );
280 }
281 for f in &self.measurement_types {
282 msr_fields.push(
283 f.to_field()
284 .with_nullable(true)
285 .with_name(format!("Real observation: {f:?} ({})", f.unit())),
286 );
287 }
288 for f in &self.measurement_types {
289 msr_fields.push(
290 f.to_field()
291 .with_nullable(true)
292 .with_name(format!("Computed observation: {f:?} ({})", f.unit())),
293 );
294 }
295
296 msr_fields.push(Field::new("Residual ratio", DataType::Float64, true));
297 msr_fields.push(Field::new("Residual Rejected", DataType::Boolean, true));
298 msr_fields.push(Field::new("Tracker", DataType::Utf8, true));
299
300 hdrs.append(&mut msr_fields);
301
302 if self.measurement_types.len() == MsrSize::DIM {
304 for i in 0..state_items.len() {
305 for f in &self.measurement_types {
306 hdrs.push(Field::new(
307 format!(
308 "Gain {}*{f:?} ({}*{})",
309 state_items[i],
310 cov_units[i],
311 f.unit()
312 ),
313 DataType::Float64,
314 true,
315 ));
316 }
317 }
318 } else {
319 for state_item in &state_items {
320 for j in 0..MsrSize::DIM {
321 hdrs.push(Field::new(
322 format!("Gain {state_item}*[{j}]"),
323 DataType::Float64,
324 true,
325 ));
326 }
327 }
328 }
329
330 for i in 0..state_items.len() {
332 hdrs.push(Field::new(
333 format!(
334 "Filter-smoother ratio {} ({})",
335 state_items[i], cov_units[i],
336 ),
337 DataType::Float64,
338 true,
339 ));
340 }
341
342 let schema = Arc::new(Schema::new(hdrs));
344 let mut record: Vec<Arc<dyn Array>> = Vec::new();
345
346 let (estimates, residuals) =
348 if cfg.start_epoch.is_some() || cfg.end_epoch.is_some() || cfg.step.is_some() {
349 let start = cfg
351 .start_epoch
352 .unwrap_or_else(|| self.estimates.first().unwrap().state().epoch());
353 let end = cfg
354 .end_epoch
355 .unwrap_or_else(|| self.estimates.last().unwrap().state().epoch());
356
357 let mut residuals: Vec<Option<Residual<MsrSize>>> =
358 Vec::with_capacity(self.residuals.len());
359 let mut estimates = Vec::with_capacity(self.estimates.len());
360
361 for (estimate, residual) in self.estimates.iter().zip(self.residuals.iter()) {
362 if estimate.epoch() >= start && estimate.epoch() <= end {
363 estimates.push(*estimate);
364 residuals.push(residual.clone());
365 }
366 }
367
368 (estimates, residuals)
369 } else {
370 (self.estimates.to_vec(), self.residuals.to_vec())
371 };
372
373 let mut utc_epoch = StringBuilder::new();
377 for s in &estimates {
378 utc_epoch.append_value(s.epoch().to_time_scale(TimeScale::UTC).to_isoformat());
379 }
380 record.push(Arc::new(utc_epoch.finish()));
381
382 for field in fields {
384 let mut data = Float64Builder::new();
385 for s in &estimates {
386 data.append_value(
387 s.state()
388 .value(field)
389 .context(ODStateSnafu { action: "export" })?,
390 );
391 }
392 record.push(Arc::new(data.finish()));
393 }
394
395 for field in sigma_fields {
397 let mut data = Float64Builder::new();
398 for s in &estimates {
399 if let StateParameter::Element(oe) = field {
400 data.append_value(s.sigma_for(oe).unwrap());
401 }
402 }
403 record.push(Arc::new(data.finish()));
404 }
405
406 for i in 0..est_size {
408 for j in i..est_size {
409 let mut data = Float64Builder::new();
410 for s in &estimates {
411 data.append_value(s.covar()[(i, j)]);
412 }
413 record.push(Arc::new(data.finish()));
414 }
415 }
416
417 for i in 0..est_size {
419 let mut data = Float64Builder::new();
420 for s in &estimates {
421 data.append_value(s.covar()[(i, i)].sqrt());
422 }
423 record.push(Arc::new(data.finish()));
424 }
425
426 let mut ric_covariances = Vec::new();
428
429 for s in &estimates {
430 let dcm_ric2inertial = s
431 .state()
432 .orbit()
433 .dcm_from_ric_to_inertial()
434 .unwrap()
435 .state_dcm();
436
437 let cov = s.covar();
439 let orbit_cov = cov.fixed_view::<6, 6>(0, 0);
440
441 let ric_covar = dcm_ric2inertial * orbit_cov * dcm_ric2inertial.transpose();
443 ric_covariances.push(ric_covar);
444 }
445
446 for i in 0..6 {
448 let mut data = Float64Builder::new();
449 for cov in ric_covariances.iter().take(estimates.len()) {
450 data.append_value(cov[(i, i)].sqrt());
451 }
452 record.push(Arc::new(data.finish()));
453 }
454
455 for msr_type in &self.measurement_types {
458 let mut data = Float64Builder::new();
459 for resid_opt in &residuals {
460 if let Some(resid) = resid_opt {
461 match resid.prefit(*msr_type) {
462 Some(prefit) => data.append_value(prefit),
463 None => data.append_null(),
464 };
465 } else {
466 data.append_null();
467 }
468 }
469 record.push(Arc::new(data.finish()));
470 }
471
472 for j in 0..MsrSize::DIM {
474 let mut data = Float64Builder::new();
475 for resid_opt in &residuals {
476 if let Some(resid) = resid_opt {
477 data.append_value(resid.whitened_resid[j])
478 } else {
479 data.append_null();
480 }
481 }
482 record.push(Arc::new(data.finish()));
483 }
484
485 for msr_type in &self.measurement_types {
487 let mut data = Float64Builder::new();
488 for resid_opt in &residuals {
489 if let Some(resid) = resid_opt {
490 match resid.postfit(*msr_type) {
491 Some(postfit) => data.append_value(postfit),
492 None => data.append_null(),
493 };
494 } else {
495 data.append_null();
496 }
497 }
498 record.push(Arc::new(data.finish()));
499 }
500 for msr_type in &self.measurement_types {
502 let mut data = Float64Builder::new();
503 for resid_opt in &residuals {
504 if let Some(resid) = resid_opt {
505 match resid.trk_noise(*msr_type) {
506 Some(noise) => data.append_value(noise),
507 None => data.append_null(),
508 };
509 } else {
510 data.append_null();
511 }
512 }
513 record.push(Arc::new(data.finish()));
514 }
515 for msr_type in &self.measurement_types {
517 let mut data = Float64Builder::new();
518 for resid_opt in &residuals {
519 if let Some(resid) = resid_opt {
520 match resid.real_obs(*msr_type) {
521 Some(postfit) => data.append_value(postfit),
522 None => data.append_null(),
523 };
524 } else {
525 data.append_null();
526 }
527 }
528 record.push(Arc::new(data.finish()));
529 }
530 for msr_type in &self.measurement_types {
532 let mut data = Float64Builder::new();
533 for resid_opt in &residuals {
534 if let Some(resid) = resid_opt {
535 match resid.computed_obs(*msr_type) {
536 Some(postfit) => data.append_value(postfit),
537 None => data.append_null(),
538 };
539 } else {
540 data.append_null();
541 }
542 }
543 record.push(Arc::new(data.finish()));
544 }
545 let mut data = Float64Builder::new();
547 for resid_opt in &residuals {
548 if let Some(resid) = resid_opt {
549 data.append_value(resid.ratio);
550 } else {
551 data.append_null();
552 }
553 }
554 record.push(Arc::new(data.finish()));
555
556 let mut data = BooleanBuilder::new();
558 for resid_opt in &residuals {
559 if let Some(resid) = resid_opt {
560 data.append_value(resid.rejected);
561 } else {
562 data.append_null();
563 }
564 }
565 record.push(Arc::new(data.finish()));
566
567 let mut data = StringBuilder::new();
569 for resid_opt in &residuals {
570 if let Some(resid) = resid_opt {
571 data.append_value(
572 resid
573 .tracker
574 .clone()
575 .unwrap_or("Undefined tracker".to_string()),
576 );
577 } else {
578 data.append_null();
579 }
580 }
581 record.push(Arc::new(data.finish()));
582
583 for i in 0..est_size {
585 for j in 0..MsrSize::DIM {
586 let mut data = Float64Builder::new();
587 for opt_k in &self.gains {
588 if let Some(k) = opt_k {
589 data.append_value(k[(i, j)]);
590 } else {
591 data.append_null();
592 }
593 }
594 record.push(Arc::new(data.finish()));
595 }
596 }
597
598 for i in 0..est_size {
600 let mut data = Float64Builder::new();
601 for opt_fsr in &self.filter_smoother_ratios {
602 if let Some(fsr) = opt_fsr {
603 data.append_value(fsr[i]);
604 } else {
605 data.append_null();
606 }
607 }
608 record.push(Arc::new(data.finish()));
609 }
610
611 info!("Serialized {} estimates and residuals", estimates.len());
612
613 let mut metadata = HashMap::new();
615 metadata.insert(
616 "Purpose".to_string(),
617 "Orbit determination results".to_string(),
618 );
619 if let Some(add_meta) = cfg.metadata {
620 for (k, v) in add_meta {
621 metadata.insert(k, v);
622 }
623 }
624
625 let props = pq_writer(Some(metadata));
626
627 let file = File::create(&path_buf)
628 .context(StdIOSnafu {
629 action: "creating OD results file",
630 })
631 .context(ODIOSnafu)?;
632
633 let mut writer = ArrowWriter::try_new(file, schema.clone(), props)
634 .context(ParquetSnafu {
635 action: "exporting OD results",
636 })
637 .context(ODIOSnafu)?;
638
639 let batch = RecordBatch::try_new(schema, record)
640 .context(ArrowSnafu {
641 action: "writing OD results (building batch record)",
642 })
643 .context(ODIOSnafu)?;
644
645 writer
646 .write(&batch)
647 .context(ParquetSnafu {
648 action: "writing OD results",
649 })
650 .context(ODIOSnafu)?;
651
652 writer
653 .close()
654 .context(ParquetSnafu {
655 action: "closing OD results file",
656 })
657 .context(ODIOSnafu)?;
658
659 let tock_time = Epoch::now().unwrap() - tick;
661 info!(
662 "Orbit determination results written to {} in {tock_time}",
663 path_buf.display()
664 );
665 Ok(path_buf)
666 }
667
668 pub fn to_ephemeris(&self, object_id: String) -> Ephemeris {
670 let mut ephem = Ephemeris::new(object_id);
671
672 for estimate in &self.estimates {
673 let covar = Covariance {
674 matrix: estimate.covar.fixed_view::<6, 6>(0, 0).into(),
675 local_frame: anise::ephemerides::ephemeris::LocalFrame::Inertial,
676 };
677 let rcrd = EphemerisRecord {
678 orbit: estimate.orbital_state(),
679 covar: Some(covar),
680 };
681 ephem.insert(rcrd);
682 }
683
684 ephem
685 }
686}