Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

835 #7

Merged
merged 13 commits into from
Jul 25, 2024
46 changes: 41 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,37 @@ filename
from edi
) fgs
) trnx
) clms
) clms;

--Create a "Claims Header" table

drop table if exists claim_header;
create table claim_header as
select filename,
tax_id,
sender,
transaction_type,
clms.claim_header.*,
clms.diagnosis.*,
clms.patient.*,
clms.payer.*,
clms.providers.*
from stg_claims
clms.providers.*,
clms.patient.name as patient_name,
clms.patient.patient_relationship_cd,
clms.patient.street as patient_street,
clms.patient.city as patient_city,
clms.patient.zip as patient_zip,
clms.patient.dob as patient_dob,
clms.patient.dob_format as patient_dob_format,
clms.patient.gender_cd as patient_gender_cd,
clms.subscriber.subsciber_identifier,
clms.subscriber.name as subscriber_name,
clms.subscriber.subscriber_relationship_cd,
clms.subscriber.street as subscriber_street,
clms.subscriber.city as subscriber_city,
clms.subscriber.zip as subscriber_zip,
clms.subscriber.dob as subscriber_dob,
clms.subscriber.dob_format as subscriber_dob_format,
clms.subscriber.gender_cd as subscriber_gender_cd
from stg_claims;

--Create a "Claim Line" table
create table claim_line as
Expand All @@ -97,6 +113,26 @@ from stg_claims
![image](images/claim_header.png?raw=true)
![image](images/claim_line.png?raw=true)

### 835 sample

```python
df = spark.read.text("sampledata/835/*txt", wholetext = True)

rdd = (
df.withColumn("filename", input_file_name()).rdd
.map(lambda x: (x.asDict().get("filename"),x.asDict().get("value")))
.map(lambda x: (x[0], EDI(x[1])))
.map(lambda x: { **{'filename': x[0]}, **hm.to_json(x[1])} )
.map(lambda x: json.dumps(x))
)
claims = spark.read.json(rdd)

#Create Claims tables from the EDI transactions
#...
```
![image](images/remittance.png?raw=true)


## Different EDI Formats

Default format used is AnsiX12 (* as a delim and ~ as segment separator)
Expand Down
16 changes: 10 additions & 6 deletions databricksx12/edi.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,25 @@ def segment_count(self):
#
# Returns all segments matching segment_name
#
def segments_by_name(self, segment_name, range_start=-1, range_end=None):
return [x for i,x in enumerate(self.data) if x.segment_name() == segment_name and range_start <= i <= (range_end or len(self.data))]
def segments_by_name(self, segment_name, range_start=-1, range_end=None, data = None):
if data is None:
data = self.data
return [x for i,x in enumerate(data) if x.segment_name() == segment_name and range_start <= i <= (range_end or len(data))]

#
# Returns a tuple of all segments matching segment_name and their index
#
def segments_by_name_index(self, segment_name, range_start=-1, range_end = None):
return [(i,x) for i,x in enumerate(self.data) if x.segment_name() == segment_name and range_start <= i <= (range_end or len(self.data))]
def segments_by_name_index(self, segment_name, range_start=-1, range_end = None, data = None):
if data is None:
data = self.data
return [(i,x) for i,x in enumerate(data) if x.segment_name() == segment_name and range_start <= i <= (range_end or len(data))]

#
# Return the first occurence of the specified index
#
def index_of_segment(self, segments, segment_name):
def index_of_segment(self, segments, segment_name, search_start_idx=0):
try:
return min([(i) for i,x in enumerate(segments) if x.segment_name() == segment_name])
return min([(i) for i,x in enumerate(segments) if x.segment_name() == segment_name and i >=search_start_idx])
except:
return -1 #not found

Expand Down
Loading
Loading