Before you start, remember to run these imports:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, expr
%%time
all_halos = spark.read.parquet('/user/csiaafpr/RockstarExtendedParquet/')
CPU times: user 3.03 ms, sys: 2.84 ms, total: 5.87 ms
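After loading, it is useful to check the available columns and their data types; the schema is known from the Parquet metadata, so this is immediate:
# Print the columns and types of the halo catalogue
all_halos.printSchema()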
halos = all_halos.where(col('redshift') == 0)
The list of available redshifts can be found here: UCHUU Snapshot Redshift correspondences
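The available values can also be obtained directly from the data (this scans the redshift column, so it takes some time):
# Distinct redshift values present in the catalogue, in ascending order
all_halos.select('redshift').distinct().orderBy('redshift').show()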
We can also select several redshifts:
halos = all_halos.where((col('redshift') == 1.54) | (col('redshift') == 0.49))
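An equivalent, more compact form uses Column.isin:
# Same selection written with isin instead of chained OR conditions
halos = all_halos.where(col('redshift').isin(1.54, 0.49))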
Or even a range of redshifts:
halos = all_halos.where((col('redshift') < 1.54) & (col('redshift') > 0.49))
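Column.between expresses a similar selection; note that, unlike the strict inequalities above, it is inclusive at both ends:
# Inclusive range: 0.49 <= redshift <= 1.54
halos = all_halos.where(col('redshift').between(0.49, 1.54))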
If you are familiar with SQL you can also express the condition as a SQL predicate:
halos = all_halos.where('redshift > 0.10 and redshift < 0.50')
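Going one step further, the dataframe can be registered as a temporary view and queried with full SQL (the view name halos_view is just an example):
# Register a temporary view and query it with plain SQL
all_halos.createOrReplaceTempView('halos_view')
halos = spark.sql('SELECT * FROM halos_view WHERE redshift > 0.10 AND redshift < 0.50')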
Select HOST halos at z = 0 (redshift = 0) in the Mvir range cmass_min - cmass_max:
cmass_min = 2.00e15
cmass_max = 2.03e15
hosts = all_halos.where((col('redshift') == 0)
                        & (col('pid') == -1)
                        & (col('Mvir') > cmass_min)
                        & (col('Mvir') < cmass_max))
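As a quick sanity check, the selection can be counted and the mass range summarised (this triggers the actual computation):
# Number of selected hosts and the extremes of their virial masses
hosts.agg(F.count('*').alias('n_hosts'),
          F.min('Mvir').alias('Mvir_min'),
          F.max('Mvir').alias('Mvir_max')).show()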
In the most general case, we can restrict the selected halos with additional conditions they must fulfil, keep just part of the columns of the dataframe (including additional computed ones), and take a random sample of the halos:
halos = (all_halos.where((col('redshift') == 1.54)
                         & (col('pid') == -1)
                         & (col('Mvir') > 1.0e14)
                         & (col('Mvir') < 1.3e14)
                         & (col('Xoff')/col('Rvir') < 0.05)
                         & (col('Spin') < 0.03))
                  .select('id', 'x', 'y', 'z', 'vx', 'vy', 'vz', 'Mvir', 'Rvir', expr('Rvir/Rs_Klypin'))
                  .sample(0.08))
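By default the computed column keeps the generated name (Rvir / Rs_Klypin); it can be given a readable name with alias. A sketch (halos_c and c_klypin are example names):
# Same kind of selection, naming the computed concentration column explicitly
halos_c = (all_halos.where((col('redshift') == 1.54) & (col('pid') == -1))
                    .select('id', 'Mvir', 'Rvir',
                            expr('Rvir/Rs_Klypin').alias('c_klypin'))
                    .sample(0.08))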
Count the number of halos we have selected:
%%time
halos.count()
CPU times: user 5 ms, sys: 1.21 ms, total: 6.21 ms
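Spark re-reads and re-filters the source data on every action, so if a selection will be used several times it can pay off to cache it in memory; this is standard Spark behaviour, not specific to this catalogue:
# Keep the selected halos in memory across subsequent actions (show, write, ...)
halos.cache()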
%%time
halos.show(2)
+--------------+------------------+-------+-------+------+------+-------+--------+-------+------------------+
|            id|                 x|      y|      z|    vx|    vy|     vz|    Mvir|   Rvir|(Rvir / Rs_Klypin)|
+--------------+------------------+-------+-------+------+------+-------+--------+-------+------------------+
| 4226292660622|2.8652900000000003|623.168|388.151|-41.45|-352.1|-256.26|1.013E14| 1140.7|  6.73086568361922|
|81020308687922|           563.068|1758.17|1832.21|-13.99| 91.62| -99.75|1.135E14|1184.58|  6.28584467132214|
+--------------+------------------+-------+-------+------+------+-------+--------+-------+------------------+
only showing top 2 rows
CPU times: user 6.52 ms, sys: 1.64 ms, total: 8.16 ms
Wall time: 37.4 s
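For plotting or further analysis with standard Python tools, a small selection can be brought to the driver as a pandas dataframe; do this only when the sample comfortably fits in the driver's memory:
# Collect the sampled selection to the driver as a pandas DataFrame
halos_pd = halos.toPandas()
halos_pd.head()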
Save the selection for later use. Parquet is the recommended format for saving data, and the results will be stored in HDFS:
halos.write.parquet('halos')
The saved dataframe can be read back at any time:
halos = spark.read.parquet('halos')
Other formats, such as CSV and JSON, are also available:
halos.write.csv('halos-csv')
halos.write.json('halos-json')
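Note that write fails if the target directory already exists; the save mode and per-format options (such as a CSV header line) can be set explicitly. A minimal sketch:
# Overwrite any previous output and include a header row in the CSV files
(halos.write
      .mode('overwrite')
      .option('header', True)
      .csv('halos-csv'))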